写点什么

Reactive 响应式编程系列:解密 reactor-netty 如何实现响应式

作者:大步流星
  • 2023-05-05
    浙江
  • 本文字数:6617 字

    阅读完需:约 22 分钟

        我们都说 Netty 是一款基于异步事件驱动来设计和实现的高性能 IO 框架,它之所以高性能,重要的原因之一是其线程模型的设计,Netty 的线程模型是基于 Reactor 设计模式的,它主要包含两个线程池:一个是 Boss 线程池,另一个是 Worker 线程池。Boss 线程池主要负责接受客户端连接请求,并将连接请求注册到 Worker 线程池中的某个线程的 Selector 上。Boss 线程池通常只有一个线程(如果建链也成为瓶颈,那么 Boss 线程池也可以有多个)。Worker 线程池主要负责处理客户端连接请求,并进行网络 I/O 操作。Worker 线程池的大小通常是根据 CPU 核数和业务需求来调整的。

        Reactor 设计模式最早由 Doug Schmidt 和 Steve Vinoski 在 1995 年的一篇论文《Reactor: An Object Behavioral Pattern for Concurrent Handling of Multiple Event-based Requests》中提出,这是一篇关于并发编程模式的经典论文。这篇论文对基于 Reactor 设计模式的网络编程框架进行了探讨。Reactor 设计模式是一种用于处理事件驱动 I/O 操作的设计模式,它主要包含以下两个核心组件:

1. Reactor:负责监听事件和分发事件,它将事件分发给对应的 Handler 处理器。

2. Handler:负责处理具体的事件,例如读取数据、解析数据、处理业务逻辑、发送响应数据等等。

Reactor 设计模式的基本思想是:当有事件发生时,Reactor 将事件分发给对应的 Handler 处理器,由 Handler 处理器来具体处理事件。在处理器处理事件的过程中,如果继续需要进行 I/O 操作,它会将 I/O 操作交给 Reactor 处理,由 Reactor 处理器负责监听 I/O 事件并分发给对应的 Handler 处理器。它可以提供高效的事件处理和 I/O 操作,避免了使用传统的同步阻塞式 I/O 模型的性能瓶颈。

        Reactor 设计模式 和 Reactive 响应式编程有一定相似之处,两者的核心思想都是事件驱动,通过异步处理来解决高并发下阻塞操作带来的资源消耗和性能下降问题,而 reacotor-netty (GitHub - reactor/reactor-netty: TCP/HTTP/UDP/QUIC client/server with Reactor over Netty)的目标就是将 Netty 框架和响应式组件库 project-reactor (GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM)打通.

        为了简单起见,我们用 HTTP 请求和应答处理过程来窥探 reactor-netty 的实现细节,在 HTTP 协议中,请求和响应消息分为两个部分:消息头和消息体。消息头包含了请求或响应的元数据信息,如请求方法、响应状态码、内容类型、内容长度等。而消息体则包含了请求或响应的实际内容。而当一个 HTTP 请求或响应结束时,需要发送一个空的消息体(EmptyContent)表示请求或响应结束了。因为 HTTP 协议是基于流的,在传输过程中,HTTP 消息是分多次发送的,每次发送都是一部分数据。当发送完最后一部分数据后,需要告诉接收方,请求或响应已经结束了。这个信号就是一个空的消息体。

        注意,为了展示方便和理解更容易,文中展示的源代码都经过了精简,请以 GitHub 上最新代码为准!!!

        有了这些基础,我们看一个 reactor-netty 的官方例子,官方给了一个 HTTP 的 client 和 server 的例子,我们这里只分析 client 端的例子:

String reqStr = "Go to Zibo for barbecue";System.out.println(Thread.currentThread().getName() + " 开始请求 " + reqStr);HttpClient httpClient = HttpClient.create().port(8888);httpClient.post()               // Specifies that POST method will be used          .uri("/test/world")   // Specifies the path          .send(ByteBufFlux.fromString(Flux.just(reqStr)))  // Sends the request body          .responseContent()    // Receives the response body          .aggregate()          .asString()          .subscribe(res ->                    System.out.println(Thread.currentThread().getName() + " 收到应答 " + res));
复制代码

