从Java 7开始引入了一种新的线程池 ForkJoinPool。和 ThreadPoolExecutor 一样,也实现了 Executor 和 ExecutorService 接口。优势在于可以充分利用多核cpu的优势,把一个任务拆分成多个“小任务”分发到不同的cpu核心上执行,执行完后再把结果收集到一起返回。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。
1、ForkJoinPool 与 newWorkStealingPool
1.1 ForkJoinPool
ForkJoinPool 使用分治法(Divide-and-Conquer Algorithm) 思想来解决问题,它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,就会以当前计算机可用的CPU数量作为线程数量的默认值。
-
ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。
-
ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
-
ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。
1.2 newWorkStealingPool
使用 newWorkStealingPool 可以创建一个抢占式执行的线程池(任务执行顺序不确定),是Java 8 新增创建线程池的方法,只有在 JDK 1.8+ 版本中才能使用。创建时如果不设置任何参数,则以当前机器处理器个数作为线程个数,此线程池会并行处理任务,不能保证执行顺序。底层用的 ForkJoinPool 来实现的。
- newWorkStealingPool 通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。
- 使用场景:能够合理的使用CPU进行对任务的并行操作,适合使用在很耗时的任务中。底层用的ForkJoinPool 来实现的。
- newWorkStealingPool使用参考:Java并发编程之Executors线程池工厂详解实战
2、fork-join核心概念
fork-join框架允许在几个worker上中断某个任务,然后等待结果将它们组合起来。 它在很大程度上利用了多处理器机器的容量。
2.1 Fork
Fork是一个过程,在这个过程中,任务将自身分成较小且独立的子任务,这些子任务可以同时执行。
语法 (Syntax)
Sum left = new Sum(array, low, mid);
left.fork();
这里Sum是RecursiveTask的子类,left.fork()将任务转换为子任务。
2.2 Join
Join是一个任务在子任务完成执行后加入子任务的所有结果的过程,否则它会一直等待。
语法 (Syntax)
left.join();
left是Sum类的一个对象。
2.3 ForkJoinPool
是一个特殊的线程池,设计用于fork-and-join任务拆分。
语法 (Syntax)
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
这是一个新的ForkJoinPool,具有4个并行级别的CPU。
2.4 RecursiveAction
RecursiveAction表示不返回任何值的任务。
语法 (Syntax)
class Writer extends RecursiveAction {
@Override
protected void compute() { }
}
2.5 RecursiveTask
RecursiveTask表示返回值的任务。
语法 (Syntax)
class Sum extends RecursiveTask<Long> {
@Override
protected Long compute() { return null; }
}
3、ForkJoinPool实战
下面我们尝试用不同的解决方案计算1至10000000的正整数之和。
3.1 for循环方案
最简单的方案,没有使用任何并行编程的手段,只是用最直白的 for-loop 来实现。面向接口编程,我们把计算的方法定义成接口,不同的方案书写不同的实现即可。
public interface Calculator {
/**
* 把传进来的所有numbers 做求和处理
*
* @param numbers
* @return 总和
*/
long sumUp(long[] numbers);
}
for loop的实现
public class ForLoopCalculator implements Calculator {
@Override
public long sumUp(long[] numbers) {
long total = 0;
for (long i : numbers) {
total += i;
}
return total;
}
}
main测试方法
public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();
Instant start = Instant.now();
Calculator calculator = new ForLoopCalculator();
long result = calculator.sumUp(numbers);
Instant end = Instant.now();
System.out.println("耗时:" + Duration.between(start, end).toMillis() + "ms");
System.out.println("结果为:" + result);
}
代码运行结果:
耗时:10ms
结果为:50000005000000
3.2 ExecutorService多线程方式实现方案
在 Java 1.5 引入 ExecutorService 之后,可以很便捷的创建线程池。由于方案一中已经做了面向接口的设计,因此我们只需要加一个使用 ExecutorService 的实现类:
ExecutorService的实现
public class ExecutorServiceCalculator implements Calculator {
private int parallism;
private ExecutorService pool;
public ExecutorServiceCalculator() {
parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心数 默认就用cpu核心数了
pool = Executors.newFixedThreadPool(parallism);
}
//处理计算任务的线程
private static class SumTask implements Callable<Long> {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
@Override
public Long call() {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
}
}
@Override
public long sumUp(long[] numbers) {
List<Future<Long>> results = new ArrayList<>();
// 把任务分解为 n 份,交给 n 个线程处理 4核心 就等分成4份呗
// 然后把每一份都扔个一个SumTask线程 进行处理
int part = numbers.length / parallism;
for (int i = 0; i < parallism; i++) {
int from = i * part; //开始位置
int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1; //结束位置
//扔给线程池计算
results.add(pool.submit(new SumTask(numbers, from, to)));
}
// 把每个线程的结果相加,得到最终结果 get()方法 是阻塞的
// 优化方案:可以采用CompletableFuture来优化 JDK1.8的新特性
long total = 0L;
for (Future<Long> f : results) {
try {
total += f.get();
} catch (Exception ignore) {
}
}
return total;
}
}
main测试方法
public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();
Instant start = Instant.now();
Calculator calculator = new ExecutorServiceCalculator();
long result = calculator.sumUp(numbers);
Instant end = Instant.now();
System.out.println("耗时:" + Duration.between(start, end).toMillis() + "ms");
System.out.println("结果为:" + result); // 打印结果500500
}
代码运行结果:
耗时:31ms
结果为:50000005000000
3.3 ForkJoinPool(Fork/Join)实现方案
从下述代码中可以看出,使用了 ForkJoinPool 的实现逻辑全部集中在了 compute() 这个函数里,仅用了14行就实现了完整的计算过程。特别是在这段代码里没有显式地“把任务分配给线程”,只是分解了任务,而把具体的任务到线程的映射交给了 ForkJoinPool 来完成。
ForkJoinPool的实现
public class ForkJoinCalculator implements Calculator {
private ForkJoinPool pool;
//执行任务RecursiveTask:有返回值 RecursiveAction:无返回值
private static class SumTask extends RecursiveTask<Long> {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
//此方法为ForkJoin的核心方法:对任务进行拆分 拆分的好坏决定了效率的高低
@Override
protected Long compute() {
// 当需要计算的数字个数小于6时,直接采用for loop方式计算结果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else { // 否则,把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public ForkJoinCalculator() {
// 也可以使用公用的线程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
@Override
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
}
main测试方法
public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();
Instant start = Instant.now();
Calculator calculator = new ForkJoinCalculator();
long result = calculator.sumUp(numbers);
Instant end = Instant.now();
System.out.println("耗时:" + Duration.between(start, end).toMillis() + "ms");
System.out.println("结果为:" + result); // 打印结果500500
}
代码运行结果:
耗时:390ms
结果为:50000005000000
3.4 并行流实现方案(JDK8以后的推荐做法)
并行流底层还是Fork/Join框架,只是任务拆分优化得很好,是JDK8以后的推荐做法。
- 效率方面解释:Fork/Join 并行流等当计算的数字非常大的时候,优势才能体现出来。也就是说,如果你的计算比较小,或者不是CPU密集型的任务,不太建议使用并行处理
main方法
public static void main(String[] args) {
Instant start = Instant.now();
long result = LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum);
Instant end = Instant.now();
System.out.println("耗时:" + Duration.between(start, end).toMillis() + "ms");
System.out.println("结果为:" + result); // 打印结果500500
}
代码运行结果:
耗时:130ms
结果为:50000005000000
4、work stealing 算法
Fork/Join Framework 的实现算法为 work stealing 算法。并不是每个 fork() 都会促成一个新线程被创建,而每个 join() 也不是一定会造成线程被阻塞。fork()会开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。join()则等待该任务的处理线程处理完毕,获得返回值。
-
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
-
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
-
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
-
在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
5、ThreadPoolExecutor和ForkJoinPool的性能差异
比如对1000万数据进行排序,可以将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如当元素的数量小于10时停止分割,转而使用插入排序对它们进行排序。这时所有的任务加起来会有大概2000000+个。
问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。所以当使用 ThreadPoolExecutor 时,使用分治法会存在问题,因为 ThreadPoolExecutor 中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。
而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
-
使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。这是工作窃取模式的优点
-
使用ThreadPoolExecutor时,是不可能完成具有父子关系的任务的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。
评论区