写点什么

并发编程 -CompletableFuture 解析 | 京东物流技术团队

  • 2023-07-28
    北京
  • 本文字数:7501 字

    阅读完需:约 25 分钟

并发编程-CompletableFuture解析 | 京东物流技术团队

1、CompletableFuture 介绍

CompletableFuture 对象是 JDK1.8 版本新引入的类,这个类实现了两个接口,一个是 Future 接口,一个是 CompletionStage 接口。


CompletionStage 接口是 JDK1.8 版本提供的接口,用于异步执行中的阶段处理,CompletionStage 定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等,一般来说要执行下一个阶段都需要上一个阶段正常完成,这个类也提供了对异常结果的处理接口

2、CompletableFuture 的 API

2.1 提交任务

在 CompletableFuture 中提交任务有以下几种方式:


public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
复制代码


这四个方法都是用来提交任务的,不同的是 supplyAsync 提交的任务有返回值,runAsync 提交的任务没有返回值。两个接口都有一个重载的方法,第二个入参为指定的线程池,如果不指定,则默认使用 ForkJoinPool.commonPool()线程池。在使用的过程中尽量根据不同的业务来指定不同的线程池,方便对不同线程池进行监控,同时避免业务共用线程池相互影响。

2.2 结果转换

2.2.1 thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
复制代码


thenApply 这一组函数入参是 Function,意思是将上一个 CompletableFuture 执行结果作为入参,再次进行转换或者计算,重新返回一个新的值。

2.2.2 handle

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
复制代码


handle 这一组函数入参是 BiFunction,该函数式接口有两个入参一个返回值,意思是处理上一个 CompletableFuture 的处理结果,同时如果有异常,需要手动处理异常。

2.2.3 thenRun

public CompletableFuture<Void> thenRun(Runnable action)public CompletableFuture<Void> thenRunAsync(Runnable action)public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
复制代码


thenRun 这一组函数入参是 Runnable 函数式接口,该接口无需入参和出参,这一组函数是在上一个 CompletableFuture 任务执行完成后,在执行另外一个接口,不需要上一个任务的结果,也不需要返回值,只需要在上一个任务执行完成后执行即可。

2.2.4 thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
复制代码


thenAccept 这一组函数的入参是 Consumer,该函数式接口有一个入参,没有返回值,所以这一组接口的意思是处理上一个 CompletableFuture 的处理结果,但是不返回结果。

2.2.5 thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
复制代码


thenAcceptBoth 这一组函数入参包括 CompletionStage 以及 BiConsumer,CompletionStage 是 JDK1.8 新增的接口,在 JDK 中只有一个实现类:CompletableFuture,所以第一个入参就是 CompletableFuture,这一组函数是用来接受两个 CompletableFuture 的返回值,并将其组合到一起。BiConsumer 这个函数式接口有两个入参,并且没有返回值,BiConsumer 的第一个入参就是调用方 CompletableFuture 的执行结果,第二个入参就是 thenAcceptBoth 接口入参的 CompletableFuture 的执行结果。所以这一组函数意思是将两个 CompletableFuture 执行结果合并到一起。

2.2.6 thenCombine

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
复制代码


thenCombine 这一组函数和 thenAcceptBoth 类似,入参都包含一个 CompletionStage,也就是 CompletableFuture 对象,意思也是组合两个 CompletableFuture 的执行结果,不同的是 thenCombine 的第二个入参为 BiFunction,该函数式接口有两个入参,同时有一个返回值。所以与 thenAcceptBoth 不同的是,thenCombine 将两个任务结果合并后会返回一个全新的值作为出参。

2.2.7 thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
复制代码


thenCompose 这一组函数意思是将调用方的执行结果作为 Function 函数的入参,同时返回一个新的 CompletableFuture 对象。

2.3 回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
复制代码


whenComplete 方法意思是当上一个 CompletableFuture 对象任务执行完成后执行该方法。BiConsumer 函数式接口有两个入参没有返回值,这两个入参第一个是 CompletableFuture 任务的执行结果,第二个是异常信息。表示处理上一个任务的结果,如果有异常,则需要手动处理异常,与 handle 方法的区别在于,handle 方法的 BiFunction 是有返回值的,而 BiConsumer 是没有返回值的。


