写点什么

Flux 源码之 reactor 核心原理及概述

作者:漫游指南
  • 2021 年 11 月 22 日
  • 本文字数:4959 字

    阅读完需:约 16 分钟

读了此篇文章你将会明白以下几个问题。


1、reactor 相比传统模型有什么进步,他节省了什么,带来了什么。


2、为什么订阅者为什么能运行。


前言


响应式编程和传统 mvc 的不同在于对线程的使用方式不同,mvc 为每一个请求创建/取出一个空闲线程,进行处理。可以理解为一个 request serve 会创建一个线程进行处理。

而反式编程对应的则是一个请求对应多个线程进行处理。大家可以类比下之前的操作方式是一个 set 类型的数据结构,内部放入的是一个个的 request+thread 。那么反式编程则进行转换

为 map,key 为 thread,val 为请求。当请求到来时让这个请求进入一个个的流水线,而每个流水线单独一个线程。这样将请求与线程解耦,一旦该请求 io 过高则不会影响其它线程。

理论上线程是可以无限创建的。

这里有两张图能说明 reactor 的吞吐量大的情况下依然维持低延时的情况。


基础知识:


那么是什么让我们对 reactor 学习起来这么困难呢?我们先类别下其它的知识点。

1、spring 事件监听器也是这种发布订阅模型,不同点在于无法获取到返回值。

2、与之类似的编程模型还有 epoll 模型,redis 使用此模型提高性能,不过使用的是系统自带的函数。


但是 reactor 学习起来还是困难重重,我总结了下,拦在我们面前的拦路虎如下。

1,lanmba 表达式

2,操作符,操作符的本质是什么

3,reactor 流程运行

lamba 表达式

函数式编程的核心是 java.util.Objects.Function 这个接口。而不要把关注点放在 lamba 身上。

function 的核心是函数,也就是对应我们常说的 method()。不过 function 并不会立即执行,而是当有数据流入时按需加载执行。大家先看下面的 demo。

