写点什么

ForkJoinPool 实践

作者:FunTester
  • 2023-02-21
    北京
  • 本文字数:1784 字

    阅读完需:约 6 分钟

最近在看一本 15 年出版的《Java 并发编程的艺术》一书,其中看到并发编程时间部分的 ForkJoinPool 功能时,突然发现这个功能实际使用上就是把一个大任务分成多个小的子任务,然后使用多个线程完成。

这个场景跟我之前写过的自定义Java自定义异步功能实践有点异曲同工之妙,只不过这里有有个子任务的概念,多个任务执行结果是具有相关性的。资料指出 ForkJoinPool 比较适合计算密集型的任务。

性能测试中QPS取样器和RT取样器中,有这样一个使用场景,在用例执行过程中,我想了解一下当前用例执行的 QPS 和 RT 信息,就需要有个触发开关,开始收集这些数据,等某一个终止条件被触发,结束收集,然后计算结果。在用例 QPS 超过 10 万的情况下,单次收集的数据可能会超过 100 万,计算 QPS 和 RT 就非常适合 ForkJoinPool 来完成。

如果一直实时展示或者上报这些信息的话,也可以使用 ForkJoinPool 来完成计算功能。这里还有另外的方案来实现,如果只是得到 QPS 和 RT 数据的话,比 ForkJoinPool 更加合适,这里先不分享了。

ForkJoinPool API 相比较 ExecutorService 还是比较简单的。主要的功能 3 个:创建任务的 ForkJoinPool、创建任务分配规则和收集任务结果。

下面我以一个数组求和的 Demo 演示一下 ForkJoinPool 的功能。

首先我们需要定义一个 ForkJoinPool,通常使用java.util.concurrent.ForkJoinPool#ForkJoinPool(int)或者java.util.concurrent.ForkJoinPool#commonPool这两个方法其中之一,如果你使用 JDK 7 及以前的版本,第二个 API 是不存在的。

翻看源码之后,看起来 ForkJoinPool 构造方法参数还是挺多的,如果都要自定义比较麻烦也是没多大必要的,所以我就选上面提到的第一种 API 来创建 ForkJoinPool。

然后我们要创建一个任务类实现任务分配规则,首先继承java.util.concurrent.RecursiveTask实现java.util.concurrent.RecursiveTask#compute方法。

拆分任务的思路如下:使用两个 int 属性,标记 List 中需要求和片段索引。这样每次分配任务的时候,只需要改变索引值即可。将一个很长的 List 求和分成 N 个小片段求和。

类代码设计如下:

import com.funtester.frame.SourceCodeimport groovy.util.logging.Log4j2
import java.util.concurrent.ForkJoinPoolimport java.util.concurrent.RecursiveTask
@Log4j2class ForkJoinT extends RecursiveTask<Integer> {
    static def data = 1..100 as List
    int start    int end
    ForkJoinT(int start, int end) {        this.start = start        this.end = end    }
    @Override    protected Integer compute() {        if (end - start < 5) {            sum(start, end)        } else {            def middle = ((start + end) / 2) as int            def left = new ForkJoinT(start, middle)            def right = new ForkJoinT(middle + 1, end)            left.fork()            right.fork()            left.join() + right.join()        }    }
    /**     * 片段求和     * @param i     * @param k     * @return     */    static def sum(int i, int k) {        SourceCode.range(i, k + 1).map(data::get).sum()    }
}

复制代码

总体感觉java.util.concurrent.RecursiveTask#compute方法写起来有点像递归,思路明确了以后还是很流畅的。

先来个高斯求和,下面是测试代码:

    static void main(String[] args) {        def pool = new ForkJoinPool(5)        def t = new ForkJoinT(0, data.size() - 1)        pool.submit(t)        log.info("sum: {}", t.get())    }
复制代码

控制台输出:

22:30:42.725 main sum: 5050
复制代码

性能方面等我先研究一波 JMH 之后再来。

FunTester 原创专题推荐~

-- By FunTester

发布于: 刚刚阅读数: 3
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
ForkJoinPool实践_FunTester_InfoQ写作社区