写点什么

手把手教学妹 CompletableFuture 异步化, 性能关系直接起飞!

作者:Java高工P7
  • 2021 年 11 月 12 日
  • 本文字数:2618 字

    阅读完需:约 9 分钟

System.out.println("T2:拿茶叶...");


sleep(1, TimeUnit.SECONDS);


return "龙井";


});


//任务 3:任务 1 和任务 2 完成后执行:泡茶


CompletableFuture<String> f3 =


f1.thenCombine(f2, (__, tf)->{


System.out.println("T1:拿到茶叶:" + tf);


System.out.println("T1:泡茶...");


return "上茶:" + tf;


});


//等待任务 3 执行结果


System.out.println(f3.join());


void sleep(int t, TimeUnit u) {


try {


u.sleep(t);


}catch(InterruptedException e){}


}


// 一次执行结果:


T1:洗水壶...


T2:洗茶壶...


T1:烧开水...


T2:洗茶杯...


T2:拿茶叶...


T1:拿到茶叶:龙井


T1:泡茶...


上茶:龙井


创建 CompletableFuture 对象


====================================================================================


创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个


静态方法




头两个使用默认线程池。

runAsync(Runnable runnable)

Runnable 接口的 run()方法没有返回值


supplyAsync(Supplier supplier)

Supplier 接口的 get()方法有返回值



CompletableFuture 默认使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism设置 ForkJoinPool 线程池的线程数)。


若所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,推荐根据不同业务类型创建不同的线程池,以


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


避免互相干扰。下两个方法可指定线程池

runAsync(Runnable runnable, Executor executor)

supplyAsync(Supplier supplier, Executor executor)


创建完 CompletableFuture 对象后,会自动异步执行 runnable.run()或者 supplier.get(),对一个异步操作,我们关注:


  • 异步操作什么时候结束

  • 如何获取异步操作的执行结果


因为 CompletableFuture 类实现了 Future 接口,所以这些都是通过 Future 接口解决的。


CompletableFuture 类还实现了 CompletionStage 接口


CompletionStage 接口


================================================================================


任务有时序关系,比如


  • 串行


比如烧水泡茶,其中洗水壶和烧开水


  • 并行


洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间


  • 汇聚


烧开水、拿茶叶这俩任务和泡茶就是汇聚


CompletionStage 接口可清晰描述任务之间的这种时序关系,例如


f3 = f1.thenCombine(f2, ()->{})


描述的就是一种汇聚关系。烧水泡茶中的汇聚关系是一种


  • AND 聚合关系


AND 指所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)


还有


  • OR 聚合关系


OR 指的是依赖的任务只要有一个完成就可以执行当前任务。


1 串行关系




CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

thenApply 系


fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是 R apply(T t)



该方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<U>

thenAccept 系


参数 consumer 的类型是接口Consumer<T>,这个接口里与 CompletionStage 相关的方法是


void accept(T t)



该方法虽然支持参数,但不支持返回值,所以 thenAccept 系方法返回值是 CompletionStage。

thenRun 系


参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage<Void>


Async 表示异步执行 fn、consumer 或 action。

thenCompose 系

这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系相同。


看如何使用 thenApply()。


supplyAsync()启动一个异步流程,之后是两个串行操作。虽然这是一个异步流程,但任务 1、2、3 是串行执行,即 2 依赖 1 的执行结果,3 依赖 2 的执行结果。



2 AND 汇聚




主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系接口


3 OR 汇聚




主要是 applyToEither、acceptEither 和 runAfterEither 系接口


CompletionStage applyToEither(other, fn);


CompletionStage applyToEitherAsync(other, fn);


CompletionStage acceptEither(other, consumer);


CompletionStage acceptEitherAsync(other, consumer);


CompletionStage runAfterEither(other, action);


CompletionStage runAfterEitherAsync(other, action);


如何使用 applyToEither()描述 OR 汇聚关系。


CompletableFuture<String> f1 =


CompletableFuture.supplyAsync(()->{


int t = getRandom(5, 10);


sleep(t, TimeUnit.SECONDS);


return String.valueOf(t);


});


CompletableFuture<String> f2 =


CompletableFuture.supplyAsync(()->{


int t = getRandom(5, 10);


sleep(t, TimeUnit.SECONDS);


return String.valueOf(t);


});


CompletableFuture<String> f3 =


f1.applyToEither(f2,s -> s);


System.out.println(f3.join());


CompletableFuture 中各种关系(并行、串行、聚合),支持的各种场景。 比如:线程 A 等待线程 B 或线程 C 等待线程 A、B 。


其实 CountdownLatch、ThreadPoolExecutor 和 Future 就是来解决这些关系场景的,现在有了 completableFuture,可以优先考虑使用 CompletableFuture。


4 异常处理


=====================================================================


fn、consumer、action 的核心方法都不允许抛受检异常,但无法限制它们抛运行时异常,例如下面的代码,执行 1/0 就会出现除 0 错误的运行时异常


非异步编程里,可以用 try/catch 捕获并处理异常,异步编程里该如何处理呢?



CompletionStage 给出的方案很简单,使用这些方法处理异常和串行操作一样的,而且还支持链式编程。


CompletionStage exceptionally(fn);


CompletionStage<R> whenComplete(consumer);


CompletionStage<R> whenCompleteAsync(consumer);


CompletionStage<R> handle(fn);


CompletionStage<R> handleAsync(fn);


  • exceptionally()类似 try/catch 中的 catch



  • whenComplete()和 handle()类似 try/finally 的 finally,无论是否发生异常都会执行 whenComplete()中的回调方法 consumer 和 handle()中的回调方法 fn


whenComplete()不支持返回结果,handle()支持返回结果。


学了这么多,最后来看个例子:


//采购订单


PurchersOrder po;


CompletableFuture<Boolean> cf =


CompletableFuture.supplyAsync(()->{


// 在 MySQL 中查询规则


return findRuleByJdbc();


}).thenApply(r -> {


// 规则校验


return check(po, r);


});


Boolean isOk = cf.join();

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
手把手教学妹CompletableFuture异步化,性能关系直接起飞!