写点什么

Sentinel 适配 Reactor+WebFlux 框架的实现原理

  • 2023-06-15
    湖南
  • 本文字数:8580 字

    阅读完需:约 28 分钟

在 Sentinel 已经支持异步调用链的前提下,只需要解决 asyncContext 的传递问题,Sentinel 即可适配各种异步框架/响应式编程框架。


响应式编程框架 Reactor 本身就支持 Context 机制,Sentinel 适配 Reactor 框架正是利用了 Reactor 框架的 Context 机制传递 asyncContext,因此,要理解 Sentinel 适配 Reactor 框架的原理就要先理解 Reactor 框架的 Context 传递机制,而要理解 Reactor 框架的 Context 传递机制就要理解 Reactor 框架的工作原理。

Reactive Streams 规范

Reactor 是一个完全实现 Reactive Streams 规范的响应式编程框架,Reactive Streams 定义了响应式编程的规范。

  • 发布-订阅:由订阅者订阅发布者,触发发布者生产数据,再由发布者通过发布数据传递给订阅者消费的过程。

  • 响应式流:一个原始数据经过多重操作后最终被订阅者消费,每一步操作都是一次数据的发布-订阅事件,这些发布-订阅事件按顺序组合到一起就构成了响应式流。


Reactive Streams 规范只定义了如下 4 个接口:

  • Publisher:发布者。

  • Subscriber:订阅者。

  • Subscription:用于连接发布者和订阅者。

  • Processor:处理器,既是订阅者也是发布者。


Publisher 的定义如下:

Publisher 只定义了一个 subscribe 方法,subscribe 方法相当于 Java8 Stream 的终止操作。在没有调用 Publisher 的 subscribe 方法之前,一切操作都只是我们定义的执行计划。执行计划制定了整个流的执行过程,当 Publisher 的 subscribe 方法被调用,并且传入一个 Subscriber 时,整个响应式流才能形成闭环,流才会开始工作。


Subscriber 的定义如下:

Reactive Streams 规范定义了 Subscriber 可以订阅如下 4 种事件:

  • onSubscribe:由 Publisher 调用,被此 Subscriber 订阅的 Publisher 在执行 subscribe 方法时回调。

  • onNext:通常由 Subscription 调用,在 Subscription#request 方法请求到数据时被调用。

  • onError:通常在 onNext 方法中调用,在消费数据发生异常时被调用。

  • onComplete:通常由 Subscription 调用,在数据全部被正常消费完成时被调用。


Subscription 的定义如下:

Subscription 相当于一个场景类。

  • request:Subscriber 调用此方法请求指定数量的数据,在 Subscription 收到 Publisher 发出的数据时,调用 Subscriber 的 onNext 方法将数据传递给 Subscriber 消费,此方法通常在 Subscriber 的 onSubscribe 方法中被调用。

  • cancel:通常由 Subscriber 调用此方法来取消订阅,此方法被调用后,request 方法就不再产生数据、不再触发 Subscriber 的 onNext 方法。


Processor 的定义如下:

Processor 比较特殊,实现 Processor 的类既是 Publisher 也是 Subscriber。

Reactor 发布-订阅流程

一个由发布者发布数据、由订阅者订阅并打印输出的简单案例如下,我们将一步步分析此案例的执行流程,以深入理解 Reactor 框架的发布-订阅流程。

Mono 是一个抽象类,它实现了 Reactive Streams 规范的 Publisher,并扩展了发布者的操作以提供流式编程,代码如下:

Mono 提供了一系列用于创建 Mono 的静态方法,使用户不需要记住 Mono 都有哪些实现类。本例中使用的 just 静态方法就是其中之一,其源码如下:

其中,onAssembly 方法对我们分析流程影响不大,忽略 onAssembly 方法之后的 just 静态方法的代码如下:

提示:在不了解 Reactor 框架的情况下,不建议读者在初次阅读源码时太纠结细节,这也是学习方法,先掌握主干,再关心细枝末节,所以这里先忽略 onAssembly 方法。


just 静态方法返回的 Mono 实例的类型为 MonoJust。本着阅读源码先掌握主干的宗旨,我们去掉 MonoJust 实现的其他接口,只关心 MonoJust 继承 Mono 抽象类实现的抽象方法。经过剪枝后的 MonoJust 类的源码如下:

