写点什么

【函数式编程实战】(十一) CompletableFuture、反应式编程源码解析与实战

  • 2022 年 7 月 28 日
  • 本文字数:7019 字

    阅读完需:约 23 分钟


前言📫 作者简介:小明 java 问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫 🏆 Java 领域优质创作者、阿里云专家博主、华为云专家🏆🔥 如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦


本文导读

Java 代码为了更好的发展和性能,开发了 异步编程的模式,Future 异步编程和 CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9 中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

一、同步与异步

1、为什么要有异步

在 Java 发展的这 20 年,他只做了一件事不被淘汰,为了不被淘汰不断的更新 jdk 的版本,以便使用计算机硬件、操作系统以及新的编程概念。

Java 一开始提供了 synchronized 锁、Runable,后面 java5 有引入了 java.util.concurrent 包,java7 中的 forkjoin 框架 java.util.concurrent.RecursiveTask,到 java8 中 Stream 流、lambda 表达式的支持,这一切都是为了支持高并发。

即便如此,多线程虽然极大的提升了性能,如果合理的使用线程池的话,好处,第一可以降低资源消耗,重复利用已创建的线程;第二:提高响应速度,任务可以不需要等到线程创建就能立即执行;第三:提高线程的可管理性。统一分配、调优和监控。但是线程池也不是没有缺点,使用 k 个线程的线程池就只能并发的执行 k 个任务,其他任务还是回休眠或者阻塞

这时候如果有线程不和其他任务相关联,又可以不用阻塞,就好了。Java8 考虑到了,充分发挥了计算机硬件的处理能力,异步 API 应运而生。

2、什么是同步?什么是异步?

同步就是 a 程序强依赖 b 程序,我必须等到你的回复或者执行完毕,才能做出下一步响应,类似于编程中程序被解释器(JVM)顺序执行一样(加载 > 验证 > 准备 > 解析 > 初始化);

异步则相反,a 程序不强依赖 b 程序,响应的时间也无所谓,无论你返回还是不返回,a 程序都能继续运行,也就是说我不存在等待对方的概念,a 程序就是 异步非阻塞的。

下面举一个例子就说明什么是同步、什么是异步

异步编程涉及两种风格,Future 风格 API 和反应式风格 API ,Future<Integer> fun(int a){},fun( a , x-> {}),这两个模式的实战会在后面小结讲解。

二、Future 异步编程

1、Future 接口

Java5 中就引入了 Future 接口,他的涉及初衷就是异步计算,例如我们结算一个商户下的所有订单,这个时候并不需要 for 循环去累加,Future 接口使用的时候只需要封装 Callable 中,再提交给 ExecutorService。

2、Future 接口的使用

Future 接口的使用看下 Java8 之前是如何使用异步的

public static void main(String[] args) throws Exception {    List<OrderInfo> orderInfos = Arrays.asList(new OrderInfo("123", BigDecimal.ONE),            new OrderInfo("456", BigDecimal.TEN), new OrderInfo("789", BigDecimal.TEN));
// 创建 ExecutorService 通过它可以向线程池提交任务 ExecutorService executorService = Executors.newCachedThreadPool();
// 异步操作的同时,可以进行其他操作 Future<BigDecimal> decimalFuture = executorService.submit(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { return reduceAmt(orderInfos); } });
// Java8 写法 Future<BigDecimal> decimalFuture = executorService.submit(() -> reduceAmt(orderInfos));
System.out.println(decimalFuture.get());}
private static BigDecimal reduceAmt(List<OrderInfo> orderInfos) { return orderInfos.stream() .map(OrderInfo::getOrderAmt) .reduce(BigDecimal.ZERO, BigDecimal::add);}
复制代码

异步编程可以在 ExecutorService 中,以并发的方式调用另一个线程执行操作,后续调用 get() 方法获取操作结果,如果操作完成会立刻返回,如果操作没有完成则回阻塞线程,直到操作完成返回。

3、Future 接口的缺陷(局限性)

Future 接口 还提供了方法来检测异步计算是否已经结束(isDone() 方法),等待异步操作结束。

但是当长时间计算任务完成时,该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并。


