写点什么

高并发编程 / 并行任务组件 ForkJoinPool 设计图分解 (高手篇)

作者:肖哥弹架构
  • 2024-11-03
    河北
  • 本文字数:5309 字

    阅读完需:约 17 分钟

高并发编程/并行任务组件ForkJoinPool设计图分解(高手篇)


ForkJoinTask 是 Java 并发编程中的强大工具,专为大规模并行计算设计。它通过将大型任务分解成小块(fork),并在多个处理器上并行执行这些小块,然后将结果合并(join),实现了高效的并行处理。这种分治策略不仅简化了并行编程,还充分利用了多核处理器的能力,特别适用于计算密集型任务。如果你是并发编程的爱好者或需要处理复杂计算任务的开发者,ForkJoinTask 提供了一种优雅且高效的解决方案。


肖哥弹架构 跟大家“弹弹” 高并发锁, 关注公号回复 'mvcc' 获得手写数据库事务代码

欢迎 点赞,关注,评论。

关注公号 Solomon 肖哥弹架构获取更多精彩内容

历史热点文章

0、ForkJoinPool 业务执行流程


图说明:


  1. 提交任务到 ForkJoinPool:将一个 ForkJoinTask 任务提交到 ForkJoinPool 中执行。

  2. 任务分解(Fork) :任务被分解成多个子任务,这是 ForkJoinTask 设计的核心,允许任务递归地分解成更小的子任务。

  3. 子任务 1、子任务 2、更多子任务... :表示分解出来的多个子任务。

  4. 子任务执行:每个子任务被执行。

  5. 检查是否还有子任务:在子任务执行完成后,检查是否还有更多的子任务需要分解和执行。

  6. 任务合并(Join) :所有子任务完成后,父任务会合并所有子任务的结果。

  7. 返回结果:合并后的结果被返回。

  8. 结束:流程的终点。

1、ForkJoinTask 设计目的

ForkJoinTask 是 Java 7 引入的一个并行计算框架的核心组件,特别适用于可以被自然地分解为子任务的工作负载,这些子任务可以独立执行并最终合并结果。以下是 ForkJoinTask 的设计因素:


  1. 分治算法的并行化ForkJoinTask 的设计初衷是为了支持分治算法(Divide and Conquer)的并行计算。这种算法将大问题分解成小问题,递归解决小问题,然后将结果合并以解决原始问题。

  2. 利用多核处理器:随着多核处理器的普及,ForkJoinTask 提供了一种高效的方式来利用多核处理器的并行能力,通过将任务分解并行执行,从而提高程序的执行效率。

  3. 工作窃取算法ForkJoinPool 实现了工作窃取算法,允许空闲线程从其他线程的任务队列中“窃取”任务来执行,减少线程空闲时间,提高资源利用率。

  4. 递归分解任务ForkJoinTask 允许任务在执行过程中动态地将自身分解成更小的子任务,这些子任务可以并行执行,最终合并结果。

  5. 线程池友好ForkJoinTask 专为 ForkJoinPool 设计,与标准的 ThreadPoolExecutor 相比,它提供了更细粒度的任务管理,适合于可以并行执行的任务。

  6. 任务合并ForkJoinTask 的子类通常包含一个 join 方法,用于获取任务的结果。当一个任务被分解成多个子任务时,可以通过调用父任务的 join 方法来等待所有子任务完成并合并结果。

  7. 灵活性和扩展性ForkJoinTask 提供了 forkjoin 方法,允许开发者手动控制任务的分解和结果的合并,提供了极高的灵活性和扩展性。

  8. 内置任务:Java 提供了一些内置的 ForkJoinTask 实现,如 RecursiveActionRecursiveTaskRecursiveAction 用于没有返回结果的任务,而 RecursiveTask 用于有返回结果的任务。

