写点什么

SpringWeb 服务应用响应式 Web 开发组件:响应式编程和 SpringBoot

  • 2023-06-18
    湖南
  • 本文字数:6517 字

    阅读完需:约 21 分钟

应用响应式 Web 开发组件

当下,对于具有广大用户群体的新型互联网应用而言,它们基本都需要考虑如何高效应对用户流量、如何确保系统弹性等核心技术主题。


在理论和实践的结合下,响应式编程是一种新型的编程模型,是确保系统弹性的一款强有力的武器。在响应式编程领域,存在一套完整的响应式流规范以及实现这一规范的开发工具。在现实中,开发人员通常不会直接使用这些偏底层的开发工具来开发应用程序,而是借助于特定的开发框架。而我们日常开发中每天都在使用的 Spring 就是这样一个支持响应式编程的开发框架。


在 2017 年,Spring 发布了新版本 Spring 5,这是自 Spring 4 发布以来将近 4 年的时间中所发布的第一个全新版本。Spring 5 引入了很多核心功能,重要的是它全面拥抱了响应式编程的设计思想和实践。


Spring 5 的响应式编程模型以 Project Reactor 库为基础,而后者则实现了响应式流规范。事实上,Spring Boot 从 2.x 版本开始全面依赖 Spring 5。


Spring Boot 为我们提供了一系列响应式编程组件,而本章将重点关注如何使用 Spring Boot 框架来开发响应式 Web 服务。

响应式编程和 Spring Boot

响应式编程是一种新的编程技术,其目的是构建响应式系统。对于响应式系统而言,任何时候都需要确保其具备即时响应性,这是大多数日常业务场景所需要的,但却是一项非常复杂而有挑战性的任务,需要对相关技术有深入的了解。本节将讨论这些技术。

响应式流规范和实现框架

对于响应式编程而言,首先要明确的概念是数据流(Data Stream)。简单来讲,所谓的流就是由生产者生产并由一个或多个消费者消费的元素序列。而一旦有了数据流,那么就势必面临流量控制问题。流量控制是讨论数据流的核心话题。而针对如何控制流量,业界存在一个响应式流规范,以及一批实现了该规范的开发工具。

1. 响应式流规范

Java API 版本的响应式流只包含四个接口,即 Publisher<T>、Subscriber<T>、Subscription 和 Processor<T,R>。


发布者(Publisher)是潜在的包含无限数量的有序元素的生产者,它根据收到的请求向当前订阅者发送元素。Publisher<T>接口定义如下所示:

public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);}
复制代码

订阅者(Subscriber)从发布者那里订阅并接收元素。发布者向订阅者发送订阅令牌(Subscription Token)。通过订阅令牌,订阅者就可以向发布者请求多个元素。当元素准备就绪时,发布者就会向订阅者发送合适数量的元素。然后订阅者可以请求更多的元素,发布者也可能有多个来自订阅者的待处理请求。Subscriber <T>接口定义如下所示:

public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
复制代码

当执行发布者的 subscribe()方法时,发布者会回调订阅者的 onSubscribe()方法。在这个方法中,通常订阅者会借助传入的 Subscription 对象向发布者请求 n 个数据。然后发布者通过不断调用订阅者的 onNext()方法向订阅者发出最多 n 个数据。


如果数据全部发完,则会调用 onComplete()方法告知订阅者流已经发完;如果有错误发生,则通过 onError()方法发出错误提示消息,这时同样也会终止数据流。


订阅(Subscription)表示订阅者订阅的一个令牌。当订阅请求成功时,发布者将其传递给订阅者。订阅者使用订阅令牌与发布者进行交互,例如请求更多的元素或取消订阅。Subscription 接口定义如下所示:

public interface Subscription {
public void request(long n);
public void cancel();
}
复制代码

当发布者调用 subscribe()方法注册订阅者时,会通过订阅者的回调方法 onSubscribe()传入 Subscription 对象,之后订阅者就可以使用这个 Subscription 对象的 request()方法向发布者请求数据。


处理器(Processor)充当订阅者和发布者之间的转换器(Transformer)。Processor<T,R>订阅类型 T 的数据元素,接收并转换为类型 R 的数据,发布该数据。Processor 接口同时继承了 Publisher 和 Subscriber 接口,其定义如下所示:

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
复制代码

上述四个接口是各个响应式开发库之间互相实现兼容的桥梁,响应式流规范也仅仅聚焦于此,而对诸如转换、合并、分组等的操作一概未做要求,因此是一个非常抽象且精简的接口规范。