以上方法都有一个带有 Async 的方法,带有 Async 的方法表示是异步执行的,会将该任务放到线程池中执行,同时该方法会有一个重载的方法,最后一个参数为 Executor,表示异步执行可以指定线程池执行。为了方便进行控制,最好在使用 CompletableFuture 时手动指定我们的线程池。

2.4 异常处理

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
复制代码


exceptionally 是用来处理异常的,当任务抛出异常后,可以通过 exceptionally 来进行处理,也可以选择使用 handle 来进行处理,不过两者有些不同,hand 是用来处理上一个任务的结果,如果有异常情况,就处理异常。而 exceptionally 可以放在 CompletableFuture 处理的最后,作为兜底逻辑来处理未知异常。

2.5 获取结果

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
复制代码


allOf 是需要入参中所有的 CompletableFuture 任务执行完成,才会进行下一步;


anyOf 是入参中任何一个 CompletableFuture 任务执行完成都可以执行下一步。


public T get() throws InterruptedException, ExecutionExceptionpublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutExceptionpublic T getNow(T valueIfAbsent)public T join()
复制代码


get 方法一个是不带超时时间的,一个是带有超时时间的。


getNow 方法则是立即返回结果,如果还没有结果,则返回默认值,也就是该方法的入参。


join 方法是不带超时时间的等待任务完成。

3、CompletableFuture 原理

join 方法同样表示获取结果,但是 join 与 get 方法有什么区别呢。


public T join() {    Object r;    return reportJoin((r = result) == null ? waitingGet(false) : r);}
public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r);}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Object r; long nanos = unit.toNanos(timeout); return reportGet((r = result) == null ? timedGet(nanos) : r);}
public T getNow(T valueIfAbsent) { Object r; return ((r = result) == null) ? valueIfAbsent : reportJoin(r);}
复制代码


以上是 CompletableFuture 类中两个方法的代码,可以看到两个方法几乎一样。区别在于 reportJoin/reportGet,waitingGet 方法是一致的,只不过参数不一样,我们在看下 reportGet 与 reportJoin 方法。