就会发生很多性能问题:1、将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。)2、此时就要等待 Future 集合中的所有任务都完成。3、当 Future 的完成事件发生时会收到通知,使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果,每一步都需要等待。

三、CompletableFuture 接口详解

1、CompletableFuture 的创建

CompletableFuture.runAsync()也可以用来创建 CompletableFuture 实例。与 supplyAsync()不同的是,runAsync()传入的任务要求是 Runnable 类型的,所以没有返回值,runAsync 适合创建不需要返回值的计算任务

通过该 supplyAsync 函数创建的 CompletableFuture 实例会异步执行当前传入的计算任务,在调用端,则可以通过 get 或 join 获取最终计算结果。

同事直接 new(构造函数创建)也是可以的,下面通过一个实战小例子看下,高并发高性能的添加购物车结构如何设计

import java.util.concurrent.CompletableFuture;import org.springframework.core.task.AsyncTaskExecutor;
public class CompletableFutureImpl { @Autowired @Qualifier("asyncExecutor") private AsyncTaskExecutor asyncTaskExecutor;
public DefaultResponseVO addGoodsCart(HttpServletRequest request, @Valid AddCartReqVO reqVo) { // 添加购物车 GetCartItemEntity respVO = mallCartProcess.addGoodsCart(buildAddGoodsCartReqVO(reqBo));
/** * 异步刷新到购物车 */ CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> syncAddCacheCheck(respVO), asyncTaskExecutor); // 通过supplyAsync 函数创建的 CompletableFuture<Object> uCompletableFuture = CompletableFuture.supplyAsync(() -> syncAddCacheCheck(respVO)); // 构造函数创建 CompletableFuture completableFuture = new CompletableFuture().runAsync(() -> syncAddCacheCheck(respVO));
return new DefaultResponseVO(code, msg, respVO); }
/** * 添加购物车缓存(异步刷新到购物车) */ public void syncAddCacheCheck(GetCartItemEntity cartItemEntity) { try { // 添加购物车缓存(异步刷新到购物车) mallCartProcess.getCartInfo(cartItemEntity); } catch (Exception e) { logger.error("syncAddCache error", e); } }}
复制代码

​​2、CompletableFuture.supplyAsync 源码分析

本小节讲解 CompletableFuture 的底层实现

上面为 java 8 中 supplyAsync 函数的实现源码。可以看到,当 supplyAsync 入参只有 supplier 时,会默认使用 asyncPool 作为线程池(一般情况下为 ForkJoinPool 的 commonPool),并调用内部方法 asyncSupplyStage 执行具体的逻辑。在 asyncSupplyStage 方法中,程序会创建一个空的 CompletableFuture 返回给调用方。同时该 CompletableFuture 和传入的 supplier 会被包装在一个 AsyncSupply 实例对象中,然后一起提交到线程池中进行处理。


值得注意的是,当 supplyAsync 返回时,调用方只会拿到一个空的 CompletableFuture 实例。看到这里,我们可以猜测,当计算最终完成时,计算结果会被 set 到对应的 CompletableFuture 的 result 字段中。调用方通过 join 或者 get 就能取到该 CompletableFuture 的 result 字段的值。所以,虽然实际创建 CompletableFuture 的线程和进行任务计算的线程不同,但是最终会通过 result 来进行结果的传递。这种方式与传统的 Future 中结果传递方式类似(计算线程 set 值,使用线程 get 值)。

上面为 java 8 中 AsyncSupply 的实现源码,AsyncSupply 的源码很简单。首先,它实现了 Runnable 接口,所以被提交到线程池中后,工作线程会执行其 run()方法。通过对 AsyncSupply 中 run 方法的分析,也基本证实我们之前的猜测。即计算任务由工作线程调用 run 方法执行,并设置到 CompletableFuture 的结果中。其他线程中的使用方,则可以调用该 CompletableFuture 的 join 或者 get 方法获取其结果。


因此,我们只需要搞清楚其 run()中的实现即可。在 run() 中,程序首先检查了传入的 CompletableFuture 和 Supplier 是否为空,如果均不为空,再检查 CompletableFuture 的 d.result 是否为空,如果不为空,则说明 CompletableFuture 的值已经被其他线程主动设置过了(这也是 CompletableFuture 与 Future 最大的不同之处),因此这里就不会再被重新设置一次。如果 d.result 为空,则调用 Supplier(源码中的 f 变量)的 get()方法,执行具体的计算,然后通过 completeValue 方法将结果设置到 CompletableFuture 中。最后,调用 CompletableFuture 的 postComplete()方法,执行连接到当前 CompletableFuture 上的后置任务。

3、CompletableFuture.runAsync 源码分析

通过上面的源码可以看出,runAsync 也会生成一个空的 CompletableFuture,并包装在 AsyncRun 中提交到线程池中执行。这与 supplyAsync 是完全一致的。由于 Runnable 没有返回值,这里返回的 CompletableFuture 的结果值是 Void 类型的。


AsyncRun 的 run 中,计算的执行是通过调用传入的 Runnable(源码中的 f 变量)的 run 方法进行的。由于没有返回值,所以这里在设置 CompletableFuture 的值时,使用其 completeNull()方法,设置一个特殊的空值标记。 设计方面和 supplyAsync 一致

4、CompletableFuture API 实战

thenApply、thenAccept、thenRun

thenApply 提交的任务类型需遵从 Function 签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept 提交的任务类型需遵从 Consumer 签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun 提交的任务类型需遵从 Runnable 签名,即没有入参也没有返回值。


thenCombine、thenCompose

thenCombine 最大的不同是连接任务可以是一个独立的 CompletableFuture。嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose 的主要目的就是解决这个问题(这里也可以将 thenCompose 的作用类比于 stream 接口中的 flatMap,因为他们都可以将类型嵌套展开)。


whenComplete、handle

whenComplete 主要用于注入任务完成时的回调通知逻辑(获得的结果是前置任务的结果,whenComplete 中的逻辑不会影响计算结果)。handle 与 handle 接收的处理函数有返回值,而且返回值会影响最终获取的计算结果(产生了新的结果)

四、反应式编程

1、什么是反应式(resctive)编程

反应式编程是最近几年才提出的概念,主要有四个特征:响应式,反应式编程的响应速度应该很快,而且是稳定可预测的。韧性,系统出现失败时,任然可以继续响应服务,任何一个组件都能以异步的方式想其他组件分发任务。弹性,影响代码响应的因素的代码(系统)的负载能力,反应式编程可以增加分配的资源,受流量影响后有自动适配的能力,服务更大的负载。消息驱动,各个组件之间松耦合,组件隔离,跨组件通信使用异步消息传递。

反应式(resctive)编程在应用层的主要特征是任务以异步的方式执行,非阻塞的处理事件流,充分利用多核 CPU 的特点,大多反应式框架(RxJava 等)都可以独立开辟线程池,用于执行阻塞式操作,主线程池中运行都是无阻塞的。

2、反应式流(Flow API)

2.1、发布订阅模式

Future CompletableFuture 的思维模式是计算的执行是独立且并发的。使用 get()方法可以在执行结束后获取 Future 对象的执行结果。因此,Future 是一个一次性对象,它只能从头到尾执行代码一次。

与此相反,反应式编程的思维模式是类 Future 的对象随着时间的推移可以产生很多的结果。举个例子是 Web 服务器的监听组件对象。该组件监听来自网络的 HTTP 请求,并根据请求的内容返回相应的数据。

2.2、Flow API 源码解析

Java9 使用 java.util.concurrent.Flow 提供的接口对反应式编程进行建模,实现了名为“发布-订阅”的模型(也叫协议,简写为 pub-sub )

反应式编程有三个主要的概念,分别是:订阅者可以订阅的发布者;名为订阅的连接;消息(也叫事件),它们通过连接传输。

反应式流(Flow API)规范可以总结为 4 个接口:Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅)和 Processor(处理者)

Publisher 负责生成数据,并将数据发送给 Subscription(每个 Subscriber 对应一个 Subscription)。Publisher 接口声明了一个方法 subscribe(),Subscriber 可以通过该方法向 Publisher 发起订阅。 

一旦 Subscriber 订阅成功,就可以接收来自 Publisher 的事件。这些事件是通过 Subscriber 接口上的方法发送的

Subscriber 的第一个事件是通过对 onSubscribe()方法的调用接收的。Publisher 调用 onSubscribe() 方法时,会将 Subscription 对象传递给 Subscriber。通过 Subscription,Subscriber 可以管理其订阅情况

Subscriber 可以通过调用 request() 方法来请求 Publisher 发送数据,或者通过调用 cancel()方法表明它不再对数据感兴趣并且取消订阅。当调用 request() 时,Subscriber 可以传入一个 long 类型的数值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方,以避免 Publisher 发送多于 Subscriber 能够处理的数据量。在 Publisher 发送完所请求数量的数据项之后,Subscriber 可以再次调用 request()方法来请求更多的数据。

Subscriber 请求数据之后,数据就会开始流经反应式流。Publisher 发布的每个数据项都会通过调用 Subscriber 的 onNext()方法递交给 Subscriber。如果有任何错误,就会调用 onError()方法。如果 Publisher 没有更多的数据,也不会继续产生更多的数据,那么将会调用 Subscriber 的 onComplete() 方法来告知 Subscriber 它已经结束

反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor 项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式 API。有我们自己实现。

2.3、Flow API 实战

FlowImpl :创建 Publisher 并向其订阅 TempSubscriber

public class FlowImpl {    public static void main(String[] args) {        // 创建 Publisher 并向其订阅 Subscriber        getOrderAmt("20220727123").subscribe(new SubscriberImpl());    }
// Publisher是个函数式接口 private static Flow.Publisher<OrderInfo> getOrderAmt(String orderId) { return subscriber -> subscriber.onSubscribe(new SubscriptionImpl(subscriber, orderId)); }}
复制代码


Subscription 接口:实现向 Subscriber 发送 OrderInfo Steam

public class SubscriptionImpl implements Flow.Subscription {    private final Flow.Subscriber<? super OrderInfo> subscriber;    private final String orderId;
public SubscriptionImpl(Flow.Subscriber<? super OrderInfo> subscriber, String orderId) { this.subscriber = subscriber; this.orderId = orderId; }
@Override public void request(long n) { // 另起一个线程向 subscriber 发送下一个元素 Executors.newSingleThreadExecutor().submit(() -> { for (long i = 0L; i < n; i++) // subscriber 每处理一个请求执行一次循环 try { // 将当前 订单号 发送给 Subscriber subscriber.onNext(OrderInfo.reduceAmt(orderId)); } catch (Exception e) { // 查询报错将这个报错信息传给 Subscriber subscriber.onError(e); e.printStackTrace(); break; } }); }
@Override public void cancel() { // 如果 Subscription 被取消了,向 subscriber 发送一个完成信号 subscriber.onComplete(); }}
复制代码

Subscriber 接口:实现打印输出收到的 订单 数据

public class SubscriberImpl implements Flow.Subscriber<OrderInfo> {    private Flow.Subscription subscription;    @Override    public void onSubscribe(Flow.Subscription subscription) {        this.subscription = subscription;        subscription.request(1);    }    @Override    public void onNext(OrderInfo orderInfo) {        System.out.println(orderInfo);        subscription.request(1);    }    @Override    public void onError(Throwable throwable) {        System.out.println(throwable.getMessage());    }    @Override    public void onComplete() {        System.out.println("Done!");    }}
复制代码

总结

Java 代码为了更好的发展和性能,开发了 异步编程的模式,Future 异步编程和 CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9 中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

发布于: 5 小时前阅读数: 28
用户头像

物有本末,事有终始。知所先后,则近道矣 2020.03.20 加入

🏆CSDNJava领域优质创作者/阿里云专家博主/华为云专家 📫就职某大型金融互联网公司后端高级工程师 👍专注于研究计算机底层/Java/架构/设计模式/算法

评论

发布
暂无评论
【函数式编程实战】(十一) CompletableFuture、反应式编程源码解析与实战_CompletableFuture_小明Java问道之路_InfoQ写作社区