六问六答理解 ForkJoin 原理
摘要:ForkJoin 线程池是将任务分割为子任务,有可能子任务还是很大,还需要进一步拆解,最终得到足够小的任务。
本文分享自华为云社区《ForkJoin线程池的学习和思考》,作者:breakDraw。
ForkJoin 线程池在常规的 java 书籍里还是提到比较少的,毕竟是 java8 引入的产物。
首先这里简单解释一下 forkJoin 的运作原理,本质上有点像归并计算。
1. 他会将提交大任务按照一定规则拆解(fork)成多个小任务
2. 当任务小到一定程度时,就会执行计算
3. 执行完成时会和其他的小任务进行合并(join),逐步将所有小结果合成一个大结果。
可以看这个 forkJoinTask 的实现伪代码,即如果想使用 forkJoin 并发执行任务,需要自己把任务继承 RecursiveTask,作为 forkJoin 池的 submit 对象:
然后实际上整个 forkjoin 的细节非常多,这里我通过给自己提好几个问题,来逐步理解 forkJoin 的原理
Q: forkJoin 中各个线程是如何获取那些小任务的呢?
A:他是通过工作密取的方式获取。(java 并发那本书里提到过工作密取 workSteal,原来是用在这了)
假设我们给 forkJoin 设置 3 个工作线程,那么就会有 3 个工作队列, 注意,这个队列是双端队列。
每当执行任务时,如果不满足小任务的条件,他会 fork 出 2 个子任务,并 push 进自己的工作队列中。
每个工作线程不断取自己队头的任务执行。
关键点:如果自己队列里没有数据,则会从其他队列的队尾取数据。
Q: fork 时具体发生了什么?
A:是一个异步的操作, 就是向当前线程队列中添加这个 fork 出来任务,能放进去的话就返回,不会等待。注意,默认 fork 出的任务是先默认给自己的。 当自己做不完时,才可能被别人取走!
Q:join 是什么含义?什么时候做的?
A:见实现 forkJoin 任务接口时的代码:
可以看到时每次 fork 完之后,通过 join,来获取子 task 的结果,获取到之后,再合并计算,返回结果。
Q: join 这个阻塞过程是怎么做的?如果把线程挂起,那这个线程岂不是无法工作了?
A:首先,之前 fork 时,新的子任务已经被放入队列了。
每个子任务都有一个任务状态。当调用该子任务的 join 时, 会循环判断他的状态
如果这个子任务状态未完成, 则从自身队列或其他人的队列中取出新的任务执行,因此进入了下一层的 exec()操作。如果发现子任务状态更新为了完成(这个更新动作可能是自己线程完成的,也可能是别的线程完成的,反正这个任务的状态实现了同步和可见), 则将结果返回给上层。因此 join 的本质是一个递归的过程, 任务没完成的话,他就取其他任务继续递归往下执行。
更详细的可以看这个链接 fork+join 过程详细解读
Q: forkJoin 存放任务的时候,怎么保证不会出现并发问题?比如同时往队尾插入的话
A:
n 个工作线程是通过数组存放的(即有一个工作线程数组)
sun.misc.Unsafe 操作类直接基于操作系统控制层在硬件层面上进行原子操作,它是 ForkJoinPool 高效性能的一大保证,类似的编程思路还体现在 java.util.concurrent 包中相当规模的类功能实现中。
Q: forkJoin 应用在哪吗?
A:java8 stream 的 parallel 并发功能就是基于 forkJoin 做的, parallelStream 实现的 forkJoin 拆解任务和执行任务的接口,默认用机器所有 CPU 数量的 forkJoin 线程池。如果需要限制线程数量,可以用 new forkJoin(线程数).submit(()->(list.stream().parallel().map()…)); 即可
版权声明: 本文为 InfoQ 作者【华为云开发者社区】的原创文章。
原文链接:【http://xie.infoq.cn/article/6dc3b0ebf87d71bc52144ce90】。文章转载请联系作者。
评论