写点什么

Java 8 的异步利器:CompletableFuture 源码解析(建议精读)

作者:Java快了!
  • 2022 年 9 月 14 日
    湖南
  • 本文字数:3853 字

    阅读完需:约 13 分钟

completableFuture 是 JDK1.8 版本新引入的类。下面是这个类:


实现了俩接口,本身是个 class。这个是 Future 的实现类,使用 completionStage 接口去支持完成时触发的函数和操作。

一个 completetableFuture 就代表了一个任务,他能用 Future 的方法,还能做一些之前说的 executorService 配合 futures 做不了的。

之前 future 需要等待 isDone 为 true 才能知道任务跑完了,或者就是用 get 方法调用的时候会出现阻塞,而使用 completableFuture 的使用就可以用 then,when 等等操作来防止以上的阻塞和轮询 isDone 的现象出现。

1.创建 CompletableFuture 直接 new 对象。

一个 completableFuture 对象代表着一个任务,这个对象能跟这个任务产生联系。

下面用的 complete 方法意思就是这个任务完成了需要返回的结果,然后用 get() 方法可以获取到。


2.JDK1.8 使用的接口类。

在本文的 CompletableFuture 中大量地使用了这些函数式接口。

注:这些声明大量应用于方法的入参中,像 thenApply 和 thenAccept 这俩就是一个用 Function 一个用 Consumer

而 lambda 函数正好是可以作为这些接口的实现。例如 s->{return 1;} 这个就相当于一个 Function。因为有入参和返回结果。


(1)Function


(2)Consumer


对于前面有 Bi 的就是这样的,BiConsumer 就是两个参数的。


(3)Predicate 这个接口声明是一个入参,返回一个 boolean。


(4)supplier


3.下面是这个类的静态方法

带有 Async 就是异步执行的意思、也是一个 completableFuture 对象代表着一个任务这个原则。

这种异步方法都可以指定一个线程池作为任务的运行环境,如果没有指定就会使用 ForkJoinPool 线程池来执行


(1) supplyAsync&runAsync 的使用例子。

public static void main(String[] args) throws ExecutionException, InterruptedException {    ExecutorService executorService = Executors.newCachedThreadPool();    executorService.submit(new Callable<Object>() {        @Override        public Object call() throws Exception {            System.out.println("executorService 是否为守护线程 :" + Thread.currentThread().isDaemon());            return null;        }    });    final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {        System.out.println("this is lambda supplyAsync");        System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());        try {            TimeUnit.SECONDS.sleep(2);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("this lambda is executed by forkJoinPool");        return "result1";    });    final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {        System.out.println("this is task with executor");System.out.println("supplyAsync 使用executorService 时是否为守护线程 : " + Thread.currentThread().isDaemon());        return "result2";    }, executorService);    System.out.println(completableFuture.get());    System.out.println(future.get());    executorService.shutdown();}
复制代码


这些任务中带有 supply 是持有返回值的,run 是 void 返回值的,在玩 supply 时发现一个问题:如果使用 supplyAsync 任务时不使用任务的返回值,即 不用 get 方法阻塞主线程会导致任务执行中断。

注:跟 get 方法无关,后面有答案



然后我开始探索是否是只有 supplyAsync 是这样。我测试了 runAsync 发现也是这样。


下图为与 supplyAsync 任务执行不全面一样的问题,我甚至测试了将 lambda 换成 runnable 发现无济于事。


答案:

造成这个原因是因为 Daemon。因为 completableFuture 这套使用异步任务的操作都是创建成了守护线程,那么我们没有调用 get 方法不阻塞这个主线程的时候。主线程执行完毕,所有线程执行完毕就会导致一个问题,就是守护线程退出。

那么我们没有执行的代码就是因为主线程不再跑任务而关闭导致的,可能这个不叫问题,因为在开发中我们主线程常常是一直开着的。但是这个小问题同样让我想了好久。

下面我们开一个非守护线程,可以看到程序执行顺利。


下面证实守护线程在其他非守护线程全部退出的情况下不继续执行。

