写点什么

Reactive 响应式编程系列:解密 Lettuce 如何实现响应式

作者:大步流星
  • 2023-04-22
    浙江
  • 本文字数:14945 字

    阅读完需:约 49 分钟

        响应式编程历史悠久,早在 2005 年,Microsoft Cloud Programmability Team 开始探索一种简单的编程模型,以构建大规模的异步和数据密集型互联网服务架构,响应式编程的理念逐步诞生。这本是为了解决服务端系统而提出的理念,最后也逐步也应用到客户端等其他领域,而在 Java 服务端领域,最著名的两大响应式库就是 RxJava (GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.) 和 Project Reactor (GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM)。本文的重点不是探究响应式编程的内部实现,而是剖析 Lettuce 如何实现响应式范式。        我们知道,作为三个最流行 Redis Java 客户端 Jedis、Lettuce 和 Redisson,只有 Jedis 没有实现了响应式接口(Reactive)。我们今天的主角是:Lettuce (GitHub - lettuce-io/lettuce-core: Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.)。我们先来看看 Lettuce 的常规使用方式(或者说命令式编程的使用方式),为了简单起见,我们直接使用 Redis 单实例来演示(生产上当然都是 Redis Cluster 或者 Redis Sentinel 部署模式):

RedisClient client = RedisClient.create("redis://localhost");StatefulRedisConnection<String, String> connection = client.connect();RedisStringCommands command = connection.sync();System.out.println(command.get("key")); // 输出key的值
复制代码

​很显然第 4 行的 “command.get("key")” 是阻塞的,意味着当前线程必须等着拿到结果才能进行下一步,期间什么事情都做不了。虽然通常来说 Redis 操作都足够快,但我们需要考虑特殊情况,比如大 Key 或者网络不佳等场景,一旦慢查出现,当前系统的业务处理线程都将阻塞在这一步,整个系统无法响应新的请求,Reactive 响应式编程就可以改善这一局面,更具体的说,响应式编程是为了解决非 CPU 密集型(例如数据或 IO 密集型)系统资源消耗问题,它带来的一个好处在于系统能根据业务实际情况来调整资源消耗,并且让系统应对故障时能更具弹性,而它的核心就是“异步事件驱动” + “背压背压一改往常订阅者只能被迫“投喂”的尴尬局面,让订阅者也能控制发布者发布元素的顺序。

        像其他事物一样,响应式编程以及库类从历经坎坷到蓬勃发展好几年之后,Reactive Streams 组织(包括 Pivotal、Netflix、Typesafe 和 Lightbend 于 2013 年成立,旨在为异步流处理提供一个通用的 API 规范)在 2014 年发布了第一个 API 规范(Java 的可以看GitHub - reactive-streams/reactive-streams-jvm: Reactive Streams Specification for the JVM),当你把这个 reactive-streams-jvm clone 下来后关注下 api 模块,因为里面就 4 接口加 1 个类:

之前对响应式模式有过了解的同学肯定觉得它和 观察者模式 很像,但看了上面的接口,会觉得它更像 发布订阅模式,让我们从 ChatGPT 的描述中再来回顾下这两种模式吧:

观察者模式(Observer Pattern)是一种常见的设计模式,其核心思想是:当一个对象的状态发生改变时,其他依赖于它的对象都会收到通知并自动更新。在观察者模式中,存在一个被观察的对象(Subject),它维护一个列表,用来保存所有观察它的对象(Observer)。当 Subject 的状态发生改变时,它会自动通知所有的 Observer 对象,让它们可以及时更新自己的状态。

发布订阅模式(Publish-Subscribe Pattern)也是一种消息传递模式,其核心思想是:在多个对象之间,有一个消息中心(Message Broker)来协调对象之间的通信。在发布订阅模式中,发布者(Publisher)不直接发送消息给订阅者(Subscriber),而是通过消息中心来传递消息。订阅者可以向消息中心注册自己感兴趣的消息类型,并在消息中心有消息发布时,自动收到通知。

