写点什么

任务编排:CompletableFuture 从入门到精通

  • 2023-04-28
    湖南
  • 本文字数:9248 字

    阅读完需:约 30 分钟

最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍 CompletableFuture 原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下 CompletableFuture 的工作原理

背景

我们把 Runnable 理解为最基本的线程任务,只具备在线程下执行一段逻辑的能力。为了获取执行的返回值,创造了 Callable 和与其配合使用的 Future。为了将任务之间进行逻辑编排,就诞生了 CompletableFuture。关于如何理解任务的逻辑编排,举一个简单的例子:



打开电脑-更新系统这两个操作是有先后顺序的,但是泡茶和这两个操作没有先后顺序,是可以并行的,而开始办公必须要等待其他操作结束之后才能进行,这就形成了任务编排的执行链。


在 IO 密集型系统中,类似的场景有很多。因为不同数据集的查询依赖主键不同,A 数据集的查询主键是 B 数据集的一个字段这种情况很常见,通常还需要并发查询多个数据集的数据,所以对于多线程的执行编排是有需求的。


一种解决办法是 CountDownLatch,让线程执行到某个地方后进行等待,直到依赖的任务执行结束。对于一些简单的执行链是可以满足的,但是当编排逻辑复杂起来,CountDownLatch 会导致代码难以维护和调试。所以诞生了 CompletableFuture 用来描述和维护任务之间的依赖关系以进行任务编排。在实际应用中,有以下两类场景是适合使用任务编排的:

  • 多数据源请求的流程编排

  • 非阻塞化网关等 NIO 场景

使用方式

创建与执行

同步方法

和 FutureTask 类似,CompletableFuture 也通过 get()方法获取执行结果。但是不同的是,CompletableFuture 本身可以不承载可执行的任务(相比 FutureTask 则必须承载一个可执行的任务 Callable),通过一个用于标记执行成功并设置返回值的函数,在使用上也更为灵活,如下:

	CompletableFuture<String> demo = new CompletableFuture<>();	demo.complete("success");	System.out.println(demo.get());
复制代码

执行结果:success


和 Future 类似,get()函数也是同步阻塞的,调用 get 函数后线程会阻塞直到调用 complete 方法标记任务已经执行成功。


除了手动触发任务的完成,也可以让创建对象的同时就标记任务完成:

	CompletableFuture<String> demo = CompletableFuture.completedFuture("success");	System.out.println(demo.get());
复制代码

执行结果:success

异步方法

相比于同步方法,异步执行更为常见。比如下面这个例子:

        CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {            System.out.println("do something by thread" + Thread.currentThread().getName());            return "success";        });        System.out.println(demo.get());
复制代码

执行结果:do something by threadForkJoinPool.commonPool-worker-9success


supplyAsync 方法接收一个 Supplier 对象,逻辑函数交给线程池中的线程异步执行

默认会使用 ForkJoinPool 的公共线程池来执行代码(不推荐),当然也可以指定线程池,如下:

	ExecutorService executor = Executors.newFixedThreadPool(4);    CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {        System.out.println("do something by thread" + Thread.currentThread().getName());        return "success";    }, executor);    System.out.println(demo.get());
复制代码

执行结果:do something by threadpool-1-thread-1success


如果不需要执行结果,也可以用 runAsync 方法:

	CompletableFuture.runAsync(() -> {        System.out.println("do something by thread" + Thread.currentThread().getName());    });
复制代码

执行结果:do something by threadForkJoinPool.commonPool-worker-9

多任务编排

多任务编排是 CompletableFuture 的核心,这里列举不同的场景来进行说明

一元依赖

步骤 2 需要依赖步骤 1 执行完毕才能执行,类似主线程的顺序执行,可以通过以下方式实现:

      ExecutorService executor = Executors.newFixedThreadPool(4);      CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {        System.out.println("执行【步骤1】");        return "【步骤1的执行结果】";      }, executor);
CompletableFuture<String> step2 = step1.thenApply(result -> { System.out.println("上一步操作结果为:" + result); return "【步骤2的执行结果】"; }); System.out.println("步骤2的执行结果:" + step2.get());
复制代码

执行结果:执行【步骤 1】上一步操作结果为:【步骤 1 的执行结果】步骤 2 的执行结果:【步骤 2 的执行结果】


