初入响应式编程 (下)

用户头像
CD826
关注
发布于: 2020 年 04 月 29 日
初入响应式编程(下)



上一篇:《初入响应式编程(上)》

1. Reactive Streams的基础API

前面我们讲Java自JDK9之后引入了响应式编程范型:Reactive Streams,但并未提供任何实现,而是通过PublisherSubscriberSubscriptionProcessor这四个接口定义了一套响应式编程的标准。下面让我们近距离接触一下这四个接口。

1.1 Publisher

Publisher的主要职责是生产事件,并以异步的方式推送给相关的订阅者,在标准中接口定义如下:

package org.reactivestreams;
public interface Publish<T> {
void subscribe(Subscriber<? super T> s);
}

该接口的定义十分简单,仅仅提供了一个subscribe()方法,允许订阅者通过该方法订阅Publisher所发布的事件。



注意到,该接口的声明是一个泛型接口声明,也就是我们通过该接口可以向订阅者发布任意类型的事件或数据消息,该事件或数据消息的类型由我们自己来定义。



对于Publisher来说会向订阅者发布以下4种事件:

  1. Subscription Event: 订阅事件;

  2. Data Event: 类型为T的事件;

  3. Completion Event: 完成事件;

  4. Error Event: 异常事件.



对于类型为T的数据事件(Data Event),Publisher可以发布任何数量,当然也可以是0次,也就是说在极端情况下可能不会向订阅者发布任何数据事件。



而订阅事件(Subscription Event)、完成事件(Completion Event)和异常事件(Error Event)则最多只会发布一次,而且一旦向订阅者发布了完成事件或异常事件,那么Publisher将不会再向订阅者发布任何数据事件。



一旦一个SubscriberPublisher订阅成功,则Subscriber将按照下图中的顺序依次接收到Publisher所发布的事件:

Subscriber接收Publish事件的顺序图



此外,Publisher在向订阅者发布数据事件时,必须支持标准中所规定的背压机制(backpressure),也就是说Publisher不可以向订阅者发布超过其所请求数量的数据消息。至于订阅者如何向Publisher请求以及请求多少个数据消息,我们将在下一小节进行讲解。



什么是背压机制?Backpressure这个概念源自工程。原意是指在管道运输中,气流或液流由于管道突然变细、遇到急弯等原因导致出现了下游向上游的逆向压力,这种情况就称作back pressure,国内工程界这个词的翻译就是背压。



而在编程中是指数据流从上游生产者向下游消费者传输,上游生产速度大于下游消费速度,从而导致下游的Buffer溢出,我们就称这种现象为backpressure出现。



对于背压这个概念的理解需要强调一点是,背压的重点不在于上游生产速度大于下游消费速度,而在于下游Buffer溢出。因为一旦我们下游处理设置了Buffer,那么就有可能会遇到这种情况。但是,根据墨菲法则,下游一旦设置Buffer,则这种现象一定会发生。而且这种现象一旦出现,下游就面临一种危险的境地,此时下游唯一的选择就是丢弃上游发送过来的事件,别无它法。

1.2 Subscriber

Subscriber就是订阅者,用于监听Publisher所发布的事件,并作出相应的业务处理。Subscriber接口的代码定义如下:

package org.reactivestreams;
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}



下面,让我们依次来了解一下这些方法:

  • onSubscribe(Subscription s): 一旦订阅者成功注册到PublisherPublisher就会向订阅者发布一个订阅事件(subscription Event)事件,并将所构造Subscription对象传递给订阅者。后续订阅者可以使用该对象向Publisher发起数据消息请求等处理;

  • onNext(T t): Publisher发送给订阅者的所有数据事件消息均由该方法进行处理。不过需要强调一点,Publisher有可能一个数据事件都不会发布;

  • onCompletion(): 当Publisher完成全部数据事件的发送时就会向订阅者发布该事件。一旦该事件发布,那么Subscriber就不能够再向Publisher请求任何数据,而且Publisher也不会再发送任何事件给订阅者。此时,我们需要将订阅者所持有的Subscription对象设置为空。不过,这里也需要强调一下,该事件有可能永远都不发布,比如Publisher出现异常时(此时发布的是异常事件)或者Publisher是一个无限数据消息发送者时,此事件也不会发布;

  • onError(Throwable t): 不论是在数据事件消息发布时,还是在订阅者订阅时,一旦出现异常Publisher就会发布该事件给订阅者,订阅者可以在该方法中监听并进行相关处理。和完成事件一样,一旦PublisherSubscriber发布了该事件消息,订阅者将不能够再向Publisher请求更多数据,而且Publisher也不能再发送任何事件给订阅者。此时,我们需要将订阅者所持有的Subsciption设置为无效,以防止再次数据请求。

1.3 Subscription

Subscription在Reactive Streams中是一个非常重要的角色,订阅者可以通过其对数据流的传输进行控制,避免Publisher发布过多的数据消息,造成订阅者的崩溃。我们可以通过Subscription所提供的下面两个方法来实现数据请求的处理:

package org.reactivestreams;
public interface Subscription {
void request(long n);
void cancel();
}



我们知道一旦订阅者订阅成功,Publisher就会将其所构造的Subscription传递给订阅者,然后订阅者就可以通过Subscriptionrequest()方法向Publisher请求数据。request()中的参数,也就是告诉Publisher最多能够发送多少个数据消息给订阅者,一旦发送的个数超过了该数目,则必须停止发送。



