写点什么

☕【Java 技术指南】「并发编程专题」Fork/Join 框架基本使用和原理探究(原理篇)

发布于: 4 小时前
☕【Java 技术指南】「并发编程专题」Fork/Join 框架基本使用和原理探究(原理篇)

ForkJoin 线程池框架回顾

  • ForkJoin 框架其实就是一个线程池 ExecutorService 的实现,通过工作窃取(work-stealing)算法,获取其他线程中未完成的任务来执行。

  • 可以充分利用机器的多处理器优势,利用空闲的线程去并行快速完成一个可拆分为小任务的大任务,类似于分治算法。

  • ForkJoin 的目标,就是利用所有可用的处理能力来提高程序的响应和性能。本文将介绍 ForkJoin 框架,源码剖析。

ForkJoinPool 的类架构图

ForkJoinPool 核心类实现

  • ForkJoin 框架的核心是 ForkJoinPool 类,基于 AbstractExecutorService 扩展。

  • ForkJoinPool 中维护了一个队列数组 WorkQueue[],每个 WorkQueue 维护一个 ForkJoinTask 数组和当前工作线程。

  • ForkJoinPool 实现了工作窃取(work-stealing)算法并执行 ForkJoinTask。


ForkJoinPool,所有线程和 WorkQueue 共享,用于工作窃取、任务状态和工作状态同步。


核心属性介绍

  • ADD_WORKER: 100000000000000000000000000000000000000000000000 -> 1000 0000 0000 0000,用来配合 ctl 在控制线程数量时使用

  • ctl: 控制 ForkJoinPool 创建线程数量,(ctl & ADD_WORKER) != 0L 时创建线程,也就是当 ctl 的第 16 位不为 0 时,可以继续创建线程

  • defaultForkJoinWorkerThreadFactory: 默认线程工厂,默认实现是 DefaultForkJoinWorkerThreadFactory

  • runState: 全局锁控制,全局运行状态

  • workQueues: 工作队列数组 WorkQueue[]

  • config: 记录并行数量和 ForkJoinPool 的模式(异步或同步)

ForkJoinTask

  • status: 任务的状态,对其他工作线程和 pool 可见,运行正常则 status 为负数,异常情况为正数

WorkQueue

  • qlock: 并发控制,put 任务时的锁控制

  • array: 任务数组 ForkJoinTask<?>[]

  • pool: ForkJoinPool,所有线程和 WorkQueue 共享,用于工作窃取、任务状态和工作状态同步

  • base: array 数组中取任务的下标

  • top: array 数组中放置任务的下标

  • owner: 所属线程,ForkJoin 框架中,只有一个 WorkQueue 是没有 owner 的,其他的均有具体线程 owner。

  • WorkQueue 内部就是 ForkJoinTask


workQueue: 当前线程的任务队列,与 WorkQueue 的 owner 呼应




ForkJoinTask 是能够在 ForkJoinPool 中执行的任务抽象类,父类是 Future,具体实现类有很多,这里主要关注 RecursiveAction 和 RecursiveTask。


  • RecursiveAction 是没有返回结果的任务

  • RecursiveTask 是需要返回结果的任务



只需要实现其 compute()方法,在 compute()中做最小任务控制,任务分解(fork)和结果合并(join)。


ForkJoinWorkerThread

ForkJoinPool 中执行的默认线程是 ForkJoinWorkerThread,由默认工厂产生,可以自己重写要实现的工作线程。同时会将 ForkJoinPool 引用放在每个工作线程中,供工作窃取时使用。


  • pool: ForkJoinPool,所有线程和 WorkQueue 共享,用于工作窃取、任务状态和工作状态同步

  • workQueue: 当前线程的任务队列,与 WorkQueue 的 owner 呼应





  • ForkJoinPool 作为最核心的组件,维护了所有的任务队列 WorkQueues,workQueues 维护着所有线程池的工作线程,工作窃取算法就是在这里进行的。

  • 每一个 WorkQueue 对象中使用 pool 保留对 ForkJoinPool 的引用,用来获取其 WorkQueues 来窃取其他工作线程的任务来执行。

  • 同时 WorkQueue 对象中的 owner 是 ForkJoinWorkerThread 工作线程,绑定 ForkJoinWorkerThread 和 WorkQueue 的一对一关系,每个工作线程会优先完成自己队列的任务,当自己队列中的任务为空时,才会通过工作窃取算法从其他任务队列中获取任务。

  • WorkQueue 中的 ForkJoinTask<?>[] array,是每一个具体的任务,插入 array 中的第一个任务是最大的任务。