MonoJust 类的 subscribe 方法的参数类型是 CoreSubscriber,而对于 Reactive Streams 规范的 Publisher,其 subscribe 方法的参数类型是 Subscriber,这说明父类 Mono 已经实现了 Publisher 的 subscribe 方法,因此我们先看父类 Mono 的 subscribe 方法,其源码如下:

先调用 Operators#toCoreSubscriber 方法,将 Subscriber 转换为 CoreSubscriber,再调用子类实现的 subscribe 重载方法。


Mono 抽象类提供了很多个 subscribe 方法的重载,无论使用哪个重载方法,最后都会调用 Mono 实现 Publisher 的 subscribe 方法,自然也会调用 Mono 子类实现的 subscribe 方法。


在本案例中,我们调用 MonoJust 实例的 subscribe 方法传入的是一个 lambda 表达式。该 lambda 表达式实现的是 Consumer 的 accept 方法,相当于传入了一个 Consumer 实例。该 Consumer 实例最终会被包装成一个 Subscriber,成为最终订阅数据的订阅者,其源码如下:

  1. 将 Consumer 包装成一个类型为 LambdaMonoSubscriber 的订阅者。

  2. 调用父类 Mono 实现 Publisher 的 subscribe 方法。


接下来分析 Mono 是如何将数据传递给订阅者的,要理解这个过程需要分析 MonoJust 类的 subscribe 方法。


MonoJust 类的源码如下:

先调用 Operators#scalarSubscription 方法,将 CoreSubscriber 和数据封装成一个 Subscription 实例,再调用 CoreSubscriber 实例的 onSubscribe 方法。


在 Reactive Streams 规范中,Subscriber 的 onSubscribe 方法要求传入的是一个 Subscription 实例,Reactor 将 Subscriber 和数据封装到一个 Subscription 实例中,这样 Subscription 实例就能将数据传递给 Subscriber 消费。


如果结合本例分析,则 MonoJust 实例的 subscribe 方法的 actual 参数就是一个 LambdaMonoSubscriber。MonoJust 实例在 subscribe 方法中调用了这个 LambdaMonoSubscriber 实例的 onSubscribe 方法。

LambdaMonoSubscriber 类的源码如下:

  • consumer 字段:数据消费者,在本案例中为打印输出语句的 lambda 表示式。

  • subscription 字段:订阅场景,也是自己的委托人。


LambdaMonoSubscriber 实例将会在 onSubscribe 方法中调用 Subscription 实例的 request 方法请求数据,由于 Subscription 实例持有 LambdaMonoSubscriber 实例的引用,因此 Subscription 实例能够在 request 方法中将请求到的数据交给 LambdaMonoSubscriber 实例处理。


在分析 Subscription 类的 request 方法之前,需要知道 Operators#scalarSubscription 方法返回的 Subscription 实例的类型是 ScalarSubscription,这是一个同步订阅场景下的 Subscription 实现类,其源码如下:

  • actual:真实订阅者。

  • value:发布者发布的数据。


actual 与 value 都是在构造方法中传入的。在本案例中,actual 就是 LambdaMonoSubscriber 实例,value 就是调用 Mono#just 方法传入的数值 1。


当 LambdaMonoSubscriber 实例的 onSubscribe 方法时被调用时,ScalarSubscription 实例的 request 方法也会被调用。ScalarSubscription 实例会在 request 方法中调用 LambdaMonoSubscriber 实例的 onNext 方法,将 MonoJust 实例发布的数据 1 传递给 LambdaMonoSubscriber 实例,并且在 LambdaMonoSubscriber 实例的 onNext 方法执行结束之后,调用 LambdaMonoSubscriber 实例的 onComplete 方法。


LambdaMonoSubscriber 类的 onNext 方法源码如下:

onNext 方法中出现的 consumer 是传递的 lambda 表达式,负责将订阅的数据打印输出。


从调用 Mono#just 方法创建发布者,到传递的 lambda 表达式调用发布者的订阅方法订阅数据,整个发布-订阅流程如图所示:

使用时序图表示的发布-订阅流程如图所示:

