高并发编程 / 并行任务组件 ForkJoinPool 设计图分解 (高手篇)
ForkJoinTask
是 Java 并发编程中的强大工具,专为大规模并行计算设计。它通过将大型任务分解成小块(fork),并在多个处理器上并行执行这些小块,然后将结果合并(join),实现了高效的并行处理。这种分治策略不仅简化了并行编程,还充分利用了多核处理器的能力,特别适用于计算密集型任务。如果你是并发编程的爱好者或需要处理复杂计算任务的开发者,ForkJoinTask
提供了一种优雅且高效的解决方案。
肖哥弹架构 跟大家“弹弹” 高并发锁, 关注公号回复 'mvcc' 获得手写数据库事务代码
欢迎 点赞,关注,评论。
关注公号 Solomon 肖哥弹架构获取更多精彩内容
历史热点文章
0、ForkJoinPool 业务执行流程
图说明:
提交任务到 ForkJoinPool:将一个
ForkJoinTask
任务提交到ForkJoinPool
中执行。任务分解(Fork) :任务被分解成多个子任务,这是
ForkJoinTask
设计的核心,允许任务递归地分解成更小的子任务。子任务 1、子任务 2、更多子任务... :表示分解出来的多个子任务。
子任务执行:每个子任务被执行。
检查是否还有子任务:在子任务执行完成后,检查是否还有更多的子任务需要分解和执行。
任务合并(Join) :所有子任务完成后,父任务会合并所有子任务的结果。
返回结果:合并后的结果被返回。
结束:流程的终点。
1、ForkJoinTask 设计目的
ForkJoinTask
是 Java 7 引入的一个并行计算框架的核心组件,特别适用于可以被自然地分解为子任务的工作负载,这些子任务可以独立执行并最终合并结果。以下是 ForkJoinTask
的设计因素:
分治算法的并行化:
ForkJoinTask
的设计初衷是为了支持分治算法(Divide and Conquer)的并行计算。这种算法将大问题分解成小问题,递归解决小问题,然后将结果合并以解决原始问题。利用多核处理器:随着多核处理器的普及,
ForkJoinTask
提供了一种高效的方式来利用多核处理器的并行能力,通过将任务分解并行执行,从而提高程序的执行效率。工作窃取算法:
ForkJoinPool
实现了工作窃取算法,允许空闲线程从其他线程的任务队列中“窃取”任务来执行,减少线程空闲时间,提高资源利用率。递归分解任务:
ForkJoinTask
允许任务在执行过程中动态地将自身分解成更小的子任务,这些子任务可以并行执行,最终合并结果。线程池友好:
ForkJoinTask
专为ForkJoinPool
设计,与标准的ThreadPoolExecutor
相比,它提供了更细粒度的任务管理,适合于可以并行执行的任务。任务合并:
ForkJoinTask
的子类通常包含一个join
方法,用于获取任务的结果。当一个任务被分解成多个子任务时,可以通过调用父任务的join
方法来等待所有子任务完成并合并结果。灵活性和扩展性:
ForkJoinTask
提供了fork
和join
方法,允许开发者手动控制任务的分解和结果的合并,提供了极高的灵活性和扩展性。内置任务:Java 提供了一些内置的
ForkJoinTask
实现,如RecursiveAction
和RecursiveTask
。RecursiveAction
用于没有返回结果的任务,而RecursiveTask
用于有返回结果的任务。
2、ForkJoinTask 相关组件概述
ForkJoinPool:
ForkJoinPool
是ForkJoinTask
执行的核心,它实现了ExecutorService
接口,并通过工作窃取算法来平衡线程间的工作负载。每个工作线程都有一个自己的任务队列,当一个线程完成任务后,它会尝试从其他线程的任务队列中“窃取”任务来执行。这种机制有助于平衡负载,使得所有线程都能保持忙碌状态,从而提高 CPU 的利用率。ForkJoinTask:
ForkJoinTask
是一个抽象类,代表在ForkJoinPool
中执行的轻量级任务。它有两个重要的子类:RecursiveAction
和RecursiveTask
。RecursiveAction
用于没有返回结果的任务,而RecursiveTask
用于有返回结果的任务。这两个子类都需要实现compute()
方法来定义任务的逻辑。工作窃取算法:
工作窃取算法允许空闲线程从其他线程的任务队列中“窃取”任务来执行。默认情况下,工作线程从自己的任务队列头部获取任务。当队列为空时,线程会从其他忙碌线程的队列尾部“窃取”任务,或者从全局入口队列中获取任务,因为这些地方最有可能存在较大的工作量。
任务的分解与合并:
在
ForkJoinTask
的实现中,任务通常被递归地分解成更小的子任务(Fork),直到它们足够小,可以直接异步执行。然后,这些子任务的结果被递归地合并(Join)成一个单一的结果。ForkJoinTask 的方法:
fork()
:将任务放入队列并安排异步执行。join()
:等待任务完成并返回结果。invoke()
:结合fork()
和join()
,启动任务,等待其结束并返回结果。提交任务到 ForkJoinPool:
可以通过
ForkJoinPool
的invoke()
、execute()
或submit()
方法提交ForkJoinTask
任务。invokeAll()
方法可以同时提交多个任务,并返回一个Future
列表。
3、ForkJoinTask 业务组件设计
核心组件:
ForkJoinPool
:整个框架的中心节点。工作窃取算法
:包括空闲线程窃取任务和平衡工作负载。双端队列
:每个线程维护的任务队列。任务分解(Fork/Join)
:任务被分解成子任务,并行执行后合并结果。任务管理:
任务类型
:包括RecursiveAction
和RecursiveTask
。线程管理
:动态管理线程数量。局部性优化
:任务通常由提交它们的线程执行。异常与控制:
异常处理
:捕获并处理任务执行过程中的异常。任务提交
:包括execute
、submit
、invoke
方法。任务同步
:包括get
、join
、invoke
方法来同步任务执行和获取结果。取消和超时
:支持任务取消和超时控制。监控与配置:
管理界面
:监控和控制线程池行为。公平性和优先级
:默认不支持,但可以通过自定义实现。任务的不可中断性
:任务默认是不可中断的。
4、ForkJoinTask 工作窃取算法流程
步骤:
开始:并行计算的起点。
线程 X - 任务队列 X:每个线程都有自己的任务队列。
线程 X 执行任务:线程从自己的任务队列中取出任务并执行。
线程 X 空闲? :检查线程是否空闲(即任务队列为空)。
窃取任务:如果线程空闲,它会尝试从其他线程的任务队列中窃取任务。
5、ForkJoinTask 常用方法
5.1. 任务执行和控制
fork() :异步执行任务,不等待结果。
join() :等待任务完成并获取结果。
5.2. 结果处理
getRawResult() :获取任务的原始结果。
setRawResult(T value) :设置任务的结果。
5.3. 异常和取消
quietlyJoin(ForkJoinTask<> task) :等待任务完成,不抛出中断异常。
cancel(boolean mayInterruptIfRunning) :尝试取消任务的执行。
5.4. 任务组合和依赖
invokeAll(ForkJoinTask<>... tasks) :启动多个任务并等待它们全部完成。
invoke(ForkJoinTask<> task) :启动一个任务并等待其完成,返回结果。
5.5. 辅助方法
helpQuiesce() :帮助处理池中的任务,直到池中没有更多的任务可以执行。
tryUnfork() :撤销一个已经
fork()
但尚未开始的任务。
5.6. 任务重用
reinitialize() :重置任务的状态,使其可以被重新使用。
6、ForkJoinTask 应用案例
6.1 入门案例
业务数据,表示一年中每一天的销售额(单位:元):
代码实现
6.2 订单总销售额计算
一个电子商务平台需要计算所有订单的总销售额。这个平台有大量的订单数据,分布在不同的数据库分片中。为了快速得到总销售额,我们可以利用ForkJoinTask
来并行处理每个数据库分片的数据,然后将结果合并。
计算总销售额代码
版权声明: 本文为 InfoQ 作者【肖哥弹架构】的原创文章。
原文链接:【http://xie.infoq.cn/article/44478531a042f359a32f49174】。文章转载请联系作者。
评论