private static <T> T reportGet(Object r)        throws InterruptedException, ExecutionException {        if (r == null) // by convention below, null means interrupted            throw new InterruptedException();        if (r instanceof AltResult) {            Throwable x, cause;            if ((x = ((AltResult)r).ex) == null)                return null;            if (x instanceof CancellationException)                throw (CancellationException)x;            if ((x instanceof CompletionException) &&                (cause = x.getCause()) != null)                x = cause;            throw new ExecutionException(x);        }        @SuppressWarnings("unchecked") T t = (T) r;        return t;    }
复制代码


private static <T> T reportJoin(Object r) {        if (r instanceof AltResult) {            Throwable x;            if ((x = ((AltResult)r).ex) == null)                return null;            if (x instanceof CancellationException)                throw (CancellationException)x;            if (x instanceof CompletionException)                throw (CompletionException)x;            throw new CompletionException(x);        }        @SuppressWarnings("unchecked") T t = (T) r;        return t;    }
复制代码


可以看到这两个方法很相近,reportGet 方法判断了 r 对象是否为空,并抛出了中断异常,而 reportJoin 方法没有判断,同时 reportJoin 抛出的都是运行时异常,所以 join 方法也是无需手动捕获异常的。


我们在看下 waitingGet 方法


private Object waitingGet(boolean interruptible) {        Signaller q = null;        boolean queued = false;        int spins = -1;        Object r;        while ((r = result) == null) {            if (spins < 0)                spins = SPINS;            else if (spins > 0) {                if (ThreadLocalRandom.nextSecondarySeed() >= 0)                    --spins;            }            else if (q == null)                q = new Signaller(interruptible, 0L, 0L);            else if (!queued)                queued = tryPushStack(q);            else if (interruptible && q.interruptControl < 0) {                q.thread = null;                cleanStack();                return null;            }            else if (q.thread != null && result == null) {                try {                    ForkJoinPool.managedBlock(q);                } catch (InterruptedException ie) {                    q.interruptControl = -1;                }            }        }        if (q != null) {            q.thread = null;            if (q.interruptControl < 0) {                if (interruptible)                    r = null; // report interruption                else                    Thread.currentThread().interrupt();            }        }        postComplete();        return r;    }
复制代码


该 waitingGet 方法是通过 while 的方式循环判断是否任务已经完成并产生结果,如果结果为空,则会一直在这里循环,这里需要注意的是在这里初始化了一下 spins=-1,当第一次进入 while 循环的时候,spins 是-1,这时会将 spins 赋值为一个常量,该常量为 SPINS。


private static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?                                      1 << 8 : 0);
复制代码


这里判断可用 CPU 数是否大于 1,如果大于 1,则该常量为 1<< 8,也就是 256,否则该常量为 0。


第二次进入 while 循环的时候,spins 是 256 大于 0,这里做了减一的操作,下次进入 while 循环,如果还没有结果,依然是大于 0 继续做减一的操作,此处用来做短时间的自旋等待结果,只有当 spins 等于 0,后续会进入正常流程判断。


我们在看下 timedGet 方法的源码


private Object timedGet(long nanos) throws TimeoutException {        if (Thread.interrupted())            return null;        if (nanos <= 0L)            throw new TimeoutException();        long d = System.nanoTime() + nanos;        Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0        boolean queued = false;        Object r;        // We intentionally don't spin here (as waitingGet does) because        // the call to nanoTime() above acts much like a spin.        while ((r = result) == null) {            if (!queued)                queued = tryPushStack(q);            else if (q.interruptControl < 0 || q.nanos <= 0L) {                q.thread = null;                cleanStack();                if (q.interruptControl < 0)                    return null;                throw new TimeoutException();            }            else if (q.thread != null && result == null) {                try {                    ForkJoinPool.managedBlock(q);                } catch (InterruptedException ie) {                    q.interruptControl = -1;                }            }        }        if (q.interruptControl < 0)            r = null;        q.thread = null;        postComplete();        return r;    }
复制代码


timedGet 方法依然是通过 while 循环的方式来判断是否已经完成,timedGet 方法入参为一个纳秒值,并通过该值计算出一个 deadline 截止时间,当 while 循环还未获取到任务结果且已经达到截止时间,则抛出一个 TimeoutException 异常。

4、CompletableFuture 实现多线程任务

这里我们通过 CompletableFuture 来实现一个多线程处理异步任务的例子。


这里我们创建 10 个任务提交到我们指定的线程池中执行,并等待这 10 个任务全部执行完毕。


每个任务的执行流程为第一次先执行加法,第二次执行乘法,如果发生异常则返回默认值,当 10 个任务执行完成后依次打印每个任务的结果。


public void demo() throws InterruptedException, ExecutionException, TimeoutException {        // 1、自定义线程池        ExecutorService executorService = new ThreadPoolExecutor(5, 10,                60L, TimeUnit.SECONDS,                new LinkedBlockingQueue<>(100));
// 2、集合保存future对象 List<CompletableFuture<Integer>> futures = new ArrayList<>(10); for (int i = 0; i < 10; i++) { int finalI = i; CompletableFuture<Integer> future = CompletableFuture // 提交任务到指定线程池 .supplyAsync(() -> this.addValue(finalI), executorService) // 第一个任务执行结果在此处进行处理 .thenApplyAsync(k -> this.plusValue(finalI, k), executorService) // 任务执行异常时处理异常并返回默认值 .exceptionally(e -> this.defaultValue(finalI, e)); // future对象添加到集合中 futures.add(future); }
// 3、等待所有任务执行完成,此处最好加超时时间 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES); for (CompletableFuture<Integer> future : futures) { Integer num = future.get(); System.out.println("任务执行结果为:" + num); } System.out.println("任务全部执行完成!"); }
private Integer addValue(Integer index) { System.out.println("第" + index + "个任务第一次执行"); if (index == 3) { int value = index / 0; } return index + 3; }
private Integer plusValue(Integer index, Integer num) { System.out.println("第" + index + "个任务第二次执行,上次执行结果:" + num); return num * 10; }
private Integer defaultValue(Integer index, Throwable e) { System.out.println("第" + index + "个任务执行异常!" + e.getMessage()); e.printStackTrace(); return 10; }
复制代码


作者:京东物流 丁冬

来源:京东云开发者社区 自猿其说 Tech

发布于: 刚刚阅读数: 4
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
并发编程-CompletableFuture解析 | 京东物流技术团队_并发编程_京东科技开发者_InfoQ写作社区