写点什么

Fork Join 框架

作者:周杰伦本人
  • 2022 年 6 月 04 日
  • 本文字数:2641 字

    阅读完需:约 9 分钟

Fork Join 框架

Fork/Join 框架是 Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。


package com.example.xppdemo.chapter6;
import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.Future;import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 2; // 阈值 private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 如果任务足够小就计算任务 boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任务大于阈值,就分裂成两个子任务计算 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 执行子任务 leftTask.fork(); rightTask.fork(); // 等待子任务执行完,并得到其结果 int leftResult=leftTask.join(); int rightResult=rightTask.join(); // 合并子任务 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); // 生成一个计算任务,负责计算1+2+3+4 CountTask task = new CountTask(1, 4); // 执行一个任务 Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } }}
复制代码


ForkJoinTask 与一般任务的主要区别在于它需要实现 compute 方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用 fork 方法时,又会进入 compute 方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并得到其结果。


Fork/Join 使用两个类来完成以上两件事情。①ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork()和 join()操作的机制。通常情况下,我们不需要直接继承 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了以下两个子类。


  • RecursiveAction:用于没有返回结果的任务。

  • RecursiveTask:用于有返回结果的任务。


②ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

Fork/Join 框架的实现原理

ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放程序提交给 ForkJoinPool 的任务,而 ForkJoinWorkerThread 数组负责执行这些任务。

(1)ForkJoinTask 的 fork 方法实现原理

当我们调用 ForkJoinTask 的 fork 方法时,程序会调用 ForkJoinWorkerThread 的 pushTask 方法异步地执行这个任务,然后立即返回结果。代码如下。


public final ForkJoinTask<V> fork() {    Thread t;    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)        ((ForkJoinWorkerThread)t).workQueue.push(this);    else        ForkJoinPool.common.externalPush(this);    return this;}
复制代码


workQueue.push:


final void push(ForkJoinTask<?> task) {    ForkJoinTask<?>[] a; ForkJoinPool p;    int b = base, s = top, n;    if ((a = array) != null) {    // ignore if queue removed        int m = a.length - 1;     // fenced write for task visibility        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);        U.putOrderedInt(this, QTOP, s + 1);        if ((n = s - b) <= 1) {            if ((p = pool) != null)                p.signalWork(p.workQueues, this);        }        else if (n >= m)            growArray();    }}
复制代码


pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用 ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。

(2)ForkJoinTask 的 join 方法实现原理

Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看 ForkJoinTask 的 join 方法的实现


public final V join() {    int s;    if ((s = doJoin() & DONE_MASK) != NORMAL)        reportException(s);    return getRawResult();}
复制代码


它调用了 doJoin()方法,通过 doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有 4 种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。


  • 如果任务状态是已完成,则直接返回任务结果。

  • 如果任务状态是被取消,则直接抛出 CancellationException。

  • 如果任务状态是抛出异常,则直接抛出对应的异常。


private int doJoin() {    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;    return (s = status) < 0 ? s :        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?        (w = (wt = (ForkJoinWorkerThread)t).workQueue).        tryUnpush(this) && (s = doExec()) < 0 ? s :        wt.pool.awaitJoin(w, this, 0L) :        externalAwaitDone();}
复制代码


在 doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

发布于: 刚刚阅读数: 5
用户头像

还未添加个人签名 2020.02.29 加入

还未添加个人简介

评论

发布
暂无评论
Fork Join框架_6月月更_周杰伦本人_InfoQ写作社区