final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {    System.out.println("this is lambda supplyAsync");    System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());    try {        TimeUnit.SECONDS.sleep(1);        try(BufferedWriter writer = new BufferedWriter                (new OutputStreamWriter(new FileOutputStream(new File("/Users/zhangyong/Desktop/temp/out.txt"))))){            writer.write("this is completableFuture daemon test");        }catch (Exception e){            System.out.println("exception find");        }    } catch (InterruptedException e) {        e.printStackTrace();    }    System.out.println("this lambda is executed by forkJoinPool");    return "result1";});
复制代码

这个代码就是操作本地文件,并且 sleep 了一秒。其他线程就一句控制台输出的代码,最终的结果是文件没有任何变化。

当我把主线程 sleep 5 秒时,本地文件会写入一句 this is completableFuture daemon test 验证成功。

(2)allOf&anyOf

这两个方法的入参是一个 completableFuture 组、allOf 就是所有任务都完成时返回,但是是个 Void 的返回值。

anyOf 是当入参的 completableFuture 组中有一个任务执行完毕就返回,返回结果是第一个完成的任务的结果。

public static void otherStaticMethod() throws ExecutionException, InterruptedException {        final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                System.out.println("futureOne InterruptedException");            }            return "futureOneResult";        });        final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(6000);            } catch (InterruptedException e) {                System.out.println("futureTwo InterruptedException");            }            return "futureTwoResult";        });        CompletableFuture future = CompletableFuture.allOf(futureOne, futureTwo);        System.out.println(future.get());//        CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);//        System.out.println(completableFuture.get());    }
复制代码



(3) completedFuture 这个方法我没懂他是干啥的,源码就是返回一个值。感觉没啥意义。


(4)取值方法,除了 get 还有一个 getNow(); 这个就比较特殊了。

这个方法是执行这个方法的时候任务执行完了就返回任务的结果,如果任务没有执行完就返回你的入参。


(5)join 方法跟线程的 join 用法差不多。


(6) whenXXX ,在一个任务执行完成之后调用的方法。

这个有三个名差不多的方法: whenComplete 、 whenCompleteAsync 、还有一个是 whenCompleteAsync 用自定义 Executor


首先看一下这个 whenComplete 实例方法。这个就是任务执行完毕调用的,传入一个 action,这个方法的执行线程是当前线程,意味着会阻塞当前线程。

下面图中 test 的输出跟 whenComplete 方法运行的线程有关,运行到 main 线程就会阻塞 test 的输出,运行的是 completableFuture 线程则不会阻塞住 test 的输出。


下面是任务执行的线程的探索。



根据测试得出的结论是:如果调用 whenComplete 的中途,还发生了其他事情,图中的主线程的 sleep(400); 导致 completableFuture 这个任务执行完毕了,那么就使用主线程调用。

如果调用的中途没有发生其他任务且在触碰到 whenComplete 方法时 completableFuture 这个任务还没有彻底执行完毕那么就会用 completableFuture 这个任务所使用的线程。

下面是 whenCompleteAsync 方法。这个方法就是新创建一个异步线程执行。所以不会阻塞。


(7) then 方法瞅着挺多的,实际上就是异不异步和加不加自定义 Executor


注: whenComplete 中出现的问题在 then 中测试不存在、使用的就是上一个任务的线程。这个 thenCompose 就是一个任务执行完之后可以用它的返回结果接着执行的方法,方法返回的是另一个你期盼泛型的结果。

compose 理解就是上一个任务结果是 then 的一部分。


下面介绍一下 thenCombine

这个 combine 的理解就是结合两个任务的结果。


综上:这个线程的问题并不是大问题,只要你不用线程来做判断条件,他并不会影响你的效率。试想 pool 线程都执行完了就用主线程跑呗。没跑完,而使你等了那你就用 pool 线程呗。

thenRun 就是这个任务运行完,再运行下一个任务,感觉像是 join 了一下。


其余不再介绍,大同小异。

像 thenApply(Function); 这样的就是有入参有返回值类型的。

像 thenAccept(Consumer); 这样的就是有入参,但是没有返回值的。详情在上文中有过关于函数式接口的叙述。

用户头像

Java快了!

关注

还未添加个人签名 2022.09.03 加入

还未添加个人简介

评论

发布
暂无评论
Java 8 的异步利器:CompletableFuture源码解析(建议精读)_java;_Java快了!_InfoQ写作社区