通过 thenApply 方法,接收上一个 CompletableFuture 对象的返回值,其中隐含的逻辑是,该处逻辑只有等上一个 CompletableFuture 对象执行完后才会执行。

二元依赖

相比于一元依赖的顺序执行链,二元依赖更为常见,比如下面这个场景:

步骤 1 和 2 是并行的,而步骤 3 需要等步骤 1 和 2 执行完之后才能执行,通过 CompletableFuture 是这么实现的:

        ExecutorService executor = Executors.newFixedThreadPool(4);        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {            System.out.println("执行【步骤1】");            return "【步骤1的执行结果】";        }, executor);
CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> { System.out.println("执行【步骤2】"); return "【步骤2的执行结果】"; }, executor);
CompletableFuture<String> step3 = step1.thenCombine(step2, (result1, result2) -> { System.out.println("前两步操作结果分别为:" + result1 + result2); return "【步骤3的执行结果】"; }); System.out.println("步骤3的执行结果:" + step3.get());
复制代码

执行结果:执行【步骤 1】执行【步骤 2】前两步操作结果分别为:【步骤 1 的执行结果】【步骤 2 的执行结果】步骤 3 的执行结果:【步骤 3 的执行结果】


通过 thenCombine 方法,等待 step1 和 step2 都执行完毕后,获取其返回结果并执行一段新的逻辑

多元依赖

当然还可能有下面这种场景,步骤 M 需要依赖 1-N 的 N 个前置节点:

这种情况会更为复杂,实现方式如下:

    	ExecutorService executor = Executors.newFixedThreadPool(4);        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {            System.out.println("执行【步骤1】");            return "【步骤1的执行结果】";        }, executor);        CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {            System.out.println("执行【步骤2】");            return "【步骤2的执行结果】";        }, executor);        CompletableFuture<String> step3 = CompletableFuture.supplyAsync(() -> {            System.out.println("执行【步骤3】");            return "【步骤3的执行结果】";        }, executor);