2、ForkJoinTask 相关组件概述

  1. ForkJoinPool

  2. ForkJoinPoolForkJoinTask 执行的核心,它实现了 ExecutorService 接口,并通过工作窃取算法来平衡线程间的工作负载。每个工作线程都有一个自己的任务队列,当一个线程完成任务后,它会尝试从其他线程的任务队列中“窃取”任务来执行。这种机制有助于平衡负载,使得所有线程都能保持忙碌状态,从而提高 CPU 的利用率。

  3. ForkJoinTask

  4. ForkJoinTask 是一个抽象类,代表在 ForkJoinPool 中执行的轻量级任务。它有两个重要的子类:RecursiveActionRecursiveTaskRecursiveAction 用于没有返回结果的任务,而 RecursiveTask 用于有返回结果的任务。这两个子类都需要实现 compute() 方法来定义任务的逻辑。

  5. 工作窃取算法

  6. 工作窃取算法允许空闲线程从其他线程的任务队列中“窃取”任务来执行。默认情况下,工作线程从自己的任务队列头部获取任务。当队列为空时,线程会从其他忙碌线程的队列尾部“窃取”任务,或者从全局入口队列中获取任务,因为这些地方最有可能存在较大的工作量。

  7. 任务的分解与合并

  8. ForkJoinTask 的实现中,任务通常被递归地分解成更小的子任务(Fork),直到它们足够小,可以直接异步执行。然后,这些子任务的结果被递归地合并(Join)成一个单一的结果。

  9. ForkJoinTask 的方法

  10. fork():将任务放入队列并安排异步执行。

  11. join():等待任务完成并返回结果。

  12. invoke():结合 fork()join(),启动任务,等待其结束并返回结果。

  13. 提交任务到 ForkJoinPool

  14. 可以通过 ForkJoinPoolinvoke()execute()submit() 方法提交 ForkJoinTask 任务。invokeAll() 方法可以同时提交多个任务,并返回一个 Future 列表。

3、ForkJoinTask 业务组件设计


  1. 核心组件

  2. ForkJoinPool:整个框架的中心节点。

  3. 工作窃取算法:包括空闲线程窃取任务和平衡工作负载。

  4. 双端队列:每个线程维护的任务队列。

  5. 任务分解(Fork/Join):任务被分解成子任务,并行执行后合并结果。

  6. 任务管理

  7. 任务类型:包括 RecursiveActionRecursiveTask

  8. 线程管理:动态管理线程数量。

  9. 局部性优化:任务通常由提交它们的线程执行。

  10. 异常与控制

  11. 异常处理:捕获并处理任务执行过程中的异常。

  12. 任务提交:包括 executesubmitinvoke 方法。

  13. 任务同步:包括 getjoininvoke 方法来同步任务执行和获取结果。

  14. 取消和超时:支持任务取消和超时控制。

  15. 监控与配置

  16. 管理界面:监控和控制线程池行为。

  17. 公平性和优先级:默认不支持,但可以通过自定义实现。

  18. 任务的不可中断性:任务默认是不可中断的。

4、ForkJoinTask 工作窃取算法流程


步骤:


  1. 开始:并行计算的起点。

  2. 线程 X - 任务队列 X:每个线程都有自己的任务队列。

  3. 线程 X 执行任务:线程从自己的任务队列中取出任务并执行。

  4. 线程 X 空闲? :检查线程是否空闲(即任务队列为空)。

  5. 窃取任务:如果线程空闲,它会尝试从其他线程的任务队列中窃取任务。

5、ForkJoinTask 常用方法

5.1. 任务执行和控制

  • fork() :异步执行任务,不等待结果。

  • join() :等待任务完成并获取结果。

5.2. 结果处理

  • getRawResult() :获取任务的原始结果。

  • setRawResult(T value) :设置任务的结果。

5.3. 异常和取消

  • quietlyJoin(ForkJoinTask<> task) :等待任务完成,不抛出中断异常。

  • cancel(boolean mayInterruptIfRunning) :尝试取消任务的执行。

5.4. 任务组合和依赖

  • invokeAll(ForkJoinTask<>... tasks) :启动多个任务并等待它们全部完成。

  • invoke(ForkJoinTask<> task) :启动一个任务并等待其完成,返回结果。

5.5. 辅助方法

  • helpQuiesce() :帮助处理池中的任务,直到池中没有更多的任务可以执行。

  • tryUnfork() :撤销一个已经 fork() 但尚未开始的任务。

5.6. 任务重用

  • reinitialize() :重置任务的状态,使其可以被重新使用。

6、ForkJoinTask 应用案例

6.1 入门案例

业务数据,表示一年中每一天的销售额(单位:元):


int[] dailySales = {    10000, 15000, 12000, 18000, 16000, 20000, 21000, 19000, 17000, 15000,    14000, 13000, 11000, 12000, 13000, 14000, 15000, 16000, 17000, 18000,    19000, 20000, 21000, 22000, 23000, 24000, 25000, 26000, 27000, 28000,    29000, 30000, 31000, 32000, 33000, 34000, 35000, 36000, 37000, 38000,    39000, 40000, 41000, 42000, 43000, 44000, 45000, 46000, 47000, 48000,    49000, 50000};
复制代码


