写点什么

Reactive Spring 实战 -- 理解 Reactor 的设计与实现

用户头像
binecy
关注
发布于: 2020 年 12 月 22 日

Reactor 是 Spring 提供的非阻塞式响应式编程框架,实现了 Reactive Streams 规范。 它提供了可组合的异步序列 API,例如 Flux(用于[N]个元素)和 Mono(用于[0 | 1]个元素)。


Reactor Netty 项目还支持非阻塞式网络通信,非常适用于微服务架构,为 HTTP(包括 Websockets),TCP 和 UDP 提供了响应式编程基础。


本文通过例子展示和源码阅读,分析 Reactor 中核心设计与实现机制。

文本 Reactor 源码基于 Reactor 3.3


名词解析

响应式编程,维基百科解析为

>reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s)


响应式编程是一个专注于数据流和变化传递的异步编程范式。 这意味着使用编程语言可以很容易地表示静态(例如数组)或动态(例如事件发射器)数据流。


下面简单解释一下相关名词。

数据流与变化传递,我的理解,数据流就如同一条车间流水线,数据在上面传递,经过不同的操作台(我们定义的操作方法),可以被观测,被过滤,被调整,或者与另外一条数据流合并为一条新的流,而操作台对数据做的改变会一直向下传递给其他操作台。

java 8 lambda 表达式就是一种数据流形式

lists.stream().filter(i -> i%2==0).sorted().forEach(handler);
复制代码

lists.stream(),构建一个数据流,负责生产数据。

filter,sorted 方法以及 handler 匿名类,都可以视为操作台,他们负责处 理数据。


这里还涉及两个概念

声明式编程,通过表达式直接告诉计算机我们要的结果,具体操作由底层实现,我们并不关心,如 sql,html,spring spel。


对应的命令式编程,一步一步告诉计算机先做什么再做什么。我们平时编写 java,c 等代码就是命令式编程。

上例中通过 filter,sorted 等方法直接告诉计算机(Spring)执行过滤,排序操作,可以理解为声明式编程。

注意,我的理解是,声明式,命令式编程并没有明确的界限。

越是可以直接通过声明表达我们要什么,就越接近声明式编程,反之,越是需要我们编写操作过程的,就越接近命令式编程。

如 Spring 中的声明式事务和编程式事务。

可参考:https://www.zhihu.com/question/22285830


函数式编程,就是将函数当做一个数据类型,函数作为参数,返回值,属性。

Java 不支持该模式,通过匿名类实现,如上例中 forEach 方法。

注意,函数式编程还有很多学术性,专业性的概念,感兴趣的同学可以自行了解。


响应式编程,主要是在上面概念加了异步支持。

这个异步支持非常有用,它可以跟 Netty 这些基于事件模型的异步网络框架很好地结合,下一篇文章我们通过 WebFlux 来说明这一点。


数据流转

下面我们来简单看一下 Reactor 的设计与实现吧。

首先通过一个小用例,来看一个 Reactor 中如何生产数据,又如何传递给订阅者。

@Testpublic void range() {    // [1]    Flux flux = Flux.range(1, 10);    // [2]    Subscriber subscriber = new BaseSubscriber<Integer>() {        protected void hookOnNext(Integer value) {            System.out.println(Thread.currentThread().getName() + " -> " + value);            request(1);        }    };    // [3]    flux.subscribe(subscriber);}
复制代码

Reactor 中,发布者 Publisher 负责生产数据,有两种发布者,Flux 可以生产 N 个数据,Mono 可以生产 0~1 个数据。

订阅者 Subscriber 负责处理,消费数据。

1 构建一个发布者 Flux

注意,这时发布者还没开始生产数据。

2 构建一个订阅者 Subscriber

3 创建订阅关系,这时,生产者开始生产数据,并传递给订阅者。


Flux.range,fromArray 等静态方法都会返回一个 Flux 子类,如 FluxRange,FluxArray。


Publisher#subscribe,该方法很重要,它负责创建发布者与订阅者的订阅关系。

Flux#subscribe

public final void subscribe(Subscriber<? super T> actual) {    CorePublisher publisher = Operators.onLastAssembly(this);    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try { ...
publisher.subscribe(subscriber); } catch (Throwable e) { Operators.reportThrowInSubscribe(subscriber, e); return; }}
复制代码

获取内部的 CorePublisher,CoreSubscriber。

Flux 子类都是一个 CorePublisher。

我们编写的订阅者,都会转化为一个 CoreSubscriber。


CorePublisher 也有一个内部的 subscribe 方法,由 Flux 子类实现。

FluxRange#subscribe

public void subscribe(CoreSubscriber<? super Integer> actual) {    ...    actual.onSubscribe(new RangeSubscription(actual, st, en));}
复制代码

Subscription 代表了发布者与订阅者之间的一个订阅关系,由 Publisher 端实现。

Flux 子类 subscribe 方法中通常会使用 CoreSubscriber 创建为 Subscription,并调用订阅者的 onSubscribe 方法,这时订阅关系已完成。


下面来看一下 Subscriber 端的 onSubscribe 方法

BaseSubscriber#onSubscribe -> hookOnSubscribe

protected void hookOnSubscribe(Subscription subscription) {    subscription.request(9223372036854775807L);}
复制代码

Subscription#request 由 Publisher 端实现,也是核心方法,订阅者通过该方法向发布者拉取特定数量的数据。

注意,这时发布者才开始生产数据。


RangeSubscription#request -> RangeSubscription#slowPath -> Subscriber#onNext

void slowPath(long n) {    Subscriber<? super Integer> a = this.actual;    long f = this.end;    long e = 0L;    long i = this.index;
while(!this.cancelled) { // [1] while(e != n && i != f) { a.onNext((int)i); if (this.cancelled) { return; }
++e; ++i; }
... }}
复制代码

