☕【Java 技术指南】「并发编程专题」Fork/Join 框架基本使用和原理探究(基础篇)
前提概述
Java 7 开始引入了一种新的 Fork/Join 线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
算法原理介绍
相信大家此前或多或少有了解到 ForkJoin 框架,ForkJoin 框架其实就是一个线程池 ExecutorService 的实现,通过工作窃取(work-stealing)算法,获取其他线程中未完成的任务来执行。可以充分利用机器的多处理器优势,利用空闲的线程去并行快速完成一个可拆分为小任务的大任务,类似于分治算法。
实现达成目标
ForkJoin 的目标,就是利用所有可用的处理能力来提高程序的响应和性能。本文将介绍 ForkJoin 框架,依次介绍基础特性、案例使用、源码剖析和实现亮点。
java.util.concurrent.ForkJoinPool 由 Java 大师 Doug Lea 主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。
基本使用
入门例子,用 Fork/Join 框架使用示例,在这个示例中我们计算了 1-5000 累加后的值:
对此我封装了一个框架集合,基于 JDK1.8+中的 Fork/Join 框架实现,参考的 Fork/Join 框架主要源代码也基于 JDK1.8+。
WorkTaskCallable 实现抽象模型层次操作转换
ArrayListWorkTaskCallable 实现 List 集合层次操作转换
ForkJoin 代码分析
ForkJoinPool 构造函数
parallelism:可并行级别,Fork/Join 框架将依据这个并行级别的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理,但是千万不要将这个属性理解成 Fork/Join 框架中最多存在的线程数量。
factory:当 Fork/Join 框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现 ThreadFactory 接口,而是需要实现 ForkJoinWorkerThreadFactory 接口。后者是一个函数式接口,只需要实现一个名叫 newThread 的方法。
在 Fork/Join 框架中有一个默认的 ForkJoinWorkerThreadFactory 接口实现:DefaultForkJoinWorkerThreadFactory。
handler:异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被 handler 捕获。
asyncMode:这个参数也非常重要,从字面意思来看是指的异步模式,它并不是说 Fork/Join 框架是采用同步模式还是采用异步模式工作。Fork/Join 框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即是说存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式。
先进先出
后进先出
当 asyncMode 设置为 true 的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为 false
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
需要注意点
ForkJoinPool 一个构造函数只带有 parallelism 参数,既是可以设定 Fork/Join 框架的最大并行任务数量;另一个构造函数则不带有任何参数,对于最大并行任务数量也只是一个默认值——当前操作系统可以使用的 CPU 内核数量(Runtime.getRuntime().availableProcessors())。实际上 ForkJoinPool 还有一个私有的、原生构造函数,之上提到的三个构造函数都是对这个私有的、原生构造函数的调用。
如果你对 Fork/Join 框架没有特定的执行要求,可以直接使用不带有任何参数的构造函数。也就是说推荐基于当前操作系统可以使用的 CPU 内核数作为 Fork/Join 框架内最大并行任务数量,这样可以保证 CPU 在处理并行任务时,尽量少发生任务线程间的运行状态切换(实际上单个 CPU 内核上的线程间状态切换基本上无法避免,因为操作系统同时运行多个线程和多个进程)。
从上面的的类关系图可以看出来,ForkJoin 框架的核心是 ForkJoinPool 类,基于 AbstractExecutorService 扩展(@sun.misc.Contended 注解)。
ForkJoinPool 中维护了一个队列数组 WorkQueue[],每个 WorkQueue 维护一个 ForkJoinTask 数组和当前工作线程。ForkJoinPool 实现了工作窃取(work-stealing)算法并执行 ForkJoinTask。
ForkJoinPool 类的属性介绍
ADD_WORKER: 100000000000000000000000000000000000000000000000 -> 1000 0000 0000 0000,用来配合 ctl 在控制线程数量时使用
ctl: 控制 ForkJoinPool 创建线程数量,(ctl & ADD_WORKER) != 0L 时创建线程,也就是当 ctl 的第 16 位不为 0 时,可以继续创建线程
defaultForkJoinWorkerThreadFactory: 默认线程工厂,默认实现是 DefaultForkJoinWorkerThreadFactory
runState: 全局锁控制,全局运行状态
workQueues: 工作队列数组 WorkQueue[]
config: 记录并行数量和 ForkJoinPool 的模式(异步或同步)
WorkQueue 类
qlock: 并发控制,put 任务时的锁控制
array: 任务数组 ForkJoinTask<?>[]
pool: ForkJoinPool,所有线程和 WorkQueue 共享,用于工作窃取、任务状态和工作状态同步
base: array 数组中取任务的下标
top: array 数组中放置任务的下标
owner: 所属线程,ForkJoin 框架中,只有一个 WorkQueue 是没有 owner 的,其他的均有具体线程 owner
ForkJoinTask 是能够在 ForkJoinPool 中执行的任务抽象类,父类是 Future,具体实现类有很多,这里主要关注 RecursiveAction 和 RecursiveTask。
RecursiveAction 是没有返回结果的任务
RecursiveTask 是需要返回结果的任务。
ForkJoinTask 类属性的介绍
status: 任务的状态,对其他工作线程和 pool 可见,运行正常则 status 为负数,异常情况为正数。
ForkJoinTask 功能介绍
ForkJoinTask 任务是一种能在 Fork/Join 框架中运行的特定任务,也只有这种类型的任务可以在 Fork/Join 框架中被拆分运行和合并运行。
ForkJoinWorkerThread 线程是一种在 Fork/Join 框架中运行的特性线程,它除了具有普通线程的特性外,最主要的特点是每一个 ForkJoinWorkerThread 线程都具有一个独立的任务等待队列(work queue),这个任务队列用于存储在本线程中被拆分的若干子任务。
只需要实现其 compute()方法,在 compute()中做最小任务控制,任务分解(fork)和结果合并(join)。
ForkJoinPool 中执行的默认线程是 ForkJoinWorkerThread,由默认工厂产生,可以自己重写要实现的工作线程。同时会将 ForkJoinPool 引用放在每个工作线程中,供工作窃取时使用。
ForkJoinWorkerThread 类属性介绍
pool: ForkJoinPool,所有线程和 WorkQueue 共享,用于工作窃取、任务状态和工作状态同步。
workQueue: 当前线程的任务队列,与 WorkQueue 的 owner 呼应。
简易执行图
实际上 Fork/Join 框架的内部工作过程要比这张图复杂得多,例如如何决定某一个 recursive task 是使用哪条线程进行运行;再例如如何决定当一个任务/子任务提交到 Fork/Join 框架内部后,是创建一个新的线程去运行还是让它进行队列等待。
逻辑模型图(盗一张图:)
()
fork 方法和 join 方法
Fork/Join 框架中提供的 fork 方法和 join 方法,可以说是该框架中提供的最重要的两个方法,它们和 parallelism“可并行任务数量”配合工作。
Fork 方法介绍
Fork 就是一个不断分枝的过程,在当前任务的基础上长出 n 多个子任务,他将新创建的子任务放入当前线程的 work queue 队列中,Fork/Join 框架将根据当前正在并发执行 ForkJoinTask 任务的 ForkJoinWorkerThread 线程状态,决定是让这个任务在队列中等待,还是创建一个新的 ForkJoinWorkerThread 线程运行它,又或者是唤起其它正在等待任务的 ForkJoinWorkerThread 线程运行它。
当一个 ForkJoinTask 任务调用 fork()方法时,当前线程会把这个任务放入到 queue 数组的 queueTop 位置,然后执行以下两句代码:
当调用 signalWork()方法。signalWork()方法做了两件事:1、唤配当前线程;2、当没有活动线程时或者线程数较少时,添加新的线程。
Join 方法介绍
Join 是一个不断等待,获取任务执行结果的过程。
第 4 行,(s=status)<0 表示这个任务被执行完,直接返回执行结果状态,上层捕获到状态后,决定是要获取结果还是进行错误处理;
第 6 行,从 queue 中取出这个任务来执行,如果执行完了,就设置状态为 NORMAL;
前面 unpushTask()方法在队列中没有这个任务时会返回 false,15 行调用 joinTask 等待这个任务完成。
由于 ForkJoinPool 中有一个数组叫 submissionQueue,通过 submit 方法调用而且非 ForkJoinTask 这种任务会被放到这个队列中。这种任务有可能被非 ForkJoinWorkerThread 线程执行,第 18 行表示如果是这种任务,等待它执行完成。下面来看 joinTask 方法
(1)这里有个常量 MAX_HELP=16,表示帮助 join 的次数。第 11 行,queueTop!=queueBase 表示本地队列中有任务,如果这个任务刚好在队首,则尝试自己执行;否则返回 false。这时 retries 被设置为 0,表示不能帮助,因为自已队列不为空,自己并不空闲。在下一次循环就会进入第 24 行,等待这个任务执行完成。
(2)第 20 行 helpJoinTask()方法返回 false 时,retries-1,连续 8 次都没有帮到忙,就会进入第 14 行,调用 yield 让权等待。没办法人口太差,想做点好事都不行,只有停下来休息一下。
(3)当执行到第 20 行,表示自己队列为空,可以去帮助这个任务了,下面来看是怎么帮助的?
(1)通过查看 stealHint 这个字段的注释可以知道,它表示最近一次谁来偷过我的 queue 中的任务。因此通过 stealHint 并不能找到当前任务被谁偷了?所以第 4 行 v.currentSteal != task 完全可能。这时还有一个办法找到这个任务被谁偷了,看看 currentSteal 这个字段的注释表示最近偷的哪个任务。这里扫描所有偷来的任务与当前任务比较,如果相等,就是这个线程偷的。如果这两种方法都不能找到小偷,只能等待了。
(2)当找到了小偷后,以其人之身还之其人之道,从小偷那里偷任务过来,相当于你和小偷共同执行你的任务,会加速你的任务完成。
(3)小偷也是爷,如果小偷也在等待一个任务完成,权利反转(小偷等待的这个任务做为当前任务,小偷扮演当事人角色把前面的流程走一遍),这是一个递归的过程。
版权声明: 本文为 InfoQ 作者【李浩宇/Alex】的原创文章。
原文链接:【http://xie.infoq.cn/article/db7de166aa3a23564ac0861a4】。文章转载请联系作者。
评论