源码分析

ForkJoinPool 构造函数

ForkJoinPool 有四个构造函数,其中参数最全的那个构造函数如下所示:


public ForkJoinPool(int parallelism,                            ForkJoinWorkerThreadFactory factory,                            UncaughtExceptionHandler handler,                            boolean asyncMode)
复制代码


  • parallelism:可并行级别,Fork/Join 框架将依据这个并行级别的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理,但是千万不要将这个属性理解成 Fork/Join 框架中最多存在的线程数量,也不要将这个属性和 ThreadPoolExecutor 线程池中的 corePoolSize、maximumPoolSize 属性进行比较,因为 ForkJoinPool 的组织结构和工作方式与后者完全不一样。

  • factory:当 Fork/Join 框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现 ThreadFactory 接口,而是需要实现 ForkJoinWorkerThreadFactory 接口。

  • 后者是一个函数式接口,只需要实现一个名叫 newThread 的方法。

  • 在 Fork/Join 框架中有一个默认的 ForkJoinWorkerThreadFactory 接口实现:DefaultForkJoinWorkerThreadFactory。

  • handler:异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被 handler 捕获。

  • asyncMode:这个参数也非常重要,从字面意思来看是指的异步模式,它并不是说 Fork/Join 框架是采用同步模式还是采用异步模式工作。

  • Fork/Join 框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即是说存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式。


当 asyncMode 设置为 true 的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为 false


......asyncMode ? FIFO_QUEUE : LIFO_QUEUE,......
复制代码


  • ForkJoinPool 还有另外两个构造函数,一个构造函数只带有 parallelism 参数,既是可以设定 Fork/Join 框架的最大并行任务数量;

  • 另一个构造函数则不带有任何参数,对于最大并行任务数量也只是一个默认值——当前操作系统可以使用的 CPU 内核数量(Runtime.getRuntime().availableProcessors())。

  • 实际上 ForkJoinPool 还有一个私有的、原生构造函数,之上提到的三个构造函数都是对这个私有的、原生构造函数的调用。