对发布-订阅流程的总结如下:

  1. 调用 Mono#just 方法创建一个类型为 MonoJust 的发布者,并且参数传递的 value 是该发布者需要发布的数据。

  2. 调用 MonoJust 实例的 subscribe 方法传递一个数据消费者,该消费者会被包装成一个类型为 LambdaMonoSubscriber 的订阅者。

  3. 在 MonoJust 实例的 subscribe 方法中,调用 Operators#scalarSubscription 方法创建 ScalarSubscription 实例,并且调用 LambdaMonoSubscriber 实例的 onSubscribe 方法。

  4. LambdaMonoSubscriber 实例的 onSubscribe 方法调用 ScalarSubscription 实例的 request 方法请求数据。

  5. ScalarSubscription 实例的 request 方法调用 LambdaMonoSubscriber 实例的 onNext 方法,传递 MonoJust 实例发布的数据,在没有发生异常的情况下,在 onNext 方法执行完成后调用 onComplete 方法。

  6. LambdaMonoSubscriber 实例的 onNext 方法调用第 2 条传递的消费者消费数据。

Reactor 响应式流的构造原理

现在我们丰富一下案例,即当订阅到 MonoJust 传递的数据时,将数据转换为字符串,再将转换为字符串后的数据传递给订阅者,分析整个案例的发布-订阅流程,从而理解响应式流的构造原理。修改后的案例代码如下:

与 Mono#just 静态方法不同的是,Mono#map 方法是 Mono 提供的一系列由一个 Mono 实例转换为另一个 Mono 实例的成员方法。Mono#map 方法的源码如下:

Mono#map 方法返回的 Mono 实例类型为 MonoMap。MonoMap 的源码如下:

MonoMap 的父类是 MonoOperator。MonoOperator 类的源码如下:

一个 MonoOperator 实例也是一个发布者,是 source 的受托者。MonoOperator 的子类可以在 subscribe 方法中调用 source 的 subscribe 方法,从而将两个发布者串联起来。


实际上很多的操作都是通过 MonoOperator 实现的,通过委托模式将前一个发布者的订阅委托给它后面的发布者完成,这样就能构造出一个响应式流。


响应式流的发布-订阅流程如下:

  1. 顺序定义操作

与写代码的顺序是一致的,操作被顺序定义,前一个发布者委托给后一个发布者实现订阅。

在本案例中,我们先创建 MonoJust 实例,再创建 MonoMap 实例,并将 MonoJust 实例委托给了 MonoMap 实例。


  1. 倒序订阅

在定义完操作之后,首先调用 MonoMap 实例的 subscribe 方法,然后由 MonoMap 实例调用 MonoJust 实例的 subscribe 方法,因此订阅这一步是从后向前传递的,也就是倒序订阅。


  1. 顺序消费数据

由于数据在流中的传递是顺序的,因此消费数据也是顺序的。


在本案例中,订阅完成后会首先调用 MonoJust 实例的 onSubscribe 方法,所以订阅该 MonoJust 的 Subscriber 的 onNext 方法先被执行,即先执行 String::valueOf,然后调用 MonoMap 的 onSubscribe 方法,所以订阅该 MonoMap 的 Subscriber 的 onNext 方法后被执行,即后执行 System.out::printIn。

Reactor 的 Context 传递过程

多个操作(发布-订阅)组合成一个响应式流的执行流程为:顺序定义操作、倒序订阅、顺序消费数据。其中,顺序和倒序是相对响应式流的方向而言的。那么,如何让一个 Context 实例在流中传递呢?


答案是在倒序订阅时传递,由下游的订阅者将 Context 实例传递给上游的发布者。


在倒序订阅时,如果在响应式流中间的某个订阅者创建 Context 实例,并通过发布者的 subscribe 方法层层向上传递 Context 实例,则在顺序消费时,处在创建 Context 实例的订阅者上游的订阅者都可以获取这个 Context 实例,但处在这个订阅者下游的订阅者无法获取该 Context 实例,如图所示:

一个使用 Context 实例的简单案例如下:

  1. 调用 Mono#just 静态方法创建一个类型为 MonoJust 的发布者。

  2. 在订阅时,获取下游订阅者传递上来的 Context 实例,从 Context 实例中获取数据。

  3. 调用 Mono#map 方法将发布者转换为一个类型为 MonoMap 的发布者,该发布者负责把前一个发布者发布的整数取平方后得到的新整数重新发布出去。

  4. 在订阅时,获取下游订阅者传递上来的 Context 实例,并给 Context 实例写入数据。

  5. 调用 Mono#map 方法将发布者转换为一个类型为 MonoMap 的发布者,该发布者负责把前一个发布者发布的整数先加 1 再乘以 2 后得到的整数重新发布出去。

  6. 在订阅时,获取下游订阅者传递上来的 Context 实例,从 Context 实例中获取数据。

  7. 调用 Mono#subscribe 方法开始订阅。


在此案例中,第 2 条能够获取第 4 条写入 Context 实例的数据,而第 6 条无法获取第 4 条写入 Context 实例的数据,因此 Context 实例在流中通过订阅者由下游向上游传递。为什么在订阅时能够获取 Context 实例呢?


首先 Mono#subscriberContext 方法返回的是一个 MonoSubscriberContext 实例,该方法的源码如下:

MonoOperator 与 FluxOperator 是 Reactor 框架实现响应式流的关键。无论是 MonoOperator 还是 FluxOperator,实现的都是基于委托模式将前一个发布者委托给后一个发布者。受托者可以在调用委托者的订阅方法之前加入一些操作,并且可以在订阅到数据时,先对数据进行处理,再将得到的结果重新发布出去。


MonoSubscriberContext 类的源码如下:

  1. 调用下游订阅者的 currentContext 方法获取下游订阅者的 Context 实例,如果不存在,则返回一个空的 Context 实例。

  2. 调用 doOnContext 参数的 apply 方法获取一个新的 Context 实例,如果向这个 Context 实例中写入一个 key-value,就会创建新的 Context 实例。

  3. 创建一个包装当前订阅者的 Subscription 实例,并且将新的 Context 实例传递给这个 Subscription 实例。


如果将 MonoSubscriberContext#subscribe 方法的最后一行代码看作递归调用的话,也就容易理解为什么上游发布者可以在 subscribe 方法中从当前订阅者处获取 Context 实例,并且可以从 Context 实例中获取当前订阅者写入的数据了。


ContextStartSubscriber 类的源码如下:

ContextStartSubscriber 类重写的 currentContext 方法是 CoreSubscriber 接口提供的获取 Context 实例的 API,其源码如下:

默认的 Mono 子类不会重写 currentContext 方法,但 ContextStartSubscriber 类重写了 currentContext 方法,并且返回的是构造函数收到的 Context 实例,这就实现了 Context 实例的传递。


Context#empty 方法创建的是一个没有任何数据的 Context 实例,类型为 Context0。


Context0 的源码如下:

Context0 类的 put 方法的源码如下:

可见,调用 Context0#put 方法会创建一个新的 Context 实例,类型为 Context1。


Context1 类的源码如下:

如果继续调用 Context1#put 方法给 Context 实例写入数据,则也会创建一个新的 Context 实例,类型为 Context2。Context2 类的源码如下:

如果继续调用 Context2#put 方法,则将创建类型为 Context3 的 Context 实例,以此类推。


put 方法每次都返回一个新的 Context 实例,目的是限制数据的可见范围,因为下游订阅者不应该也不能获取上游订阅者写入 Context 实例的数据。

Sentinel 适配 Reactor 框架的原理

在了解了什么是 Reactive Streams 规范,以及 Reactor 框架的发布-订阅流程、响应式流的构造原理、Context 实例的传递过程之后,我们对响应式编程也有了一定的了解,而有了这些铺垫就容易理解 Sentinel 如何适配 Reactor 框架了。


我们先学习一个新的 Reactor API:transform。Mono/Flux 类的 transform 方法允许将一个 Mono/Flux 实例转换为另一种类型的 Mono/Flux 实例,将订阅委托给新的 Mono/Flux 实例。


下面看一个简单的案例,代码如下:

在上述代码中,我们使用 transform 方法将 mono 变量转换为 MonoOperator 实例,由 MonoOperator 实例代理原来的 Mono 实例完成订阅操作。


现在重写 MonoOperator 类的 subscribe 方法,将参数传递的订阅者替换为我们自己实现的订阅者——ActualSubscriberDelegater,修改后的代码如下:

其中,ActualSubscriberDelegater 类继承了 BaseSubscriber 类。BaseSubscriber 类与 MonoOperator 类都实现了委托模式。MonoOperator 是提供定义操作时使用的,在响应式流中,上游发布者委托下游发布者实现订阅;而 BaseSubscriber 是提供订阅操作时使用的,在响应式流中,下游订阅者委托上游订阅者订阅数据。


ActualSubscriberDelegater 类的实现代码如下:

ActualSubscriberDelegater 类重写了 BaseSubscriber 类的所有 hook 方法:

  • hookOnSubscribe:该方法在父类的 onSubscribe 方法被调用时调用。

  • hookOnNext:该方法在父类的 onNext 方法被调用时调用。

  • hookOnComplete:该方法在父类的 onComplete 方法被调用时调用。

  • hookOnError:该方法在父类的 onError 方法被调用时调用。


transform API 与 BaseSubscriber 结合使用可以实现调试、打印日记等操作,因此 Sentinel 可以通过 transform API 与 BaseSubscriber 结合使用来适配 Reactor 框架。


源码在 sentinel-adapter 模块下的 sentinel-reactor-adapter 模块中,我们先来了解如何使用,再分析源码。


一个简单的使用案例如下:

  • doBusiness:模拟业务方法。

  • buildSentinelTransformer:创建发布者转换器。SentinelReactorTransformer 用来实现将一

  • 个发布者委托给另一个发布者完成订阅。

  • testSentinelReactor:单元测试方法,先调用业务方法获取封装了一系列业务操作的 Mono 实例,再使用 transform API 将封装业务操作的 Mono 实例委托给 Sentinel 完成订阅。


SentinelReactorTransformer 是一个 JavaFunction 接口的实现类,其 apply 方法在 Mono#transform 方法中被调用。


SentinelReactorTransformer 类的源码如下:

  1. 如果发布者类型为 Mono,则将发布者委托给一个 MonoSentinelOperator。

  2. 如果发布者类型为 Flux,则将发布者委托给一个 FluxSentinelOperator。


为了容易理解,这里我们不讨论 Flux 及 FluxSentinelOperator。


MonoSentinelOperator 类的源码如下:

MonoSentinelOperator 类实现了 MonoOperator 类的 subscribe 方法,在调用上游发布者的 subscribe 方法时,传递的订阅者类型为 SentinelReactorSubscriber。


SentinelReactorSubscriber 类实现了在 hookOnSubscribe 方法中调用 SphU#asyncEntry 方法、ContextUtil#enter 方法和 ContextUtil#exit 方法,在 hookOnComplete 方法和 hookOnNext 方法执行完成后调用 Entry#exit 方法,在 hookOnError 方法中调用 Tracer#traceContext 方法和 Entry#exit 方法。


SentinelReactorSubscriber 类还重写了父类 CoreSubscriber 的 currentContext 方法,在调用 hookOnSubscribe 方法时,通过 Reactor 的 Context 机制向上游订阅者传递 asyncContext。


SentinelReactorSubscriber 类的 hookOnSubscribe 方法的源码如下:

在 hookOnSubscribe 方法中调用了 ContextUtil#enter 方法、SphU#asyncEntry 方法和 ContextUtil#exit 方法。


在上述源码中,较难理解的是 doWithContextOrCurrent 方法:如果被保护的资源不是异步操作,则直接执行 entryWhenSubscribed 方法;如果是异步操作,则需要先替换 ContextUtil 类的 ThreadLocal 存储的 Context 实例为下游订阅者传递上来的 asyncContext,再执行 entryWhenSubscribed 方法,并且在 entryWhenSubscribed 方法执行完成后,还要将 ContextUtil 类的 ThreadLocal 存储的 Context 实例还原为替换之前的,这些操作都是为了能够构造正确的调用链与调用树。


hookOnNext 方法和 hookOnComplete 方法的源码如下:

在 hookOnNext 方法或 hookOnComplete 方法中调用 tryCompleteEntry 方法,在 tryCompleteEntry 方法中调用 Entry#exit 方法。


hookOnError 方法的源码如下:

先调用 Tracer#traceContext 方法完成异常指标数据统计,再调用 Entry#exit 方法。


MonoSentinelOperator 类重写父类 CoreSubscriber 的 currentContext 方法,源码如下:

currentContext 方法实现从委托者(订阅者)处获取 Reactor 框架的 Context 实例,并且判断是否需要传递 Sentinel 的 asyncContext,有如下两种情况:

  1. 如果被保护的资源为同步操作,则 currentEntry 字段为空,不需要向下游订阅者传递 asyncContext。

  2. 如果被保护的资源为异步操作,则 currentEntry 字段不为空,需要向上游订阅者传递 asyncContext。


与同步操作不同,异步操作会先调用 MonoSentinelOperator 的 onSubscribe 方法,再调用上游发布者的 subscribe 方法,所以 asyncContext 通过 Reactor 框架的 Context 实例传递给上游订阅者。


flatMap 方法用于定义异步转换操作,因此以 flatMap 方法为例讲解异步操作。MonoFlatMap 类的源码如下:

对于同步操作流,上游订阅者的 onSubscribe 方法优先于下游订阅者的 onSubscribe 方法被调用,Context 实例由下向上传递;但对于异步操作流,下游订阅者的 onSubscribe 方法优先于上游订阅者的 onSubscribe 方法被调用,Context 实例由上向下传递。


可是,如果由上游订阅者给下游订阅者传递 asyncContext,那么异步调用链的方向不就反过来了吗?


例如,调用链 step1→step2,其代码如下:

flatMap:定义异步转换操作,调用该方法返回的发布者类型为 MonoFlatMap。


以 step1 为例,当流被订阅时,flatMap 方法返回的 MonoFlatMap 实例先调用下游订阅者的 onSubscribe 方法,再调用上游发布者的 subscribe 方法。


将上述代码翻译为调用链即可得到如下代码:

这样就很容易看出 step1 操作在 step2 操作的后面。


如果将 step1 和 step2 封装为资源,由于 step1 和 step2 都是异步操作,因此先调用 step2 的 MonoFlatMap 实例的 onSubscribe 方法(Sentinel 在此创建 asyncContext),再调用 step1 的 MonoFlatMap 实例的 onSubscribe 方法(Sentinel 在此获取 asyncContext)。


单元测试代码如下:

调试此单元测试代码得到的异步调用链如图所示:

从上图中可以得出结论:异步调用链的方向与代码定义的操作顺序相反。

适配 WebFlux 框架的实现原理

Spring WebFlux 是 Spring Framework 5.0 中引入的新的响应式 Web 框架,其基于响应式编程框架 Reactor 提供 Web 服务。


如果在实现一个项目的消息推送服务时,使用 WebFlux 开发推送接口,则虽然消息推送服务的吞吐量提升了,但是代码复杂度也随之提升,Debug 难度大。


笔者认为,响应式编程不适用于开发业务应用,特别是业务复杂的应用。除调试难、代码复杂度提高、代码量增加外,高度 API 侵入也会导致业务代码难以复用。


随着 DDD 领域驱动设计的流行,越来越多的项目开始采用 DDD 重新架构设计业务,并采用 DDD 重构业务系统,因此随着业务的发展,业务边界依然能保持清晰,便于微服务的拆分和重组。为了使领域架构层便于拆分和重组,建议领域架构层不使用响应式编程框架这种高度 API 侵入的框架。


响应式编程更适合开发中间件或与业务无关的系统,如 API Gateway 就使用响应式编程让网关拥有了更高的吞吐量。


Sentinel 适配 WebFlux 框架实际上就是适配 Reactor 框架,只是借助 WebFlux 框架的过滤器,将类型为 Mono/Flux 的发布者转换为类型为 MonoSentinelOperator/FluxSentinelOperator 的发布者。


SentinelWebFluxFilter 的源码如下:

  1. 从请求中获取路径作为资源名称。

  2. 获取调用来源。

  3. 创建 SentinelReactorTransformer 转换器。

  4. 使用 Reactor 框架的 transform API,将类型为 Mono/Flux 的发布者转换为 Sentinel 实现的类型为 MonoSentinelOperator/FluxSentinelOperator 的发布者。


用户头像

加VX:bjmsb02 凭截图即可获取 2020-06-14 加入

公众号:程序员高级码农

评论

发布
暂无评论
Sentinel适配Reactor+WebFlux框架的实现原理_Java_互联网架构师小马_InfoQ写作社区