代码实现


import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;
// 继承 RecursiveTask 并实现 compute 方法,用于计算数组的和class SumTask extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; private final int[] array; // 要计算的数组 private final int start; // 数组的开始索引 private final int end; // 数组的结束索引
// 构造函数,初始化数组和索引范围 public SumTask(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; }
@Override // 实现 compute 方法,该方法将被并行执行 protected Long compute() { long sum = 0; // 用于累积求和的结果 int length = end - start; // 计算当前任务处理的数组长度
// 如果任务足够小,直接计算 if (length <= 10000) { for (int i = start; i < end; i++) { sum += array[i]; } } else { // 如果任务较大,分成两个子任务 int middle = (start + end) / 2; // 创建子任务1,处理数组的前半部分 SumTask subTask1 = new SumTask(array, start, middle); // 创建子任务2,处理数组的后半部分 SumTask subTask2 = new SumTask(array, middle, end);
// 执行子任务,并行计算 subTask1.fork(); subTask2.fork();
// 等待子任务完成并合并结果 sum = subTask1.join() + subTask2.join(); } return sum; }}
public class ParallelSumCalculation { public static void main(String[] args) { // 拟一年的每天销售额数据 int[] dailySales = { // ... (一年的每天销售额数据) };
// 创建 ForkJoinPool 线程池 ForkJoinPool pool = new ForkJoinPool(); // 创建 SumTask 任务,处理整个销售额数组 SumTask task = new SumTask(dailySales, 0, dailySales.length); // 执行任务并获取结果 long totalSales = pool.invoke(task);
System.out.println("一年的总销售额为: " + totalSales + "元"); }}
复制代码

6.2 订单总销售额计算

一个电子商务平台需要计算所有订单的总销售额。这个平台有大量的订单数据,分布在不同的数据库分片中。为了快速得到总销售额,我们可以利用ForkJoinTask来并行处理每个数据库分片的数据,然后将结果合并。


计算总销售额代码


import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;import java.util.Arrays;
// 订单类,包含订单的ID和销售额class Order { long id; double sales;
public Order(long id, double sales) { this.id = id; this.sales = sales; }}
// 计算订单总销售额的任务class CalculateSalesTask extends RecursiveTask<Double> { private static final int THRESHOLD = 100; // 分解任务的阈值 private Order[] orders; private int start; private int end;
public CalculateSalesTask(Order[] orders, int start, int end) { this.orders = orders; this.start = start; this.end = end; }
@Override protected Double compute() { double totalSales = 0.0; if (end - start < THRESHOLD) { // 任务足够小,直接计算 for (int i = start; i < end; i++) { totalSales += orders[i].sales; } } else { // 任务较大,分成两个子任务 int middle = (start + end) / 2; CalculateSalesTask subTask1 = new CalculateSalesTask(orders, start, middle); CalculateSalesTask subTask2 = new CalculateSalesTask(orders, middle, end);
// 执行子任务 subTask1.fork(); subTask2.fork();
// 等待子任务完成并合并结果 totalSales = subTask1.join() + subTask2.join(); } return totalSales; }}
public class ECommerceSalesCalculation { public static void main(String[] args) { // 订单数据 Order[] orders = new Order[1000]; for (int i = 0; i < orders.length; i++) { orders[i] = new Order(i, Math.random() * 10000); // 随机生成销售额 }
// 创建 ForkJoinPool 线程池 ForkJoinPool pool = new ForkJoinPool(); // 创建任务,处理整个订单数组 CalculateSalesTask task = new CalculateSalesTask(orders, 0, orders.length); // 执行任务并获取结果 double totalSales = pool.invoke(task);
System.out.println("总销售额为: " + totalSales); }}
复制代码


发布于: 刚刚阅读数: 4
用户头像

智慧属心窍之锁 2019-05-27 加入

擅长于通信协议、微服务架构、框架设计、消息队列、服务治理、PAAS、SAAS、ACE\ACP、大模型

评论

发布
暂无评论
高并发编程/并行任务组件ForkJoinPool设计图分解(高手篇)_Java_肖哥弹架构_InfoQ写作社区