1 RangeSubscription 负责生产指定范围内的整数,并调用 Subscriber#onNext 将数据推送到订阅者。


可以看到,

Publisher#subscribe 完成订阅操作,生成 Subscription 订阅关系,并触发订阅者钩子方法 onSubscribe。

订阅者的 onSubscribe 方法中,订阅者开始调用 Subscription#request 请求数据,这时发布者才开始生产数据,并将数据推给订阅者。


操作符方法

跟 java 8 lambda 表达式一样,Reactor 提供了很多的声明式方法,这些方法类似于操作符,直接操作数据(下文称为操作符方法)。

合理利用这些方法,可以大量简化我们的工作。


数据处理,如 skip,distinct,sort,filter

钩子方法,如 doOnNext,doOnSuccess

组合操作,flatMap,zipWhen

阻塞等待,blockLast

流量控制,limitRate

数据缓存,buffer,cache

可参考官方文档:https://projectreactor.io/docs/core/release/reference/#which-operator


注意,这些操作符方法虽然是添加到 Publisher 端,但 Reactor 会将逻辑会转移到 Subscriber 端。


看一个简单例子

Flux.range(1, 3)    .doOnNext(i -> {        System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);    })    .skip(1)    .subscribe(myHandler);
复制代码

myHandler 即我们实现的 Subscriber。

每调用一次操作符方法,Flux 都会生成一个新的 Flux 子类(装饰模式),最后 Flux 类为FluxSkip[FluxPeek[FluxRange]]


我们来看一下完整的 Flux#subscribe 方法代码

public final void subscribe(Subscriber<? super T> actual) {    CorePublisher publisher = Operators.onLastAssembly(this);    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try { // [1] if (publisher instanceof OptimizableOperator) { OptimizableOperator operator = (OptimizableOperator)publisher;
while(true) { // [2] subscriber = operator.subscribeOrReturn(subscriber); if (subscriber == null) { return; } // [3] OptimizableOperator newSource = operator.nextOptimizableSource(); if (newSource == null) { publisher = operator.source(); break; }
operator = newSource; } } // [4] publisher.subscribe(subscriber); } catch (Throwable var6) { Operators.reportThrowInSubscribe(subscriber, var6); }}
复制代码

1 判断 Flux 是否由操作符方法产生。

2 OptimizableOperator#subscribeOrReturn 会生成新的 Subscriber,以执行操作符逻辑。如上面例子中,FluxPeek 会生成 PeekSubscriber,FluxSkip 生成 SkipSubscriber。这里将操作符逻辑转移到 Subscriber 端。

OptimizableOperator#subscribeOrReturn 也可以直接调用被装饰 Publisher 的 subscribe 方法,从而改变流程。如下面说的 FluxSubscribeOn。

3 取出上一层被装饰的 Publisher 作为新的 Publisher,如上例的FluxSkip[FluxPeek[FluxRange]],会依次取出 FluxPeek,FluxRange。

这个操作一直执行,直到取出真正生产数据的 Publisher。

4 使用真正生产数据的 Publisher,和最后包装好的 Subscriber,再调用 subscribe 方法。


上面例子中,流程如下


push/pull

Reactor 提供了 push 和 pull 两种模式。


先看一下 pull 模式

Flux.generate(sink -> {    int k = (int) (Math.random()*10);    if(k > 8)        sink.complete();    sink.next(k);}).subscribe(i -> {    System.out.println("receive:" + i);});
复制代码

Sink 可以理解为数据池,负责存储数据,根据功能不同划分,如 IgnoreSink,BufferAsyncSink,LatestAsyncSink。

Sink#next 会将数据放入池中,由 Sink 缓存或直接发送给订阅者。


Flux#generate(Consumer<SynchronousSink<T>> generator),可以理解为 pull 模式,

订阅者每调用一次 request 方法,Publisher 就会调用一次 generator 来生产数据,而且 generator 每次执行中只能调用一次 Sink.next。

generator 是单线程执行,生成数据后直接同步发送到订阅者。


push 模式可以使用 create 方法

Flux.create(sink -> {    System.out.println("please entry data");    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));    while (true) {        try {            sink.next(br.readLine());        } catch (IOException e) {        }    }}).subscribe(i -> {    System.out.println("receive:" + i);});
复制代码