static class FunctionTest<T, R> { // 1        // 将行为进行抽象        public void func(Function<T, R> function, T t) {            R r = function.apply(t);            System.out.println(r);        }    }

public static void main(String[] args) { //2 FunctionTest<Integer, String> ft = new Main.FunctionTest<Integer, String>(); ft.func(i -> i * 2 + "", 61); }
复制代码

1、FunctionTest 时并不清楚传递的时候要做什么行为,但是他可以定义 T 和 R 以及对 T 和 R 的公共处理,这里大家能想到什么?我想到的是模板模式。

2、在 reactor 中有在订阅者未订阅之前,这些 function 是不加载的,如果 functionTest 里面有其它对象的引用等情况会造成内存的损耗。

3、对应于 jdk 自带的 java.util.function 我们应该如何理解呢。

lambda 的核心是一种假象,通过编译器做的一个内部类,注意该内部类是在 runting 中生成的。这也对应了在第二段说的懒加载的特点。大家可以下载 byteCode 插件,观察下 lambda 表达式最后的字节码是什么。


操作符

个人观点操作符就是装配器模式,你可以理解为数据的一个个变形,或者是一个个装数据的桶。为了进一步加深理解可以参考下 linux 命令符的管道。大家写 linux 命令时候可以做到一个命令的输出作为下一个命令的输入。操作符详细的可以参考官网。

后面有一个简单的 demo 是个人写的一个操作符,可以为大家理解操作符是什么提供帮助。


Flux 源码设计


我们同样类别下 spring 事件监听器来理解这一模型,如果没接触过 spring 监听器也没关系。首先 web 服务器接收到请求这一动作可以想象成为一个消息发布者,就好像海底捞有前台在说来客人了,我们叫他 Publisher 接口,意思为消息发布者,而 Flux 是 Publisher 是一个实现类。


饭店后台会有很多服务员,往往服务员空闲了会喊 xxx 桌有空位。我们叫他 subscriber 接口意思为消息订阅者。


大家先看 Publisher 的方法。关键点在个地方 ,一是 publisher.subscribe(订阅者),这里故意写成中文是为了提醒大家注意 publisher 前面带有<T> 这个泛型。大家先回想下泛型的作用,以及 jvm 是怎么处理泛型的。

第二个关键点在于订阅者里面是有 Subscription,那么这个 Subscription 是什么玩意呢?看代码


这个接口存在 2 个方法,当 2 个类关系时候我们还能理清楚,三个类又是什么关系呢?当 Subscription 调用 request 时候会发生什么呢?

比如现在订阅者压力很大要通知生产者停止生产那么就会调用 cancel。类有点多我们结合代码看下流程。代码过程里的 Flux 不过是对 reactor 的一层泛型封装而已。 我们此次使用 fluxFlatMap 做实例来解释下这三个类是如何交互的。

Flux.just(1).flatMap(a->{ // 1,调用flatmap            return Flux.just(1);        }).subscribe(b -> {            System.out.println("Main.main" + b);        });       
复制代码


//Flux 类  public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {		return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues				.XS_BUFFER_SIZE);	}

final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, boolean delayError, int concurrency, int prefetch) { return onAssembly(new FluxFlatMap<>( // 2 this, mapper, delayError, concurrency, Queues.get(concurrency), prefetch, Queues.get(prefetch) )); }
复制代码


看代码 2 处直接 new 了一个 FluxFlatMap,到这大家就明白前文说的操作符的本意是什么了。也就是说你也可以自己 new 一个新的操作符,让数据流按照你的方式进行定制。


继续向前,当 publisher.subscribe(订阅者), 具体到这里是 Flux.subscribe()。

	public final Disposable subscribe(			@Nullable Consumer<? super T> consumer,			@Nullable Consumer<? super Throwable> errorConsumer,			@Nullable Runnable completeConsumer) {		return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);// 1	}
public final Disposable subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) { return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,// 2 completeConsumer, null, initialContext)); } public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) { subscribe(subscriber); // 3 return subscriber; }

@Override @SuppressWarnings("unchecked") public final void subscribe(Subscriber<? super T> actual) { CorePublisher publisher = Operators.onLastAssembly(this); CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try { if (publisher instanceof OptimizableOperator) {// 4 判断是否为操作符 OptimizableOperator operator = (OptimizableOperator) publisher; while (true) { subscriber = operator.subscribeOrReturn(subscriber); if (subscriber == null) { // null means "I will subscribe myself", returning... return; } OptimizableOperator newSource = operator.nextOptimizableSource(); if (newSource == null) { publisher = operator.source(); break; } operator = newSource; } }
publisher.subscribe(subscriber); // 5 } catch (Throwable e) { Operators.reportThrowInSubscribe(subscriber, e); return; } }
复制代码


代码调用路径为 1->2->3 ->4。

我们具体看下 subscribe(Subscriber<? super T> actual) 这个方法。

if (publisher instanceof OptimizableOperator) // 4

这是判断当前的 publisher 是否为操作符,上面也讲过,flatMap 是个操作符,实际上也是 publisher 的一个实现。判断是则调用 FluxFlatMap 中的 subscribeOrReturn(CoreSubscriber<? super R> actual) 。代码追踪到这我们暂时不要深入。

我们先看下 CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);


这个 lambdaSubScriber 是怎么蹦出来的呢,这个入参的 actual 也是个 lambdaSubScriber 类。大家看代码 2 处,原来是 2 那里 new 出来的,我们画张图梳理下。


看到这里应该明白了,抛出接口,最后起作用的就三个,FluxFlatMap,是 Flux 的子类,实现 publisher 接口,是消息发布者。lambdaSubcriber 是一个消息订阅者。当 Flux.subscribe() 时候经过层层调用到 LambdaSubscriber<T>的 onNext(T x)方法(中间不是此次讨论重点过程略

@Override	public final void onNext(T x) {		try {			if (consumer != null) {				consumer.accept(x);			}		}		catch (Throwable t) {			Exceptions.throwIfFatal(t);			this.subscription.cancel();			onError(t);		}	}
复制代码


代码到这里大家估计能明白些了,这就是函数式编程的包装类啊。


手写一个简单版本的 Flux

估计看到这里还是有点绕,没关系我们模仿 Flux 也写一个简化版本的 Flux 好了。

1、创建一个自己的 Flux 类 实现 CorePublisher 接口

public class FluxTest<T> implements CorePublisher<T> {    @Override    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
}
public final Disposable subscribe(Consumer<? super T> consumer) { subscribe(new LambdaSubscriberTest(consumer)); return null; }

@Override public void subscribe(Subscriber<? super T> subscriber) { System.out.println("FluxTest.subscribe"); }
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) { subscribe(subscriber); return subscriber; }
public static <T> FluxTest<T> just(T... data) { return new FluxJustTest(data); }
}
复制代码


2、创建一个 FluxJust 的子类,目的是模拟 Flux.just 方法,此处中类似操作符但略有不同。

public class FluxJustTest<T> extends FluxTest<T>{    T value;    FluxJustTest(T value) {        this.value = Objects.requireNonNull(value, "value");    }    @Override    public void subscribe(CoreSubscriber<? super T> actual) {        actual.onNext(this.value);       // actual.onSubscribe(Operators.scalarSubscription(actual, this.value, "just"));    }
复制代码


3、搞一个 lambdaSubscriber 的类用来模拟函数式订阅

public final class LambdaSubscriberTest<T> implements CoreSubscriber {    Consumer<? super Object> consumer;

LambdaSubscriberTest(Consumer<? super Object> consumer) { this.consumer = consumer; }
@Override public void onSubscribe(Subscription s) {
}
@Override public void onNext(Object x) { if (consumer != null) { consumer.accept(x);// System.out.println("LambdaSubscriberTest.onNext" );// consumer.accept(o);// consumer.accept(o); } }
@Override public void onError(Throwable throwable) {
}
@Override public void onComplete() {
}}
复制代码


最后来一个测试类

public class Main {    public static void main(String[] args) {        Flux.just(1).flatMap(a -> {            System.out.println("Main.main");                    return Flux.just(a+"1");                }        ).subscribe(b -> {            System.out.println("Main.main.subscribe" + b);        });
// FluxTest.just(3)// .subscribe(new ConsumerTest()); }
static class ConsumerTest implements Consumer { @Override public void accept(Object o) { System.out.println("ConsumerTest.accept" + o.toString()); } }}
复制代码


这里把原生的 Flux 和我们写的做个对比。


总结

Flux 实际上是利用 java 函数式编程语法糖和 NIO 的基于事件响应的模型弄的一个组合怪,目的是提高吞吐量和良好的代码写作。之所以难以理解也在此,希望大家通过此文能看破它的设计出来的“语法糖”明白它的原理。

用户头像

漫游指南

关注

还未添加个人签名 2020.07.20 加入

还未添加个人简介

评论

发布
暂无评论
Flux 源码之reactor 核心原理及概述