从这个 client 端例子来看,初步看起来和 Reactive 有关的有两个地方,第一个是 send 方法入参貌似是一个 Flux,另一个是订阅方法 subscribe,其中我们用 Lambda 表达式直接把 HTTP 的应答结果打印出来,需要注意的是,我们特意也打印了线程的名称,这是为了突出展示 Reactive 的异步事件驱动特性,即发起请求的线程和接收应答的线程不是同一个线程。这里我们先剖析下 client 端的实现原理。client 端有两个阶段会有 IO 阻塞,一个是请求发送,一个等待请求应答,请求发送很明显在上述的 send 方法中来做,请求的应答目前看起来像是在 responseContent 方法中来实现。

        我们先来详细看看 send 方法,按照 Reactive 的要求,调用 send 方法不会导致请求立马发送,请求的发送时机一定是 subscribe 方法调用时!!!而为了让请求发送的异步性做的更彻底,我们需要把请求的构造和包装也做成异步,所以你会看到调用了 ByteBufFlux.fromString 方法,该方法返回值是一个 Flux 类型(即 ByteBufFlux)来作为一个请求数据缓冲区的发布者对象。我们看下 send 方法的内部实现:

final class HttpClientFinalizer extends HttpClientConnect implements HttpClient.RequestSender {
@Override public HttpClientFinalizer send(Publisher<? extends ByteBuf> requestBody) { Objects.requireNonNull(requestBody, "requestBody"); // 使用Lambda构造匿名内部类来封装请求发送 return send((req, out) -> out.send(requestBody)); }
@Override public HttpClientFinalizer send( BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> sender) { Objects.requireNonNull(sender, "requestBody"); HttpClient dup = duplicate(); // 将请求发送方法保存起来 dup.configuration().body = sender; return (HttpClientFinalizer) dup; }}
public final class HttpClientConfig extends ClientTransportConfig<HttpClientConfig> { BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> body;}
复制代码

可以看出来最终我们用 dup.configuration().body 保存了请求发送的 Lambda 对象(即匿名内部类,为了阐述方便,后面直接说是 Lambda 对象),而 dup 是一个 HttpClient 类型的对象,即我们给出的 client 官方例子中的流程发起类。send 方法就这么简单?直接复制一个 HttpClient 对象并将其 HttpClientConfig 的 body 属性赋值请求发送的匿名内部类?别忘记 Reactive 的核心之一就是异步事件驱动,在这个 case 中建立 TCP 连接,发送请求、等待应答这些会阻塞的操作,都由 Netty 框架来驱动,reactor-netty 负责将其和 reactor (即 project-reactor)组件打通。那么问题来了,send 方法入参的 Lamdba 对象只进行了赋值,那 “out.send(requestBody)” 什么时候被调用呢?如果是当前线程来执行,那么就失去异步的特性了,所以肯定还是 Netty 来事件驱动,这里就不得不提 ChannelOperationsHandler 这个类,由于它继承自 Netty 的 ChannelInboundHandlerAdapter,所以其实现的 channelActive 方法最终会触发 HttpClientConnect.HttpIOHandlerObserver.onStateChange 方法的调用,这意味着一有链接建立成功,就会触发 HTTP 请求的发送,该方法最终会触发 HttpClientHandler.requestWithBody 方法的调用,而这个 HttpClientHandler 对象中就包含了这个 Lambda 表达式对象,所以 reactor-netty 就通过 ChannelOperationsHandler 和 Netty 的事件循环打通了。

        接着我们看看 send 方法后面调用的 responseContent 方法,我们看看其实现:

final class HttpClientFinalizer extends HttpClientConnect implements HttpClient.RequestSender {	final HttpClientConfig config;	static final Function<ChannelOperations<?, ?>, Publisher<ByteBuf>> contentReceiver = ChannelOperations::receive;	    @Override	public ByteBufFlux responseContent() {		ByteBufAllocator alloc = (ByteBufAllocator) config.options()		                                                           .get(ChannelOption.ALLOCATOR);		if (alloc == null) {			alloc = ByteBufAllocator.DEFAULT;		}
@SuppressWarnings("unchecked") Mono<ChannelOperations<?, ?>> connector = (Mono<ChannelOperations<?, ?>>) connect(); return ByteBufFlux.fromInbound(connector.flatMapMany(contentReceiver), alloc); }}
class HttpClientConnect extends HttpClient { @Override protected Mono<? extends Connection> connect() { HttpClientConfig config = configuration(); return new MonoHttpConnect(config); }}
public class ChannelOperations<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void>, ChannelOperationsId { final FluxReceive inbound;
// 异步接收数据并返回一个缓冲区的Flux @Override public ByteBufFlux receive() { // fromInbound 用来将一个数据的发布者和一块缓冲区来封装成一个ByteBufFlux return ByteBufFlux.fromInbound(inbound, connection.channel() .alloc()); }}
复制代码

可以看到,这里使用了 Netty 中的 ByteBufAllocator,它是用于分配 ByteBuf 对象的工具,而 ByteBuf 是 Netty 中的字节缓冲区,用于存储和操作二进制数据。接着我们发现调用了 connect 方法来获取一个通道操作的 Mono,那么很显然这个 connect 方法并没有立马去创建链接,直接返回的是一个 MonoHttpConnect 对象,所以链接的建立过程肯定是在 MonoHttpConnect 中的 subscribe 方法来实现,感兴趣可以去看看。responseContent 方法最复杂的部分在 return 语句里,该方法最终返回的是 ByteBufFlux 类型对象,ByteBufFlux 其实就是把 Netty 中的 ByteBuf 进行了异步封装,封装成了 Flux<ByteBuf>。也就是说 responseContent 方法把建链和接收数据的缓冲区都做了异步封装并返回。之前我们说到在链接建立成功后就会发送 HTTP 请求,那么我们如何知道应答数据已经准备好了?还是得看 ChannelOperationsHandler,我们看其 channelRead 方法:

final class ChannelOperationsHandler extends ChannelInboundHandlerAdapter {	@Override	final public void channelRead(ChannelHandlerContext ctx, Object msg) {		if (msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf) {			return;		}		try {			ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());			if (ops != null) {                // 如果是 HTTP 则 ops 是 HttpClientOperations 对象				ops.onInboundNext(ctx, msg);			}		} catch (Throwable err) {			safeRelease(msg);			ctx.close();			exceptionCaught(ctx, err);		}	}}
复制代码

该方法会从 Channel 中取出通道操作对象(如果是 HTTP 则对应的是 HttpClientOperations,它是 ChannelOperations 的子类之一)并调用其 onInboundNext 方法,该方法将 Netty 输入的 Object msg 传递到 reactor-netty 的 FluxReceive 对象的 onInboundNext 方法,从这里开始进入了响应式阶段

final class FluxReceive extends Flux<Object> implements Subscription, Disposable {    CoreSubscriber<? super Object> receiver;	Queue<Object>                  receiverQueue;	final void onInboundNext(Object msg) {		if (receiverFastpath && receiver != null) {			try {				receiver.onNext(msg);			} finally {				ReferenceCountUtil.release(msg);			}		} else {			Queue<Object> q = receiverQueue;			if (q == null) {				// please note, in that case we are using non-thread safe, simple				// ArrayDeque since all modifications on this queue happens withing				// Netty Event Loop				q = new ArrayDeque<>();				receiverQueue = q;			}						q.offer(msg);			drainReceiver();		}	}}
复制代码

我们需要知道的是,在 Netty 中 HTTP 请求和响应都是由一系列的 DefaultHttpRequest 和 DefaultHttpResponse 对象组成的,其中 DefaultHttpContent 对象就是 HTTP 请求和响应的内容部分。当接收到 HTTP 请求时,Netty 会将请求头和请求体分开处理,其中请求体就是由一系列的 DefaultHttpContent 组成的。同样地,当发送 HTTP 响应时,Netty 会将响应头和响应体分开处理,其中响应体也是由一系列的 DefaultHttpContent 组成的。当 channelRead 接收到的 msg 为 DefaultHttpResponse 类型时,表明收到了应答的 HEADER 部分,接着当 channelRead 接收到的 msg 为 DefaultHttpContent 类型时,表明收到了应答体内容,最后当 channelRead 接收到的 msg 为 EmptyLastHttpResponse 类型时,表明该 HTTP 应答完整结束。有了这一基础知识后,reactor-netty 为了管理链接的生命周期,定义了 ConnectionObserver 这一接口,该接口是一个函数接口,需要被实现的是其 onStateChange 方法:

@FunctionalInterfacepublic interface ConnectionObserver {	/**	 * React on connection state change (e.g. http request or response)	 *	 * @param connection the connection reference	 * @param newState the new State	 */	void onStateChange(Connection connection, State newState);}
复制代码

它接收一个链接对象和一个新的状态,来承载链接状态变更,那么链接有哪些状态呢?

 ConnectionObserver,其内部的 State 接口定义了链接的各种状态:

	interface State {
/** * Propagated when a connection has been established and is available * 连接建立 */ State CONNECTED = ReactorNetty.CONNECTED;
/** * Propagated when a connection is bound to a channelOperation and ready for * user interaction * 已配置 */ State CONFIGURED = ReactorNetty.CONFIGURED;
/** * Propagated when a connection has been reused / acquired * (keep-alive or pooling) * 获得 */ State ACQUIRED = ReactorNetty.ACQUIRED;
/** * Propagated when a connection has been released but not fully closed * (keep-alive or pooling) * 已释放 */ State RELEASED = ReactorNetty.RELEASED;
/** * Propagated when a connection is being fully closed * 断开连接 */ State DISCONNECTING = ReactorNetty.DISCONNECTING; }
复制代码

我们再回到 channelRead 方法中 "ops.onInboundNext(ctx, msg);" 部分,前面说过 ops 其实是 HttpClientOperations 类型的对象,而 HttpClientOperations.onInboundNext 方法中将通过 msg 消息类型来判断链接的生命周期中所处的状态,整体过程比较复杂,这里不直接给出源码。

        接着是 aggregate 方法,从方法名来看就是做应答数据聚合使用的,它主要是从 ByteBufAllocator 取并发射数据,由于需要做成异步的,所以内部使用了 Mono.defer 来实现,需要注意的时,由于该方法需要聚合数据,所以用的是 doOnNext 方法,即在数据流真正调用 onNext 发射之前调用,该 doOnNext 方法将在 channelRead 方法中收到 EmptyLastHttpResponse 类型的 msg 来触发,这里暂时不给出源码。

        后面是 asString 方法根据默认的字符集来把缓冲区的数据变换为字符串。

        最后的 subscribe 方法我们都比较熟悉了,传入的 Lambda 表达式我们把应答结果打印出来。

        以上就是整个消息发送和接收应答过程,我们可以看到,一旦 subscribe 方法调用,整个建立链接、发送请求、接收应答、聚合应答数据 都是由另一个线程(reactor-http-nio-*)来完成,相当于主线程再触发完 subscribe 方法后就可以做其他事情去了,而不用等待应答,但需要注意的是,subscribe 中的 Lambda 表达式(即处理 HTTP 应答的业务逻辑)也由 reactor-http-nio-* 线程执行,请勿处理高开销操作。

发布于: 刚刚阅读数: 3
用户头像

大步流星

关注

还未添加个人签名 2018-11-24 加入

还未添加个人简介

评论

发布
暂无评论
Reactive响应式编程系列:解密reactor-netty如何实现响应式_Reactive响应式编程系列_大步流星_InfoQ写作社区