Flux#create(Consumer<? super FluxSink<T>> emitter),可以理解为 push 模式。

注意,Publisher 只在 Flux#subscribe 操作时调用一次 emitter,后续 request 不再调用 emitter。

我们可以将 Sink 绑定到其他的数据源,如上例的控制台,或其他事件监听器。

当数据来了,Sink 就会将它推送给订阅者。

Flux#create 生成的 Flux 可以多线程同时调用 Sink.next 推送数据,并且数据会先缓存到 Sink,后续 Sink 推送给订阅者。


push 方法与 create 类似,只是它只允许一个线程调用 Sink.next 推送数据。

上例中 create 方法使用 push 方法更合适,因为只有一个线程推送数据。


混合模式

假如一个消息处理器 MessageProcessor 需要会将普通消息直接推送给订阅者,而低级别消息由订阅者拉取。

我们可以 FluxSink#onRequest 实现混合模式

Flux.create(sink -> {    // [1]    messageProcessor.setHandler((msg) -> {        sink.next(msg);    });    // [2]    sink.onRequest(n -> {        List<String> messages = messageProcessor.getLowMsg();        for(String s : messages) {            sink.next(s);        }    });})
复制代码

1 普通消息直接推送

2 低级别消息由订阅者拉取


完整代码可参考:https://gitee.com/binecy/bin-springreactive/blob/master/order-service/src/test/java/com/binecy/FluxPushPullTest.java


线程与调度器

前面说了 reactor 是支持异步的,不过它并没有默认开启异步,我们可以通过调度器开启,如

public void parallel() throws InterruptedException {    Flux.range(0, 100)            .parallel()            .runOn(Schedulers.parallel())            .subscribe(i -> {                System.out.println(Thread.currentThread().getName() + " -> " + i);            });    new CountDownLatch(1).await();}
复制代码

parallel 将数据分成指定份数,随后调用 runOn 方法并行处理这些数据。

runOn 该方法参数指定的任务执行的线程环境。

最后的 CountDownLatch 用于阻塞主线程,以免进程停止看不到效果。


调度器相当于 Reactor 中的 ExecutorService,不同的调度器定义不同的线程执行环境。

Schedulers 提供的静态方法可以创建不同的线程执行环境。

Schedulers.immediate() 直接在当前线程执行

Schedulers.single() 在一个重复利用的线程上执行

Schedulers.boundedElastic() 在由 Reactor 维护的线程池上执行,该线程池中闲置时间过长(默认值为 60s)的线程也将被丢弃,创建线程数量上限默认为 CPU 核心数 x 10。线程数达到上限后,最多可提交 10 万个任务,这些任务在线程可用时会被执行。该线程池可以为阻塞操作提供很好的支持。阻塞操作可以执行在独立的线程上,不会占用其他资源。

Schedulers.parallel() 固定线程,对于异步 IO,可以使用该方案。


Reactor 另外提供了两个操作符方法来切换执行上下文,publishOn 和 subscribeOn。

publishOn 影响当前操作符方法后面操作的线程执行环境,而 subscribeOn 则影响整个链路的线程执行环境。

(runOn 与 publishOn 类似,影响该方法后续操作线程执行环境)


Flux.range(1, 3)        .doOnNext(i -> {            System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);        })        .publishOn(Schedulers.newParallel("myParallel"))        .skip(1)        .subscribe(myHandler);
复制代码

myHandler 只是简单打印线程和数据

Consumer myHandler = i -> {    System.out.println(Thread.currentThread().getName() + " receive:" + i);};
复制代码

输出结果为

main doOnNext:1main doOnNext:2main doOnNext:3myParallel-1 receive:2myParallel-1 receive:3
复制代码

publishOn 后面的操作(包括 skip,myHandler)都已经切换到新的线程。


再来简单看一下 publishOn 与 subscribeOn 的实现

前面说了,操作符方法的逻辑会移到 Subscriber 端,上例过程示意如下

线程切换是在 PublishOnSubscriber 中完成的,所以 PublishOnSubscriber 后面的操作都在新线程上。


将上面例子代码修改一下

Flux.range(1, 3)        .doOnNext(i -> {            System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);        })        .subscribeOn(Schedulers.newParallel("myParallel"))        .skip(1)        .subscribe(myHandler);
复制代码

输出结果为