目前,我们可以把响应式模式看做本地化的发布订阅模式,但目前主流的响应式库类实现的功能远多于传统的发布订阅框架,例如,其中 “背压” 就是核心中的核心。在响应流 API 规范中,我们看到了四个接口,即 发布者(Publisher)订阅者(Subscriber)订阅关系(Subscription)处理者(Processor),处理者就是就是发布者和订阅者的结合体。我们来认真看下这四个接口,便于我们进一步了解响应式的秘密:

public interface Publisher<T> {    public void subscribe(Subscriber<? super T> s);}
public interface Subscriber<T> { public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();}
public interface Subscription { public void request(long n);
public void cancel();}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
复制代码
  • 发布者接口 Publisher 很简单,就一个订阅方法,传入一个订阅者即可,用于建立订阅关系。

  • 订阅者接口 Subscriber 稍微复杂点,有四个方法,第一个 onSubscribe 方法在订阅成功后会被调用,入参是一个订阅关系对象,意味着订阅者能通过订阅关系来主动和发布者通信,记住,这是“背压”机制的关键。第二个方法是 onNext,即发布者通过该方法给订阅者“发射”一个元素或事件,而第三个 onError 方法,是指发布者发射元素有错误时,会通过该方法让订阅者感知。最后的 onComplete 方法在所有元素都发射完成后被调用。

  • 订阅关系接口 Subscription 中只有两个方法,即 reqeust 方法,用于订阅者像发布者请求一定数量的元素,另一个是 cancel 方法,用于取消订阅关系。而这个 request 就是“背压”的关键,毕竟订阅者的消费能力只有自己知道。

  • 处理者接口 Processor 就不多说了,所见即所得。

当然,我们肯定也记得 2016 年发布的 Java9 中添加了 Flow 类来支持响应式流,可以看出 Java 官方也坐不住了,所以上图 reactive-streams-jvm 中后面加了个 java9 的目录,其中的 FlowAdapters 类就是用于 响应流 API 和 Java9 的 Flow 类互转。

        有人肯定会说,有了 Project Reactor (前面提到的主流的响应式库之一),想把 lettuce-core 从阻塞的命令式编程变成异步的响应式编程还不简单,通过对之前命令式编程代码进行如下改造即可:

RedisClient client = RedisClient.create("redis://localhost");StatefulRedisConnection<String, String> connection = client.connect();RedisStringCommands command = connection.sync();// System.out.println(command.get("key")); // 输出key的值myReactiveRedisGet(command, "key")    .subscribe(value -> System.out.println("查询Redis key的结果:" + value)); // 采用Reactive方式输出key的值
private static Mono<String> myReactiveRedisGet(RedisStringCommands<String, String> command, String key) { return Mono.just(command.get(key));}
复制代码

这里我们并没有直接使用阻塞的 command.get("key")来获取结果,而是新建了一个 myReactiveRedisGet 方法,它的返回值是 Mono<String>,Mono 是 project reactor 库(后面简称 reactor)中的核心类之一,用于发射 0 个或 1 个元素的响应式流类,是属于发布者角色,即它是一种 Publisher(另一种发射 1 个或多个元素的是 Flux,它也是一种 Publisher)。Mono.just 方法用于直接发射入参传进来的元素(即 command.get(key)),仔细看这行代码,我们会发现 Mong.just 调用前传入的入参就已经是阻塞的方式获取的结果了,这当然不符合 Reactive 的预期,所以我们改成了如下代码:

RedisClient client = RedisClient.create("redis://localhost");StatefulRedisConnection<String, String> connection = client.connect();RedisStringCommands command = connection.sync();// System.out.println(command.get("key")); // 输出key的值myReactiveRedisGet(command, "key")    .subscribe(value -> System.out.println("查询Redis key的结果:" + value)); // 采用Reactive方式输出key的值
private static Mono<String> myReactiveRedisGet(RedisStringCommands<String, String> command, String key) { return Mono.defer(() -> Mono.just(command.get(key)));}
复制代码