作为总结,我们可以把响应式流规范核心接口的交互方式梳理成图。

可以看到,上图中所示的交互方式一共包含如下 7 个步骤。

  1. 当发布者使用 subscribe()方法实现对该发布者的订阅时,首先会创建一个具有相应逻辑的 Subscription 对象,这个 Subscription 对象定义了如何处理请求,以及如何发出数据。

  2. 然后发布者将这个 Subscription 通过订阅者的 onSubscribe()方法传给订阅者。

  3. 在订阅者的 onSubscribe()方法中,需要通过 Subscription 的 request()方法发起第一次请求。

  4. Subscription 收到请求,就可以通过回调订阅者的 onNext()方法发出元素,有多少发多少,但不能超过请求的个数。

  5. 订阅者在 onNext()方法中通常定义对元素的处理逻辑,处理完成之后,可以继续发起请求。

  6. 发布者根据需要继续满足订阅者的请求。

  7. 如果发布者的元素序列正常结束,就通过订阅者的 onComplete()方法予以告知。如果序列发送过程中有错误,则通过订阅者的 onError()方法予以告知并传递错误提示。这两种情况都会导致序列终止,订阅过程结束。

2. Project Reactor

Spring 5 引入了响应式编程机制,并默认集成了 Project Reactor(下文简称为 Reactor)作为该机制的实现框架。Reactor 诞生较晚,可以认为它是第二代响应式开发框架。所以它是一款完全基于响应式流规范设计和实现的工具库,在使用上直观易懂。


在 Reactor 框架中,数据流的表现形式如图所示:

上图中的数据流模型从语义上可以用如下公式表示:

onNext x 0..N [onError | onComplete]


上述公式包含了如下三种不同类型的方法调用,分别处理不同场景下的消息通知。

  • onNext():正常包含元素的消息通知。

  • onComplete():序列结束的消息通知。

  • onError():序列出错的消息通知,可以没有。


按照响应式流规范,当这些消息通知产生时,订阅者中对应的 onNext()、onComplete()和 onError()这三个方法将被调用。如果序列没有出错,则 onError()方法不会被调用;而如果不调用 onComplete()方法,我们就会得到一个无限异步序列。通常,无限异步序列应该只用于测试等特殊场景。


针对数据流,Reactor 提供了两个核心组件,即 Flux 和 Mono。其中 Flux 代表包含 0 到 n 个元素的异步序列,而 Mono 则表示包含 0 个或 1 个元素的异步序列。


创建 Flux 的方式非常多,这些方式可以分成两大类,一类是充分利用 Flux 的静态方法,另一类则是动态创建 Flux。这里的静态方法常见的包括 just()、fromArray()、fromIterable()、fromStream()、empty()、error()、never()、range()、interval()等,而动态方法则包括 generate()和 create()。


创建 Mono 的方式也类似。另外,和其他主流的响应式编程框架一样,Reactor 框架的设计目标也是为了简化响应式流的使用方法。为此,Reactor 框架为我们提供了大量操作符用于操作 Flux 和 Mono 对象。常见的包括用于数据转换的 flatMap、用于数据过滤的 filter、用于操作组合的 zipWith、用于条件控制的 defaultIfEmpty,以及 subscribe 和 log 等工具操作符。


由于本章的重点是介绍 Spring WebFlux,而 Project Reactor 是 WebFlux 的底层框架,我们一般不会直接使用该框架开发 Web 应用程序。

响应式编程的应用场景分析

在介绍完响应式流程规范以及开发工具 Project Reactor 之后,你可能会问,响应式编程到底有什么用?现实中哪些场景可以用得上响应式编程?这是一个好问题,本小节将基于一些具体的应用场景来探讨这一话题。

1. 响应式编程的应用场景

本质上,我们可以认为响应式编程并不仅仅是一种编程技术,而且是一种架构设计的系统方法,因此适用于诸多场景。它既可以用于简单的 Web 应用系统,也可以用于大型企业的解决方案。当然,基于响应式数据流,我们也完全可以利用它构建流式系统或大数据系统。


数据流处理是响应式编程的一大应用场景。流式系统的主要特点是低延迟和高吞吐量。对于这类系统,大多数数据是从服务器端传出的,因此客户端扮演消费者的角色。