private ForkJoinPool(int parallelism,                         ForkJoinWorkerThreadFactory factory,                         UncaughtExceptionHandler handler,                         int mode,                         String workerNamePrefix) {        this.workerNamePrefix = workerNamePrefix;        this.factory = factory;        this.ueh = handler;        this.config = (parallelism & SMASK) | mode;        long np = (long)(-parallelism); // offset ctl counts        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);    }
复制代码
使用案例
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
复制代码


先看 ForkJoinPool 的创建过程,这个比较简单,创建了一个 ForkJoinPool 对象,带有默认 ForkJoinWorkerThreadFactory,并行数跟机器核数一样,同步模式。

提交任务

forkJoinPool.invoke(new CountRecursiveTask(1, 100));会先执行到 ForkJoinPool#externalPush 中,此时 forkJoinPool.workQueues 并没有完成初始化工作,所以执行到 ForkJoinPool#externalSubmit。

externalSubmit


这里是一个 for 无限循环实现,跳出逻辑全部在内部控制,主要结合 runState 来控制。


  1. 建 ForkJoinPool 的 WorkQueue[]变量 workQueues,长度为大于等于 2 倍并行数量的且是 2 的 n 次幂的数。这里对传入的并行数量使用了位运算,来计算出 workQueues 的长度。

  2. 创建一个 WorkQueue 变量 q,q.base=q.top=4096,q 的 owner 为 null,无工作线程,放入 workQueues 数组中

  3. 创建 q.array 对象,长度 8192,将 ForkJoinTask 也就是代码案例中的 CountRecursiveTask 放入 q.array,pool 为传入的 ForkJoinPool,并将 q.top 加 1,完成后 q.base=4096,q.top=4097。然后执行 ForkJoinPool#signalWork 方法。(base 下标表示用来取数据的,top 下标表示用来放数据的,当 base 小于 top 时,说明有数据可以取)


externalSubmit 主要完成 3 个小步骤工作,每个步骤都使用了锁的机制来处理并发事件,既有对 runState 使用 ForkJoinPool 的全局锁,也有对 WorkQueue 使用局部锁。

signalWork

signalWork 方法的签名是:void signalWork(WorkQueue[] ws, WorkQueue q)。ws 为 ForkJoinPool 中的 workQueues,q 为 externalSubmit 方法中新建的用于存放 ForkJoinTask 的 WorkQueue.


  • signalWork 中会根据 ctl 的值判断是否需要创建创建工作线程,当前暂无,因此走到 tryAddWorker(),并在 createWorker()来创建,使用默认工厂方法 ForkJoinWorkerThread#ForkJoinWorkerThread(ForkJoinPool)来创建一个 ForkJoinWorkerThread,ForkJoinPool 为前面创建的 pool。

  • 并创建一个 WorkQueue 其 owner 为新创建的工作线程,其 array 为空,被命名为 ForkJoinPool-1-worker-1,且将其存放在 pool.workQueues 数组中。

  • 创建完线程之后,工作线程 start()开始工作。

  • 这样就创建了两个 WorkQueue 存放在 pool.workQueues,其中一个 WorkQueue 保存了第一个大的 ForkJoinTask,owner 为 null,其 base=4096,top=4097;第二个 WorkQueue 的 owner 为新建的工作线程,array 为空,暂时无数据,base=4096,top=4096。


ForkJoinWorkerThread#run
  • 执行 ForkJoinWorkerThread 线程 ForkJoinPool-1-worker-1,执行点来到 ForkJoinWorkerThread#run,注意这里是在 ForkJoinWorkerThread 中,此时的 workQueue.array 还是空的,pool 为文中唯一的一个,是各个线程会共享的。

  • run 方法中首先是一个判断 if (workQueue.array == null) { // only run once,这也验证了我们前面的分析,当前线程的 workQueue.array 是空的。每个新建的线程,拥有的 workQueue.array 是没有任务的。那么它要执行的任务从哪里来?

  • runWorker()方法中会执行一个死循环,去 scan 扫描是否有任务可以执行。全文的讲到的工作窃取 work-stealing 算法,就在 java.util.concurrent.ForkJoinPool#scan。当有了上图的模型概念时,这个方法的实现看过就会觉得其实非常简单。



  WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;  int b, n; long c;  //如果pool.workQueues即ws的k下标元素不为空  if ((q = ws[k]) != null) {    //如果base<top且array不为空,则说明有元素。为什么还需要array不为空才说明有元素?    //从下面可以知道由于获取元素后才会设置base=base+1,所以可能出现上一个线程拿到元素了但是没有及时更新base      if ((n = (b = q.base) - q.top) < 0 &&          (a = q.array) != null) {      // non-empty          long i = (((a.length - 1) & b) << ASHIFT) + ABASE;          //这里使用getObjectVolatile去获取当前WorkQueue的元素          //volatile是保证线程可见性的,也就是上一个线程可能已经拿掉了,可能已经将这个任务置为空了。          if ((t = ((ForkJoinTask<?>)                    U.getObjectVolatile(a, i))) != null &&              q.base == b) {              if (ss >= 0) {                  //拿到任务之后,将array中的任务用CAS的方式置为null,并将base加1                  if (U.compareAndSwapObject(a, i, t, null)) {                      q.base = b + 1;                      if (n < -1)       // signal others                          signalWork(ws, q);                      return t;                  }              }              else if (oldSum == 0 &&   // try to activate                       w.scanState < 0)                  tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);          }          if (ss < 0)                   // refresh              ss = w.scanState;          r ^= r << 1; r ^= r >>> 3; r ^= r << 10;          origin = k = r & m;           // move and rescan          oldSum = checkSum = 0;          continue;      }      checkSum += b;  }
复制代码
CountRecursiveTask#compute

重写 compute 方法一般需要遵循这个规则来写


if(任务足够小){  直接执行任务;  如果有结果,return结果;}else{  拆分为2个子任务;  分别执行子任务的fork方法;  执行子任务的join方法;  如果有结果,return合并结果;}
复制代码


public final ForkJoinTask<V> fork() {        Thread t;        //如果是工作线程,则往自己线程中的workQuerue中添加子任务;否则走首次添加逻辑        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)            ((ForkJoinWorkerThread)t).workQueue.push(this);        else            ForkJoinPool.common.externalPush(this);        return this;    }
复制代码


ForkJoinPool.WorkQueue#push 方法会将当前子任务存放到 array 中,并调用 ForkJoinPool#signalWork 添加线程或等待其他线程去窃取任务执行。过程又回到前面讲到的 signalWork 流程。

ForkJoinTask#externalAwaitDone
  • 主线程在把任务放置在第一个 WorkQueue 的 array 之后,启动工作线程就退出了。如果使用的是异步的方式,则使用 Future 的方式来获取结果,即提交的 ForkJoinTask,通过 isDone(),get()方法判断和得到结果。

  • 异步的方式跟同步方式在防止任务的过程是一样的,只是主线程可以任意时刻再通过 ForkJoinTask 去跟踪结果。本案例用的是同步的写法,因此主线程最后在 ForkJoinTask#externalAwaitDone 等待任务完成。

  • 这里主线程会执行 Object#wait(long),使用的是 Object 类中的 wait,在当前 ForkJoinTask 等待,直到被 notify。而 notify 这个动作会在 ForkJoinTask#setCompletion 中进行,这里使用的是 notifyAll,因为需要通知的有主线程和工作线程,他们都共同享用这个对象,需要被唤起。

ForkJoinTask#join

来看 left.join() + right.join(),在将 left 和 right 的 Task 放置在当前工作线程的 workQueue 之后,执行 join()方法,join()方法最终会在 ForkJoinPool.WorkQueue#tryRemoveAndExec 中将刚放入的 left 取出,将对应 workQueue 中 array 的 left 任务置为空,然后执行 left 任务。然后执行到 left 的 compute 方法。对于 right 任务也是一样,继续子任务的 fork 和 join 工作,如此循环往复。


  public final V join() {        int s;        if ((s = doJoin() & DONE_MASK) != NORMAL)            reportException(s);        return getRawResult();    }
复制代码


当工作线程执行结束后,会执行 getRawResult,拿到结果。

Work-Steal 算法

相比其他线程池实现,这个是 ForkJoin 框架中最大的亮点。当空闲线程在自己的 WorkQueue 没有任务可做的时候,会去遍历其他的 WorkQueue,并进行任务窃取和执行,提高程序响应和性能。

取 2 的 n 次幂作为长度的实现
  //代码位于java.util.concurrent.ForkJoinPool#externalSubmit    if ((rs & STARTED) == 0) {        U.compareAndSwapObject(this, STEALCOUNTER, null,                               new AtomicLong());        // create workQueues array with size a power of two        int p = config & SMASK; // ensure at least 2 slots        int n = (p > 1) ? p - 1 : 1;        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;        workQueues = new WorkQueue[n];        ns = STARTED;    }
复制代码


这里的 p 其实就是设置的并行线程数,在为 ForkJoinPool 创建 WorkQueue[]数组时,会对传入的 p 进行一系列位运算,最终得到一个大于等于 2p 的 2 的 n 次幂的数组长度

内存屏障
  //代码位于java.util.concurrent.ForkJoinPool#externalSubmit    if ((a != null && a.length > s + 1 - q.base) ||        (a = q.growArray()) != null) {        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;        //通过Unsafe进行内存值的设置,高效,且屏蔽了处理器和Java编译器的指令乱序问题        U.putOrderedObject(a, j, task);        U.putOrderedInt(q, QTOP, s + 1);        submitted = true;    }
复制代码


这里在对单个 WorkQueue 的 array 进行 push 任务操作时,先后使用了 putOrderedObject 和 putOrderedInt,确保程序执行的先后顺序,同时这种直接操作内存地址的方式也会更加高效。


高并发:细粒度 WorkQueue 的锁


  //代码位于java.util.concurrent.ForkJoinPool#externalSubmit  //如果qlock为0,说明当前没有其他线程操作改WorkQueue  //尝试CAS操作,修改qlock为1,对这个WorkQueue进行加锁    if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {        ForkJoinTask<?>[] a = q.array;        int s = q.top;        boolean submitted = false; // initial submission or resizing        try {                      // locked version of push            if ((a != null && a.length > s + 1 - q.base) ||                (a = q.growArray()) != null) {                int j = (((a.length - 1) & s) << ASHIFT) + ABASE;                U.putOrderedObject(a, j, task);                U.putOrderedInt(q, QTOP, s + 1);                submitted = true;            }        } finally {            //finally将qlock置为0,进行锁的释放,其他线程可以使用            U.compareAndSwapInt(q, QLOCK, 1, 0);        }        if (submitted) {            signalWork(ws, q);            return;        }    }
复制代码


这里对单个 WorkQueue 的 array 进行 push 任务操作时,使用了 qlock 的 CAS 细粒度锁,让并发只落在一个 WOrkQueue 中,而不是整个 pool 中,极大提高了程序的并发性能,类似于 ConcurrentHashMap。

发布于: 4 小时前阅读数: 4
用户头像

🏆 2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

👑【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 “任何足够先进的技术都是魔法“

评论

发布
暂无评论
☕【Java 技术指南】「并发编程专题」Fork/Join 框架基本使用和原理探究(原理篇)