可以看到,我们在 Mono.just 外用 Mono.defer 包了一层,defer 的入参是一个返回 Mono 的 Supplier,defer 用来延迟创建 Mono,这也意味着我们会延迟获取结果(command.get(key)),这是响应式编程的另一个重要特征,即有订阅时才生产和发送数据,没订阅的时候啥事也不做,避免消耗,这很环保,千万别小看这一点。那难道用 Mono.defer 后就相当于 Redis 操作就变成 Reactive 了吗?运行上述代码,你会发现所有操作其实都是由一个线程来执行,这意味着一个线程(在有订阅者订阅时)完成了所有事情,这肯定是阻塞的,因为 command.get 就是妥妥的阻塞方法!!该线程只有等它返回,否则它做不了其他任何事情。幸运的是 reactor 库给我们提供了异步处理 “发布者发射元素” 和 “订阅者订阅和请求元素数量” 操作的方法,即 publishOn 和 subscribeOn 方法,它们的入参都是一个 Scheduler 类型的对象,而 Scheduler 在 reactor 中是专为运算符提供抽象异步边界的(简单的说就是放到另一个线程去执行)。由于我们的订阅只是一个简单的结果打印,是元素发射过程中才去阻塞访问的 Redis,所以我们希望在元素发射过程中异步,于是我们用 publishOn 方法,代码修改成如下:

RedisClient client = RedisClient.create("redis://localhost");StatefulRedisConnection<String, String> connection = client.connect();RedisStringCommands command = connection.sync();// System.out.println(command.get("key")); // 输出key的值myReactiveRedisGet(command, "key")    .subscribe(value -> System.out.println("查询Redis key的结果:" + value)); // 采用Reactive方式输出key的值
private static Mono<String> myReactiveRedisGet(RedisStringCommands<String, String> command, Supplier<String> keySupplier) { return Mono.defer(() -> Mono.just(command.get(key))).publishOn(Schedulers.parallel()); // 元素发射的异步执行}
复制代码

可以看到,我们在 myReactiveRedisGet 方法中在 Mono.defer 方法后加了 publishOn 方法,入参是 Schedulers.parallel(),它返回的是个计划线程池(ScheduledThreadPoolExecutor),这样执行后,你会发现 command.get 确实在另一个线程中执行了,而不是在当前线程。这样就大功告成啦?难道我们就这样通过几行代码完美的实现了 Lettuce 库访问 Redis 的响应式化?如果真是这样,那么我们就可以直接给 Jedis 提交一个 PR,宣告 Jedis 也支持响应式编程了!然而现实是残酷的,虽然我们针对 Lettuce 的命令式编程的代码通过响应式库 reactor 进行了多次调整,但仍然掩盖不了一个事实,就是我们调用的始终是阻塞方法 command.get,虽然我们掩耳盗铃的将其放在另一个线程异步来执行了,但这个执行它的一部线程始终也需要阻塞的等待应答结果,我们只是没让当前业务线程来等待而已(突然想起之前网上有人说的一句话,复杂度它不会消失,只会转移),一旦请求多起来,Schedulers.parallel()中的线程也会被耗尽成为新的瓶颈点!!我们只考虑了异步,但忽略了最重要一点,那就是事件驱动,你得让一个非阻塞的第三方来通知你操作已经完成了,而不是换一个线程去阻塞等待!!这里,我们可以得出一个结论:仅仅只靠 reactor 之类的响应式库加持,我们无法真正把一个命令式阻塞操作变成 Reactive

        让我们先冷静分析一下,回顾下 Lettuce 的命令式编程的 API,发现有 4 个关键动作:

  1. 创建 RedisClient 对象。

  2. 创建 StatefulRedisConnection 连接对象。

  3. 从连接对象获取 Command 命令对象。

  4. 从 Command 命令对象进行真正的 Redis 操作。

第 1、2 两个步骤不会每个 Redis 操作都执行,一般是初始化好后管理起来(例如 Spring 容器)重复使用,所以要想 Lettuce 支持 Reactive,只能需要在第 3、4 步进行扩展。铺垫了这么多,我们来看看 lettuce-core 中使用 Reactive 响应式编程的 demo:

RedisClient client = RedisClient.create("redis://localhost");StatefulRedisConnection<String, String> connection = client.connect();RedisStringReactiveCommands<String, String> command = connection.reactive();Mono<String> get = command.get("key");get.subscribe(System.out::println); // 控制台输出value
复制代码

可以看出,相比之前的命令式编程从 connection 获取的 command 就开始不同了,这是一个 ReactiveCommands 对象,然后用该对象来进行 Redis 操作,同时 command.get 也不是直接返回结果了,而是一个 Mono,最后我们订阅这个发布者(get),将结果值输出到控制台。

我们接下来将解密这背后的原理,Lettuce 的阻塞调用方式是从 StatefulRedisConnection 类型的连接中调用 sync 方法拿到一个 Command 对象,那么为了支持响应式编程,所以在接口 StatefulRedisConnection 中新增了 reactive 方法:

/** * Returns the {@link RedisCommands} API for the current connection. Does not create a new connection. * * @return the synchronous API for the underlying connection. */RedisCommands<K, V> sync();   
/** * Returns the {@link RedisReactiveCommands} API for the current connection. Does not create a new connection. * * @return the reactive API for the underlying connection. */RedisReactiveCommands<K, V> reactive();
复制代码

我们可以看到,阻塞 Command 是 RedisCommands 类型,而 Reactive 的 Commands 是 RedisReactiveComands 类型,不看 RedisReactiveComands 类基本都能猜到它和 RedisCommands 之间最大的区别就是它的返回值是一个结果的发布者类型(例如 Mono 或者 Flux)而不直接是结果类型,例如(由于 Redis 不同的数据结构操作不同,所以 RedisCommands 和 RedisReactiveComands 都会有对应每种数据结构更具体的接口类,例如针对字符串数据结构的 RedisStringCommands 和 RedisStringReactiveCommands):

public interface RedisStringCommands<K, V> {    /**     * Get the value of a key.     *     * @param key the key.     * @return V bulk-string-reply the value of {@code key}, or {@code null} when {@code key} does not exist.     */    V get(K key);
// 其他方法略}
public interface RedisStringReactiveCommands<K, V> { /** * Get the value of a key. * * @param key the key. * @return V bulk-string-reply the value of {@code key}, or {@code null} when {@code key} does not exist. */ Mono<V> get(K key);
// 其他方法略}
复制代码

​由于字符串的 get 操作只返回一个对象,所以 Reactive 操作用的是 Mono<V>而不是 Flux<V>。对于 Reactive 操作而言,抽象类 AbstractRedisReactiveCommands 负责对直接对各个具体数据类型的 Command 来进行实现(需要注意的是它并不是直接实现 RedisReactiveCommands 接口),我们还是拿 get 操作举例,AbstractRedisReactiveCommands 的实现如下:

public abstract class AbstractRedisReactiveCommands<K, V> implements RedisAclReactiveCommands<K, V>,        RedisHashReactiveCommands<K, V>, RedisKeyReactiveCommands<K, V>, RedisStringReactiveCommands<K, V>,        RedisListReactiveCommands<K, V>, RedisSetReactiveCommands<K, V>, RedisSortedSetReactiveCommands<K, V>,        RedisScriptingReactiveCommands<K, V>, RedisServerReactiveCommands<K, V>, RedisHLLReactiveCommands<K, V>,        BaseRedisReactiveCommands<K, V>, RedisTransactionalReactiveCommands<K, V>, RedisGeoReactiveCommands<K, V>,        RedisClusterReactiveCommands<K, V> {
@Override public Mono<V> get(K key) { return createMono(() -> commandBuilder.get(key)); }
public <T> Mono<T> createMono(Supplier<RedisCommand<K, V, T>> commandSupplier) {
if (tracingEnabled) {
return withTraceContext().flatMap(it -> Mono .from(new RedisPublisher<>(decorate(commandSupplier, it), connection, false, getScheduler().next()))); }
return Mono.from(new RedisPublisher<>(commandSupplier, connection, false, getScheduler().next())); }
// 其他方法略}
复制代码

​我们可以看到,它直接调用了 createMono 方法,而且 createMono 方法的入参是一个 RedisCommand 的 Supplier 对象,意味着它支持延迟创建 RedisCommand,遵循了我们之前说的响应式的特征之一:延迟创建(只有被真正订阅时才产生数据和开销)。需要注意的是,这里的 RedisCommand 和前面说的 RedisCommands、RedisReactiveCommands 是有区别的,RedisCommands、RedisReactiveCommands 用来封装各种数据类型的命令操作,而 RedisCommand 用于封装输出、参数和状态,用于命令执行结果的处理。我们接着看 createMono 的具体实现,抛开 if 语句的部分,它通过 RedisPublisher 来构建了一个 Mono,采用的是 Mono.from,该方法会将一个响应式规范 API 中的发布者(Publisher 接口的实现类)转换成 reactor 库的 Mono 来返回,而且你可以发现,createMono 方法几乎用所有资源都用来构建 RedisPublisher 对象,包括 Command 的 Supplier、连接、异步执行的线程等。

        既然 RedisPublisher 是一个标准的 Publisher,那么它肯定会实现 subscribe 方法,我们来一探究竟:

class RedisPublisher<K, V, T> implements Publisher<T> {
private final Supplier<? extends RedisCommand<K, V, T>> commandSupplier;
private final AtomicReference<RedisCommand<K, V, T>> ref;
private final StatefulConnection<K, V> connection;
private final boolean dissolve;
private final Executor executor;
public RedisPublisher(RedisCommand<K, V, T> staticCommand, StatefulConnection<K, V> connection, boolean dissolve, Executor publishOn) { this(() -> staticCommand, connection, dissolve, publishOn); }
public RedisPublisher(Supplier<RedisCommand<K, V, T>> commandSupplier, StatefulConnection<K, V> connection, boolean dissolve, Executor publishOn) {
LettuceAssert.notNull(commandSupplier, "CommandSupplier must not be null"); LettuceAssert.notNull(connection, "StatefulConnection must not be null"); LettuceAssert.notNull(publishOn, "Executor must not be null");
this.commandSupplier = commandSupplier; this.connection = connection; this.dissolve = dissolve; this.executor = publishOn; this.ref = new AtomicReference<>(commandSupplier.get()); }
@Override public void subscribe(Subscriber<? super T> subscriber) {
if (this.traceEnabled) { LOG.trace("subscribe: {}@{}", subscriber.getClass().getName(), Objects.hashCode(subscriber)); }
// Reuse the first command but then discard it. RedisCommand<K, V, T> command = ref.get();
if (command != null) { if (!ref.compareAndSet(command, null)) { command = commandSupplier.get(); } } else { command = commandSupplier.get(); }
RedisSubscription<T> redisSubscription = new RedisSubscription<>(connection, command, dissolve, executor); redisSubscription.subscribe(subscriber); }
// 其他方法略}
复制代码

我们知道标准的 Publisher.subscribe 主要是完成订阅工作,ref 是一个 RedisCommand Supplier 的原子引用,上述代码有段奇怪的逻辑,即获取 command 的部分,其实主要目的是为了防止单个 command 被重复使用,一旦发现重复使用(即 ref.compareAndSet(command, null)为 false),则重新使用 RedisCommand Supplier 获得 command。当然,最关键的还是下面的构建 RedisSubscription 对象(即前面 Reactive 中的订阅关系角色)并调用其 subscribe 方法,需要注意一点的是,我们是通过传入订阅者 subscriber 来构建 RedisSubscription,这当然合情合理。

        接下来,我们剖析下 RedisSubscription,有人肯定会好奇,订阅关系 Subscription 理论上有用于请求 n 个元素的 request 和取消订阅的 cancel 方法,为啥还会有发布者 Publisher 中的 subscribe 方法?其实这并不是说 RedisSubscription 也是一个 Publisher 角色,而是这个方法发是被 Publisher 的 subscribe 方法来调用,所以干脆也叫做 subscribe。RedisSubscription.subscribe 方法实现如下:

        /**         * Subscription procedure called by a {@link Publisher}         *         * @param subscriber the subscriber, must not be {@code null}.         */        void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) { throw new NullPointerException("Subscriber must not be null"); }
// UNSUBSCRIBED 状态 State state = state();
if (traceEnabled) { LOG.trace("{} subscribe: {}@{}", state, subscriber.getClass().getName(), subscriber.hashCode()); }
// 调用后进入 NO_DEMAND 状态 state.subscribe(this, subscriber); }
复制代码

​可以看到该方法很简单,就是调用 state 的 subscribe 方法。State 是什么?State 是 RedisPublisher 中的一个枚举类型,用来表示 RedisSubscription(其实也是在 RedisPublisher 中定义)所处的状态,你可以发现,RedisSubscription 作为订阅关系用来表示订阅者 Subscriber 的订阅状态,而 State 用来表示订阅关系的状态。按照该枚举的定义,State 有如下状态:

  • UNSUBSCRIBED 初始的未订阅状态,调用其 subscribe 方法可以进入 NO_DEMAND 状态。

  • NO_DEMAND 没有需求时进入的状态

  • DEMAND 有需求时进入的状态

  • READING 有数据可以读取时进入的状态

  • COMPLETED 完成状态,不会再接受任何事件

部分状态能双向转换(例如 NO_DEMAND 和 DEMAND),最初是 UNSUBSCRIBED 状态,如代码所示,调用 state.subscribe 就会由 UNSUBSCRIBED 变为 NO_DEMAND 状态,我们看下 state.subscribe 的代码:

        UNSUBSCRIBED {
@SuppressWarnings("unchecked") @Override void subscribe(RedisSubscription<?> subscription, Subscriber<?> subscriber) {
LettuceAssert.notNull(subscriber, "Subscriber must not be null");
if (subscription.changeState(this, NO_DEMAND)) {
// 实际创建的是 PublishOnSubscriber 类型 subscription.subscriber = RedisSubscriber.create(subscriber, subscription.executor); subscriber.onSubscribe(subscription); } else { throw new IllegalStateException(toString()); } }
}
复制代码

​可以看到,一旦修改成 NO_DEMAND 成功,就会构建订阅者 RedisSubscriber 并绑定到订阅关系 RedisSubscription 上(后面会用到),最后调用了 subscriber 的 onSubscribe 方法(还记得之前聊到的响应式 API 规范吗?订阅成功后需要调用订阅者的 onSubscribe 方法),很显然到这里为止我们提到过的两个类 RedisPublisher 和 RedisSubscriber 就是发布者和订阅者的关系。

        然后呢?后面貌似也没代码了,我们为啥最后我们能神奇的获取到 redis get 命令的返回结果呢?其实内部是 reactor 这个响应式框架在驱动,我们回过头再来看下之前的响应式 demo:

RedisStringReactiveCommands<String, String> command = connection.reactive();Mono<String> get = command.get("key");get.subscribe(System.out::println); // 控制台输出value
复制代码

我们知道最后一行我们通过 Lambda 表达式创建了一个订阅者,对于 reactor 这个框架而言,实际创建的是 LambdaMonoSubscriber 对象,而这个 get 当然也是被 reactor 包装过的 Mono,但前面说过创建它的 createMono 方法实际上内部是从 RedisPublisher 来创建 Mono 的,所以我们调用的 get.subscribe 实际上最后会调用 RedisPublisher 的 subscribe 方法,这个方法我们前面已经讲解过逻辑了,所以上面的 state.subscribe 方法很重要的一点是调用了实际订阅者的 onSubscribe 方法(理论上这个方法应该是 reactor 框架来驱动调用的,但由于 lettuce-core 在实现 Reactive 时是自己构建的发布者-RedisPublisher,所以这个方法得 lettuce 自己来调用)。而这个订阅者 subscriber(这里是 LambdaMonoSubscriber 对象)的 onSubscribe 方法中会去调用订阅关系(即 RedisSubscription)的 request 方法:

        @Override        public final void request(long n) {            // 这时的 state 是 NO_DEMAND 状态            State state = state();
if (traceEnabled) { LOG.trace("{} request: {}", state, n); }
state.request(this, n); }
复制代码

​这时的 state 是 NO_DEMAND 状态request 方法实际上是去调用 state.request 方法,我们看下 NO_DEMAND 的 request 方法做了什么:

        NO_DEMAND {
@Override void request(RedisSubscription<?> subscription, long n) {
if (Operators.request(RedisSubscription.DEMAND, subscription, n)) {
if (subscription.changeState(this, DEMAND)) {
try { // 实际上调用的是它的dispatchCommand方法,向Redis发送要执行的命令 subscription.checkCommandDispatch(); } catch (Exception ex) { subscription.onError(ex); } subscription.checkOnDataAvailable(); }
subscription.potentiallyReadMore(); subscription.state().onDataAvailable(subscription); } else { onError(subscription, Exceptions.nullOrNegativeRequestException(n)); } }
}
复制代码

可以看到,该方法会将 state 由 NO_DEMAND 更新成 DEMAND,一旦更新成功,就会调用订阅关系 RedisSubscription 的 checkOnDataAvailable 方法,该方法最终会调用 RedisChannelHandler 对象的 dispatch 方法往其 RedisChannelWriter 类型的属性对象 channelWriter 中写数据,代码如下:

    protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {        return channelWriter.write(cmd);    }
复制代码

说明这时候已经开始向 Redis 发送命令了,接着我们还是回过头来看 NO_DEMAND 的 request 方法,接着调用的是订阅关系 RedisSubscription 的 checkOnDataAvailable 方法,该方法代码如下:

        final Queue<T> data = Operators.newQueue();
// 其他代码略
void checkOnDataAvailable() { // 如果数据为空,就去请求数据 if (data.isEmpty()) { potentiallyReadMore(); } // 如果数据不为空,那么通知数据可用 if (!data.isEmpty()) { onDataAvailable(); } }
void potentiallyReadMore() { // getDemand获取的是订阅关系中request中请求的元素数量值,reactor中默认传Long.MAX_VALUE if ((getDemand() + 1) > data.size()) { // 当前state是DEMAND状态,调用readData方法后变成READING状态 state().readData(this); } }
/** * Called via a listener interface to indicate that reading is possible. */ final void onDataAvailable() { State state = state(); state.onDataAvailable(this); }
复制代码

​这块代码我们先忽略(笔者发现这段代码逻辑有点问题,详见:Fix the RedisPublisher.RedisSubscription#potentiallyReadMore demand long overflow bug by manzhizhen · Pull Request #2383 · lettuce-io/lettuce-core · GitHub),我们先看 NO_DEMAND 的 request 方法的 “subscription.state().onDataAvailable(subscription);” 这一行,该行才是关键,这时候 subscription.state()是 DEMAND,所以我们看下 DEMAND 的 onDataAvailable 方法:

        DEMAND {            @Override            void onDataAvailable(RedisSubscription<?> subscription) {                try {                    do {                        if (!read(subscription)) {                            return;                        }                    } while (subscription.hasDemand());                } catch (Exception e) {                    subscription.onError(e);                }            }
private boolean read(RedisSubscription<?> subscription) {
State state = subscription.state();
// concurrency/entry guard if (state == NO_DEMAND || state == DEMAND) { // 尝试将state变成READING状态,表明我们在读取数据 if (!subscription.changeState(state, READING)) { return false; } } else { return false; }
// 尝试读取并发射数据 subscription.readAndPublish();
if (subscription.allDataRead && subscription.data.isEmpty()) { state.onAllDataRead(subscription); return false; }
// concurrency/leave guard subscription.afterRead();
if (subscription.allDataRead || !subscription.data.isEmpty()) { return true; }
return false; }
// 其他代码略 }
复制代码

可以看出 onDataAvailable 方法只要发现订阅关系 RedisSubscription 一直有需求(subscription.hasDemand())则会一直调用 read 方法,而 read 方法中最关键的就是 subscription.readAndPublish();方法的调用,我们看看其实现:

        /**         * Reads and publishes data from the input. Continues until either there is no more demand, or until there is no more         * data to be read.         */        void readAndPublish() {            while (hasDemand()) {                T data = read();                if (data == null) {                    return;                }                DEMAND.decrementAndGet(this);                this.subscriber.onNext(data);            }        }
/** * Reads data from the input, if possible. * * @return the data that was read or {@code null} */ protected T read() { return data.poll(); }
复制代码

可以看出其还是通过 data(final Queue<T> data = Operators.newQueue();)来交互的,调用一次 readAndPublish 方法只尝试从 data 里面 poll 一次,如果为 null,则直接返回,如果能 poll 到元素,那么将调用订阅者的 onNext 方法(这也符合响应流 API 规范)来发射该数据。

        总得有地方往 data 这个队列中放数据吧?否则上面的代码从 data 中获取到的永远是 null,也不会有机会调用订阅者的 onNext 方法。Lettuce 底层用的是 Netty 来实现 Redis 协议的,而且将 CommandHandler 注册到 Netty 的 Pipeline 中,CommandHandler 监听了读事件,后面会调用 CommandWrapper(前面提到的 RedisCommand 的实现类,负责处理 Redis 应答)的 complete 方法,而 complete 方法后续又会调用订阅关系 RedisSubscription 的 onNext 方法,我们这时候来看看该 onNext 方法:

        @Override        public void onNext(T t) {
State state = state();
if (state == State.COMPLETED) { return; }
// 如果data为空,而且当前state处于DEMAND状态(说明还没进入读数据状态),那么直接调用订阅者的onNext来发射数据 if (data.isEmpty() && state() == State.DEMAND) {
long initial = getDemand();
if (initial > 0) {
try { DEMAND.decrementAndGet(this); // 直接调用订阅者的onNext来发射数据,此例子中就是我们的System.out.println的Lambda表达式 this.subscriber.onNext(t); } catch (Exception e) { onError(e); } return; } }
// 如果有data队列中有数据,那么就将数据放进队列,如果放进队列失败,则发射一个错误 if (!data.offer(t)) {
Subscriber<?> subscriber = this.subscriber; Context context = Context.empty(); if (subscriber instanceof CoreSubscriber) { context = ((CoreSubscriber) subscriber).currentContext(); }
Throwable e = Operators.onOperatorError(this, Exceptions.failWithOverflow(), t, context); onError(e); return; }
// 通知数据可用,后续会从data中读取数据 onDataAvailable(); }
复制代码

​这样一切就圆满了,订阅关系 RedisSubscription 的 onNext 方法最终被 Redis 的读事件驱动(CommandHandler),而最终通过 data 队列或者直接调用订阅者的 onNext 方法,让订阅者来消费该元素。

        让我们简单总结下,Lettuce 的 Reactive 的实现方式就是将 Redis 数据发送数据应答事件和 Reactive 库类(之前用的是 RxJava,后面用的是 reactor)绑定,从而让 Lettuce 支持响应式 API 编程。那这种方式和我们之前直接用 myReactiveRedisGet 方法来包装 Lettuce 的阻塞执行方法有什么区别?最大的一个区别是我们不用阻塞当前的业务线程,而是让其他线程(这里是 Netty 的工作线程)来发射元素并执行订阅者的方法(即本例子中的 System.out.println 的 Lambda 表达式),这样业务线程可以做其他事情(例如继续处理其他请求),而且重要的是另一点,即使 Redis 响应慢,也不会占用到 Netty 的工作线程,有应答时才会需要该工作线程处理。注意,订阅者的方法真的应该由 Netty 的工作线程来执行吗?这也许并不一定是个好注意,我们期望可以通过 subscribeOn 来修改此行为,我们来修改下之前给的 Reactive 的 demo:

RedisClient client = RedisClient.create("redis://localhost");StatefulRedisConnection<String, String> connection = client.connect();RedisStringReactiveCommands<String, String> command = connection.reactive();Mono<String> get = command.get("key");// get.subscribe(System.out::println); // 控制台输出valueget.publishOn(Schedulers.parallel()).subscribe(System.out::println); // 使用Schedulers.parallel()的线程池来进行控制台输出value
复制代码

这里我们直接使用的是 Schedulers.parallel()线程池。




发布于: 2023-04-22阅读数: 39
用户头像

大步流星

关注

还未添加个人签名 2018-11-24 加入

还未添加个人简介

评论

发布
暂无评论
Reactive响应式编程系列:解密Lettuce如何实现响应式_Reactive_大步流星_InfoQ写作社区