这个时候,通过使用非阻塞式通信可以确保资源得到高效的利用,从而实现低延迟和高吞吐量。流式系统的表现形式也可以有很多,日常的日志埋点和分析、服务运行时的状态采集等都属于这种类型。对高并发流量的处理,通常涉及大量的 I/O 操作。


相较于传统的同步阻塞式 I/O 模型,响应式编程所具备的异步非阻塞式 I/O 模型非常适合应对高并发流量的业务场景。这类场景中比较典型的一种表现形式就是微服务架构中的 API 网关,因为网关的作用就是响应来自前端系统的流量并将其转发到后端服务。


讲到微服务架构,如何构建一个具有异步非阻塞式请求处理流程的 Web 服务也是开发人员的核心诉求,我们需要高效处理跨服务之间的网络请求。针对这种场景,响应式编程及其相关技术(如本章要介绍的 RSocket 协议)同样也是一种非常有效的解决方案。

2. 响应式编程在开源框架中的应用

响应式编程在系统开发过程中逐渐得到广泛的应用。结合上文所分析的三种典型应用场景,这里我们以对应的 Netflix Hystrix、Spring CloudGateway 以及 Spring WebFlux 这三款主流的开源框架为例,解析这些框架背后的响应式编程技术。


(1)Netflix Hystrix 中的滑动窗口和响应式编程

我们已经在第 1 章介绍 Spring 家庭生态时提到过 Spring Cloud 微服务开发框架。


在 Spring Cloud 微服务开发框架中,存在一个 Spring Cloud NetflixHystrix 组件,该组件基于 Netflix Hystrix 实现了服务熔断功能。NetflixHystrix 是 Netflix 开源的一款容错库,使用了 HystrixCircuitBreaker 类来实现熔断器。


该类通过一个 circuitOpen 状态位控制着整个熔断判断流程,而这个状态位本身的状态值则取决于系统目前的执行数据和健康指标。那么,HystrixCircuitBreaker 如何动态获取系统运行时的各项数据呢?这里就使用到了一个 HealthCountsStream 类,从命名上不难看出,这就是一种数据流。


HealthCountsStream 在设计上采用了一种特定的机制,即滑动窗口(RollingWindow)机制,核心代码如下所示:

this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>()
{
@Override
public Observable<Bucket> call() {
return inputEventStream.observe()
// 使用window操作符收集一个Bucket时间内的数据
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
// 将每个window内聚集起来的事件集合汇总成Bucket
.flatMap(reduceBucketToSummary).startWith(emptyEventCountsToStart);
}
});
复制代码

在技术选型上,Hystrix 采用了基于响应式编程思想的 RxJava。与其他响应式编程框架一样,RxJava 同样实现了前面介绍的响应式流规范。


使用 RxJava 的一大好处是可以通过 RxJava 的一系列操作符来实现滑动窗口,包括上述代码所展示的 window、flatMap 和 reduce 等。其中 window 操作符把当前流中的元素收集到另外的流序列;flatMap 操作符把流中的每个元素转换成一个流,再把转换之后得到的所有流中的元素进行合并;而 reduce 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的流。


(2)Spring Cloud Gateway 中的过滤器和响应式编程

Spring Cloud Gateway 是 Spring Cloud 微服务开发框架中的另一个核心组件,是 Spring 官方自己开发的一款 API 网关。在技术体系上,Spring CloudGateway 基于最新的 Spring 5 和 Spring Boot 2 以及用于响应式编程的 ProjectReactor 框架,提供响应式、非阻塞式 I/O 模型。和其他 API 网关系统类似,Spring Cloud Gateway 中的核心组件也是过滤器。


过滤器用于在响应 HTTP 请求之前或之后修改请求本身及对应的响应结果。Spring Cloud Gateway 提供了一个全局过滤器(GlobalFilter)的概念,对所有路由都生效。我们来演示一下如何使用全局过滤器来对所有 HTTP 请求进行拦截,具体做法是实现 GlobalFilter 接口,示例代码如下所示:

@Configuration
public class JWTAuthFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange,
GatewayFilterChain chain) {
ServerHttpRequest.Builder builder =
exchange.getRequest().mutate();
builder.header("Authorization","Token");
chain.filter(exchange.mutate().request(builder.build()).build());
return
chain.filter(exchange.mutate().request(builder.build()).build());
}
}
复制代码