CompletableFuture<Void> stepM = CompletableFuture.allOf(step1, step2, step3); CompletableFuture<String> stepMResult = stepM.thenApply(res -> { // 通过join函数获取返回值 String result1 = step1.join(); String result2 = step2.join(); String result3 = step3.join(); return result1 + result2 + result3; }); System.out.println("步骤M的结果:" + stepMResult.get());
复制代码

执行结果:执行【步骤 1】执行【步骤 2】执行【步骤 3】步骤 M 的结果:【步骤 1 的执行结果】【步骤 2 的执行结果】【步骤 3 的执行结果】


通过 allOf 函数声明当参数中的所有任务执行完毕后,才会执行下一步操作,但是要注意,allOf 本身只是定义节点,真正阻塞的位置是 thenApply 函数。


和之前的方式不同,由于采用了不定变量,所以要通过 CompletableFuture#join 来获取每个任务的返回值。


除了 allOf 之外,如果我们需要任意一个任务完成后就执行下一步操作,可以使用 anyOf 方法,如下:

    // step1/2/3的定义相同		// ...		CompletableFuture<Object> stepM = CompletableFuture.anyOf(step1, step2, step3);    System.out.println("步骤M的结果:" + stepM.get());
复制代码

执行结果:步骤 M 的结果:【步骤 1 的执行结果】


与 allOf 不同,anyOf 的返回值即为第一个执行完毕的任务

工作原理

概念

在讲原理之前,先来了解一下 CompletableFuture 的定义。在实现上,CompletableFuture 继承了 Future 和 CompletionStage

Future 毋庸置疑,CompletableFuture 最基本的能力就是获取异步计算的结果。CompletionStage 则是声明了编排节点的能力,每一个 CompletionStage 都声明了流程树上的一个节点(见下图):

CompletionStage 声明的接口 thenXXX,包括 thenApply、thenCompose 等,定义了节点之间的连接方式(实际情况更为复杂,具体原理参考下节函数分析),通过这种方式,最终定义出一颗流程树,进而实现了多线程的任务编排。CompletionStage 的方法返回值通常是另一个 CompletionStage,进而构成了链式调用。

结构分析

CompletableFuture 里包含两个变量,result 和 stack

result 很好理解,就是当前节点的执行结果。stack 就比较复杂,是一个无锁并发栈,声明了当前节点执行完毕后要触发的节点列表,接下来我们详细讲一下

CompletableFuture 中的栈设计

Completion 是一个无锁并发栈,声明了当前节点执行完毕后要触发的节点列表。在结构上是一个链式节点,其中只包含了一个指向下一个节点的 next 对象

我们可以看到 Completion 有繁多的实现类,表示不同的依赖方式。

我们知道,在 CompletableFuture 中的流程编排是通过 thenApply、thenAccept、thenCombine 等方式来实现的:

  • thenApply:接收上一步的处理结果,进行下一步消费,并返回结果

  • thenAccept:和 thenApply 类似,不过无结果返回

  • thenCombine:同时接收两个流程节点,等其都执行完毕后一起处理结果


每个函数实际分别对应了一种 Completion 实现类,以刚才的三种函数为例,分别对应了 UniApply、UniAccept、UniCombine 三个对象。所以 Completion 可以认为是流程编排逻辑的抽象对象,可以理解为流程节点,或者任务节点。


以 UniCompletion 为例,结构如下:

abstract static class UniCompletion<T,V> extends Completion {    Executor executor;                 // 线程池    CompletableFuture<V> dep;          // 完成任务依赖的cf    CompletableFuture<T> src;          // 完成任务所需资源所在的cf
/** * 如果任务可以被执行则返回true,通过FJ标记位保证只有一个线程判断成功。 * 如果任务是异步的,则在任务启动后通过tryFire来执行任务 */ final boolean claim() { Executor e = executor; if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { if (e == null) return true; executor = null; // disable e.execute(this); } return false; }
/** * 如果dep不为空返回true,用以判断当前任务节点是否已被激活 */ final boolean isLive() { return dep != null; }}
复制代码

先来看 claim 函数,这个比较容易解释,该函数用于判断任务是否可被执行。通过 compareAndSetForkJoinTaskTag 函数的 CAS 操作保证只有一个线程执行成功,主要作用就是在多线程情况下确保任务的正确执行。


接下来就是重头戏,源任务与依赖任务,这两个概念是 CompletableFuture 的核心,贯穿了所有逻辑的执行,只有理解了这两个概念,才能对执行原理有比较透彻的理解


源任务与依赖任务

源任务和依赖任务在 UniCompletion 中分别为 src 和 dep 属性,举个具体的例子,比如下面这段代码:

CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {    return "A";});
CompletableFuture<String> b = a.thenApply(res -> { return "B " + res;});
复制代码

调用 a.thenApply(Function fn)时,可以认为是生成了一个 UniApply 的流程节点(具体怎么生成的下文会提到),其中源任务就是 a,而依赖任务则是 thenApply 的返回值。


换个简单的说法,在上面的代码中,我们有 a、b 两个任务,b 任务的完成需要依赖于 a 任务的完成,所以 a 会生成一个流程节点(UniApply 对象),其中包含了 b 想要执行完成的全部资源(a 的执行结果等),这时 a 任务就叫做源任务(因为 a 任务中有任务资源)。而 b 任务需要依赖 a 任务来完成,所以 b 任务叫做依赖任务。


源任务的完成会触发依赖任务的执行,这个就是任务编排的基本原理

函数分析

在本节中,CompletableFuture 由于名字太长,会以 cf 来代指。


由于 thenAccept、thenCombine 函数等逻辑比较类似,我们以最基础的 thenApply 函数为例进行分析。

核心函数

我们先不要直接从 thenApply、complete 等函数入手,我们先来看这几个核心函数,不明白做什么的不要紧,先理解这几个函数的原理就好。


uniApply

CompletableFuture 的逻辑在于“只有当 X 条件满足时,再执行 Y 逻辑”,uniApply 函数就是负责这样的逻辑判断,首先看源码:

final <S> boolean uniApply(CompletableFuture<S> a,                            Consumer<? super S> f, UniApply<S> c) {    Object r; Throwable x;    // 1    if (a == null || (r = a.result) == null || f == null)        return false;
tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; }
try { // 2 if (c != null && !c.claim()) return false;
// 3 S s = (S) r; completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true;}
复制代码

整个方法可以分为三段(已在代码中标出),我们分开来说。

  • 第一段,判断所给的任务节点是否已经执行完毕,如果已经执行完毕则进入下一步

  • 第二段,如果有关联的流程节点,则通过 claim 函数判断当前任务是否可被执行,如果可执行则进入下一步(确保多线程情况下任务的正确执行)

  • 第三段,执行传入的函数并把值设置到当前对象中。


整个逻辑是这样的,首先我们传入了一个 cf 对象、一个函数,和一个流程节点。只有当传入的 cf 对象执行完成(result 不为空),再执行给定的函数,并把执行结果设置到当前对象中。如果不考虑特殊情况,uniApply 方法用一句话解释就是:如果给定的任务已经执行完毕,就执行传入的函数并把执行结果设置到当前对象中


tryFire

uniApply 函数仅仅是一个有条件的函数执行器,真正想要达到任务编排还需要其他函数的参与,我们先来看 tryFire 方法:

final CompletableFuture<V> tryFire(int mode) {            CompletableFuture<V> d; CompletableFuture<T> a;    if ((d = dep) == null ||        !d.uniApply(a = src, fn, mode > 0 ? null : this))        return null;    dep = null; src = null; fn = null;    return d.postFire(a, mode);}
复制代码

tryFire 根据关联关系的不同有多种实现,实际执行流程相差不大,这里以常用的 UniApply 的实现来举例。

首先这个方法接收了一个 mode 参数,有以下几种取值:

  • -1:传播模式,或者叫嵌套模式。表示任务实际已经执行完毕,只是在传递状态

  • 0:同步模式。任务由当前线程调用处理

  • 1:异步模式。任务需要提交到指定线程池处理


根据 mode 的不同,实际 tryFire 执行的流程也会发生很大区别。不过归根到底,tryFire 方法的本质是调用了 uniApply 执行一次任务,如果执行成功,则会清空 dep、src 等自身属性(清空之后 isLive 方法会返回 false,表示任务已经执行完毕),同时通过 postFire 方法执行该任务下的其他依赖任务,实现任务的传播执行。


postFire 方法由于和 tryFire 方法关联比较密切,这里放在一起说明:

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {        if (a != null && a.stack != null) {            if (mode < 0 || a.result == null)                a.cleanStack();            else                a.postComplete();        }        if (result != null && stack != null) {            if (mode < 0)                return this;            else                postComplete();        }        return null;}
复制代码

这里简单概括一下执行原理,如果是嵌套模式,则清理栈内无效任务,并返回对象本身(可以认为什么都没做);否则通过 postComplete 方法执行栈内依赖此任务的其他任务项。


postComplete

当一个 CompletionStage 执行完成之后,会触发依赖它的其他 CompletionStage 的执行,这些 Stage 的执行又会触发新一批的 Stage 执行,这就是任务的顺序编排。


如果说 uniApply 是基础功能,是负责线程安全且遵守依赖顺序地执行一个函数,那么 postComplete 就是核心逻辑,负责当一个任务执行完毕后触发依赖该任务的其他任务项,先来看源码:

final void postComplete() {    CompletableFuture<?> f = this; Completion h;    // 1    while ((h = f.stack) != null ||           (f != this && (h = (f = this).stack) != null)) {        CompletableFuture<?> d; Completion t;
// 2 if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } // 3 f = (d = h.tryFire(NESTED)) == null ? this : d; } }}
复制代码

在源码上标记了三个位置,分别代表三层结构:


  • 第一层 while 循环,只要当前对象栈中还有流程节点,那么就循环执行内部逻辑。

  • 第二层,由于 continue 的存在,和第一层结合起来看就是一个批量压栈的操作,将所有需要触发的依赖树按顺序压入当前对象栈中。

  • 第三层,通过 tryFire 按顺序触发栈中所有的依赖任务。上节我们可以看到 tryFire 函数内根据 mode 的不同会触发不同的逻辑,这里 mode 指定为 NESTED 就是为了避免循环调用 postComplete

执行函数

几个核心函数介绍完了,接下来我们回到最外层,来看看任务是如何执行的,首先我们以 thenApply 为例分析核心执行函数。


supplyAsync(实际调用为 asyncSupplyStage)

该方法用于提交一个任务到线程池中执行,并将该任务打包为一个 CompletableFuture 节点

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {    if (f == null) throw new NullPointerException();    CompletableFuture<U> d = new CompletableFuture<U>();    e.execute(new AsyncSupply<U>(d, f));    return d;}
复制代码

其中 AsyncSupply 实现了 Runnable 接口,所以理解为一种特殊的任务即可。这种任务在执行完成后会调用 completeValue 将函数执行的结果设置当前对象中。


所以整体逻辑为,首先创建一个 cf 对象,并立即将任务添加到线程池执行,在执行完毕后会将任务执行的结果保存到所创建的 cf 对象中。


complete

public boolean complete(T value) {    boolean triggered = completeValue(value);	postComplete();	return triggered;}
复制代码

该方法直接调用 completeValue 方法设置值,设置完值之后调用 postComplete 方法来依次执行后续任务。当调用该方法时,会实现任务的依赖扩散执行


thenApply(实际调用为 uniApplyStage)

private <V> CompletableFuture<V> uniApplyStage(    	Executor e, Function<? super T,? extends V> f) {        if (f == null) throw new NullPointerException();        CompletableFuture<V> d =  new CompletableFuture<V>();    if (e != null || !d.uniApply(this, f, null)) {        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);        push(c);        c.tryFire(SYNC);    }    return d;}
复制代码

结合上节分析的核心函数,我们很容易可以分析该函数的流程:执行 function 函数,如果条件不满足则执行失败,会生成一个流程节点并压入栈,同时再通过 tryFire 再尝试执行一次,如果条件依然不满足,那么只能等待所依赖的任务执行完成后通过 postComplete 触发执行。


get

public T get() throws InterruptedException, ExecutionException {    Object r;    return reportGet((r = result) == null ? waitingGet(true) : r);}
复制代码

方法核心在于 waitingGet,内部使用了 ForkJoinPool.managedBlock 来阻塞线程直到执行完毕

流程分析

在函数分析中,我们实际已经说明了任务依赖执行的基本原理,这里为了更为详细地说明,我们以一个简单的例子来分析。


首先我们抛开一切复杂的因素,以最基本的同步串行代码来讲,我们现在有这样一个对象:

CompletableFuture<String> A = new CompletableFuture<>();
复制代码


然后我们这时候给其加上了任务编排,增加了一个 thenApply 依赖。

AtomicInteger seq = new AtomicInteger(0);Function<String, String> func = s -> s + " | " + seq.incrementAndGet();
CompletableFuture<String> a = new CompletableFuture<>();CompletableFuture<String> b = a.thenApply(func);
复制代码


于是我们就有了这样一个结构,A 的 stack 中压入了一个 Completion 节点,该节点的源任务指向 A 本身,而依赖任务指向了 B,表示 B 任务依赖 A 任务的执行。


接下来我们再加一条依赖:

AtomicInteger seq = new AtomicInteger(0);Function<String, String> func = s -> s + " | " + seq.incrementAndGet();
CompletableFuture<String> a = new CompletableFuture<>();CompletableFuture<String> b = a.thenApply(func);CompletableFuture<String> c = a.thenApply(func);
复制代码


我们会发现两个特点:

  1. 和栈的性质一样,越晚添加的编排逻辑越早被执行

  2. 基于同一个对象衍生出来的流程节点的源任务是一致的


以此类推,thenXXX 的其他逻辑也是类似的原理,当 a 调用 complete 函数时(无论是同步还是异步),都会依次触发 A 任务的 stack 下挂接的其他依赖任务。而只要 a 没有调用 complete 函数,那么 thenApply 中挂接的依赖任务无论如何都无法执行(因为 a 对象的 result 属性为空)

注意事项

避免主任务和子任务向同一个线程池中申请线程,由于存在依赖关系,当通过 join 来获取子任务的值时,一旦子任务由于线程队列已满进入阻塞队列,那么将会形成死锁。


作者:Kripath_Rion

链接:https://juejin.cn/post/7212466685450207290

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
任务编排:CompletableFuture从入门到精通_Java_做梦都在改BUG_InfoQ写作社区