相应的,订阅者则使用onNext()方法对Publisher所发布的数据进行处理。一旦处理了所有数据,就可以再次通过request()方法请求更多数据。不过在请求时一旦将request()的参数设置为Long.maxValue,那么Publisher将会不限个数的向订阅者发送数据消息,直至数据发送完成或者出错。



当然,订阅者也可以借助Subscription中的cancel()方法让Publisher中止数据消息的发送。不过,一旦调用该方法,订阅者还是有可能再收到一些数据消息(之前余留的数据事件)。但是,一旦取消成功,订阅者则不能够通过request()方法再向Publisher请求数据,同时应将其所持有的Subscription置为无效。

1.4 Processor

Processor接口的代码定义如下:

package org.reactivestreams;
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}



该接口同时继承了PublisherSubscriber,并未增加其它任何方法声明。因此,它即是一个消息发生者,也是一个消息的订阅者,所以在使用时常常将Processor作为发布者和订阅者之间数据转换处理器。而且,一旦在数据处理过程中出现了错误,则必须将异常传递给下游的订阅者。



另外,我们也可以将Processor视作将PublisherSubscriber连接到单个数据流中所需的管道。

2. 响应式编程的两个原则

响应式编程和我们之前所使用的命令式编程还是有非常大的差别。所以,我在这里首先和大家分享两个响应式编程的原则。

2.1 异步原则

第一个异步原则。我们知道一个异步执行的任务可以让我们无须等待前一个任务执行完成,从而可以大大提高任务执行的效率及硬件的利用率,更为重要的是异步执行可让我们将任务进行解耦。



因此,Reactive Streams提供异步执行也正是基于这个原因。所以,当我们在编写响应式程序时一定要注意,一个发布者(Publisher)在生成所要发布的数据消息时,或者一个订阅者(Subscriber)在处理一个数据消息时可以是同步的、阻塞式的。但是,当发布者(Publisher)将数据发布给订阅者(Subscriber)时必须是异步的,它们之间不能产生阻塞,否则我们所讨论响应式编程将变成一个笑话。

2.2 背压机制(backpressure)

背压机制是响应式编程中一个非常重要的原则,也是我们在编写响应式程序时所必须遵守的一个原则。背压机制避免了订阅者(Subscriber)在处理事件时所面临的超限(overrun)状况。根据背压机制,发布者(Publisher)只能向订阅者(Subscriber)发布等于或少于订阅者所请求个数的数据消息,而订阅者可以视其自身处理能力来决定是否再向发布者请求更多数据消息。



而当发布者生产数据消息的速率大于订阅者处理速率的话,发布者可以构建缓存来存储尚未发送的事件,也可以简单的抛弃。具体采用何种策略则由具体业务来决定。



此外,对于订阅者来说视其应用场景的不同可以一次请求多个数据消息,也可以一次仅请求一个数据消息。

3. 进入Reactor

正如之前所讲的,Java中所提供的Reactive Stream所仅仅是一个标准,其本身仅是相应的接口定义,并未提供任何具体的实现,而这些实现由第三方来完成。目前所知这些实现有: RxJava、Reactor、akka等等。其中Reactor则是由Pivotal开源组织提供,该组织也是Spring的提供者,因此顺理成章Spring 5中所引入的响应式编程就是基于该项目完成。所以后续我们将会着重介绍Reactor,至于其它的实现,大家可以自行研究。



最新版本的Reactor,也就是3.x版,可以说是完全支持Reactive Streams所定义的标准,包括背压机制。在正式开始讲解Reactor 之前先让我们粗略了解一下Reactor的特性。

3.1 无限数据流

在数据流的发布上Reactor提供了无限数据流的支持。同时,针对背压机制,提供了request-respone模式,使得Reactor提供每次仅发送一个数据事件消息的能力。

3.2 推-拉模式的支持(push-pull model)

因为发布者和订阅者之间生产和消费速率的不同,所以Reactor在事件消息的处理上提供了推(push)和拉(pull)这两种模式。当发布者生产数据消息的速率大于订阅者消费速率的话,可以采用拉(pull)模式,由订阅者主动来拉取数据消息。而当发布者生产数据消息的速率小于订阅者消费速率的话,可采用发布者主动推送(push)的模式。

3.3 丰富的操作处理

Reactor提供了丰富的操作,使用这些操作我们可以对数据流进行筛选、过滤、转换以及合并等处理。Reacive Streams在对数据流处理上借鉴了Unix中管道处理模式,每一个操作都可视为管道中的一个节点,我们可以通过组合这些操作来构建各种处理,从而能够实现极为复杂的业务功能。

3.4 Reactor主要工程

  • Reactor-Core: Reactor的核心工程,实现了Reactive Streams所定义的API,也是Spring 5中响应式编程的核心工程;

  • Reactor-Extra: Reactor的扩展库,包含reactor-adapter和reactor-extra,增强了Reactor Core的功能;

  • Reactor-Netty: 基于Netty,提供了非阻塞的、支持背压的TCP/HTTP/UDP客户端和服务端;

  • Reactor-Test: Reactor的测试套件。

发布于: 2020 年 04 月 29 日 阅读数: 75
用户头像

CD826

关注

Talk is cheap, show me the code. 2020.04.10 加入

还未添加个人简介

评论

发布
暂无评论
初入响应式编程(下)