以上代码展示了如何利用全局过滤器在所有的请求中添加 Header。在这个示例中,我们对所有经过 API 网关的 HTTP 请求添加了一个消息头,用来设置与访问 Token 相关的安全认证信息。这里 filter()方法的返回值是 Mono<Void>,代表它使用了响应式编程技术。


(3)Spring WebFlux 中的请求处理流程和响应式编程

Spring WebFlux 是 Spring 5 引入的全新的响应式 Web 服务开发框架,我们在后面中将对其详细讲解。现在只需要知道,在 WebFlux 中,和 WebMVC 类似,对 HTTP 请求的处理过程涉及 HandlerMapping、HandlerAdapter、HandlerResultHandler 类之间的交互,其核心的 handle()方法的定义如下所示,该方法实现了流式处理请求机制。

public Mono<Void> handle(ServerWebExchange exchange) {
...
return Flux.fromIterable(this.handlerMappings)
//从handlerMapping这个map中获取HandlerMapping
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
//如果没有找到HandlerMapping,则抛出异常
.switchIfEmpty(createNotFoundError())
//触发HandlerAdapter的handle方法
.flatMap(handler -> invokeHandler(exchange, handler))
//触发HandlerResultHandler的handleResult方法
.flatMap(result -> handleResult(exchange, result));
}
复制代码

在这个核心方法中,我们也看到了 Project Reactor 框架所通过的 concatMap、switchIfEmpty 和 flatMap 等响应式操作符,其中的 flatMap 操作符我们在前面已经讨论过。

Spring 响应式编程组件

基于 Project Reactor 框架,Spring 为我们提供了全面的响应式编程技术体系。对 Web 服务,我们可以使用响应式 Web 框架 WebFlux;而对 NoSQL 响应式数据访问,我们则可以使用 Spring Data Reactive 框架。本节将对 Spring 中的这些响应式编程组件简要讲解。

1. Web 服务和 Spring WebFlux

在 Spring WebMVC 的基础上,我们将引入全新的 Spring WebFlux 框架。


WebFlux 框架名称中的 Flux 一词就来源于 Project Reactor 框架中的 Flux 组件。WebFlux 功能非常强大,不仅仅包含了对创建和访问响应式 HTTP 端点的支持,还可以实现服务器推送事件以及 WebSocket。我们不对该框架的所有功能做全面介绍,对于应用程序而言,开发人员的主要工作是基于 HTTP 开发响应式服务,这也是本章的一大重点。


要想使用 WebFlux,我们需要引入 spring-boot-starter-webflux 依赖包。下图展示了 spring-boot-starter-webflux 2.5.3 版本的依赖组件,可以看到该版本在 spring-boot-starter 2.5.3 版本的基础上依赖于 springwebflux 5.3.9. RELEASE 版本,而后者同样依赖于 spring-web 5.3.9.RELEASE 版本以及 3.4.8. RELEASE 版本的 reactor-core 组件。

Spring WebFlux 提供了完整的支持响应式开发的服务端技术栈。和 Spring WebMVC 相比,Spring WebFlux 既支持基于 @Controller、@RequestMapping 等注解的传统开发模式,又支持基于 Router Functions 的函数式开发模式。


关于框架背后的实现原理,传统的 Spring MVC 构建在 Java EE 的 Servlet 标准之上,该标准本身就是阻塞式和同步的。最新版本的 Servlet 虽然也添加了异步支持,但是在等待请求的过程中,仍然在线程池中保持着线程。而 Spring WebFlux 则是构建在响应式流以及它的实现框架 Reactor 基础之上的一个开发框架,因此可以基于 HTTP 实现异步非阻塞的 Web 服务。

2. 数据访问和 Spring Data Reactive

我们知道 Spring Data 是 Spring 家族中针对数据访问而开发的一个框架,对各种数据存储媒介抽象了一批 Repository 接口以简化开发过程。


而在 Spring Data 的基础上,Spring 5 也全面提供了一组响应式数据访问模型。在新一代 Spring 框架中,我们可以把 Spring Data 划分为两大类型,一类是支持 JDBC、JPA 和部分 NoSQL 的传统 Spring Data Repository,而另一类则是支持 Mongo、Cassandra、Redis、Couchbase、R2DBC 等的响应式 Spring DataReactive Repository。

用户头像

加VX:bjmsb02 凭截图即可获取 2020-06-14 加入

公众号:程序员高级码农

评论

发布
暂无评论
SpringWeb服务应用响应式Web开发组件:响应式编程和SpringBoot_互联网架构师小马_InfoQ写作社区