myParallel-1 doOnNext:1myParallel-1 doOnNext:2myParallel-1 receive:2myParallel-1 doOnNext:3myParallel-1 receive:3
复制代码

从数据生产到消费,所有操作都在新的线程上。


示意图如下


前面说了,Flux#subscribe 中会调用 OptimizableOperator#subscribeOrReturn 方法,而在 FluxSubscribeOn 中,会直接切换任务线程,后面整个流程都执行在新线程上了。


使用 publishOn 还是 subscribeOn,关键在于阻塞操作是在生产数据时还是消费数据时。

如果阻塞操作在生产数据时,如同步查询数据库,查询下游系统,可以使用 subscribeOn

如果阻塞操作在消费数据时,如同步保存数据,可以使用 publishOn。


流量控制

响应式编程中常常会出现 Backpressure 的概念,

它是指在 push 模式下,当发布者生产数据的速度大于订阅者消费数据的速度,导致出现了订阅者传递给订阅者的逆向压力。


FluxSink.OverflowStrategy 定义了在这种场景下的几种处理策略。

IGNORE 完全忽略新的数据

ERROR Publisher 抛出异常

DROP 抛弃数据,触发 Flux#onBackpressureDrop 方法

LATEST 订阅者只能获取最新的一个数据

BUFFER 缓存所有的数据,注意,该缓存没有边界,可能导致内存溢出

FluxSink.OverflowStrategy 类似于线程池的任务拒绝策略。


下面来看一个例子

@Testpublic void backpressure() throws InterruptedException {    Flux.<Integer>create(sink -> {        for (int i = 0; i < 50; i++) {            System.out.println("push: " + i);            sink.next(i);            try {                Thread.sleep(10);            } catch (InterruptedException e) {            }        }    }, FluxSink.OverflowStrategy.ERROR)    .publishOn(Schedulers.newSingle("receiver"), 10)    .subscribe(new BaseSubscriber<Integer>() {        protected void hookOnSubscribe(Subscription subscription) {            subscription.request(1);        }        protected void hookOnNext(Integer value) {            System.out.println("receive:" + value);            try {                Thread.sleep(12);            } catch (InterruptedException e) {            }            request(1);        }        protected void hookOnError(Throwable throwable) {            throwable.printStackTrace();            System.exit(1);        }    });    new CountDownLatch(1).await();}
复制代码

1 发布者每隔 10 毫秒生产一个数据

注意,FluxSink.OverflowStrategy.ERROR参数指定了 Backpressure 处理策略

2 publishOn 方法指定后续运行线程环境

注意下文解析的第二个参数。

3 订阅者每隔 20 毫秒消费一个数据


Sink 中有一个关键字段,BaseSink#requested,代表订阅者请求数量。

每次订阅者调用Subscription#request(long n)方法,BaseSink#requested 都会加上对应数值 n。

而每次生产数据调用Sink#next时,BaseSink#requested 都会减 1。

当 Sink#next 执行,如果 BaseSink#requested 为 0,就是执行 FluxSink.OverflowStrategy 指定策略。


publishOn(Scheduler scheduler, int prefetch)方法会将 BaseSink#requested 的值初始化为 prefetch。

注意,这里并不会生产 prefetch 个数据并发送给订阅者,只会修改 BaseSink#requested。


另外,PublishOnSubscriber 中会将Subscription#request操作缓存,达到阀值后合并为一次 request 操作。


在上面的例子中阀值为prefetch - (prefetch >> 2),就是 8 了

所以我们会看到结果

receive:5push: 10push: 1121:59:55.828 [Thread-0] DEBUG reactor.core.publisher.Operators - onNextDropped: 11
复制代码

发布者发送 10(prefetch)个数据后,尽管订阅者已经消费 5 个数据,并发起 5 次 request 操作,但被 PublishOnSubscriber 缓存了,并没有发送到发布者那边,这时 BaseSink#requested 已经为 0 了,抛出 OverflowException 异常,Sink 关闭,后面的数据被抛弃。


可以将订阅者的休眠时间调整为 12 毫秒,这样当发布者发送 10(prefetch)个数据前,PublishOnSubscriber 会发起一次 request(8)的操作,可以看到

push: 1922:03:33.779 [Thread-0] DEBUG reactor.core.publisher.Operators - onNextDropped: 19
复制代码

也就是到 19 个数据才抛出异常,抛弃数据。


到这里,我们已经基本了解 Reactor 的概念,核心设计与实现机制。

下一篇文章,我们通过比较 WebFlux 和 AsyncRestTemplate,看一下响应式编程会给我们带来什么惊奇。


如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!


发布于: 2020 年 12 月 22 日阅读数: 659
用户头像

binecy

关注

还未添加个人签名 2020.08.26 加入

还未添加个人简介

评论

发布
暂无评论
Reactive Spring实战 -- 理解Reactor的设计与实现