写点什么

Netty 源码解析 -- ChannelPipeline 机制与读写过程

用户头像
binecy
关注
发布于: 2020 年 10 月 24 日
Netty源码解析 -- ChannelPipeline机制与读写过程

本文继续阅读 Netty 源码,解析 ChannelPipeline 事件传播原理,以及 Netty 读写过程。

源码分析基于 Netty 4.1


ChannelPipeline

Netty 中的 ChannelPipeline 可以理解为拦截器链,维护了一个 ChannelHandler 链表,ChannelHandler 即具体拦截器,可以在读写过程中,对数据进行处理。

ChannelHandler 也可以分为两类。

ChannelInboundHandler,监控 Channel 状态变化,如 channelActive,channelRegistered,通常通过重写 ChannelOutboundHandler#channelRead 方法处理读取到的数据,如 HttpObjectDecoder 将读取到的数据解析为(netty)HttpRequest。

ChannelOutboundHandler,拦截 IO 事件,如 bind,connect,read,write,通常通过重写 ChannelInboundHandler#write 方法处理将写入 Channel 的数据。如 HttpResponseEncoder,将待写入的数据转换为 Http 格式。


ChannelPipeline 的默认实现类为 DefaultChannelPipeline,它在 ChannelHandler 链表首尾维护了两个特殊的 ChannelHandler -- HeadContext,TailContext。

HeadContext 负责将 IO 事件转发给对应的 UnSafe 处理,例如前面文章中说到的 register,bind,read 等操作。

TailContext 主要是一些兜底处理,如 channelRead 方法释放 ByteBuf 的引用等。


事件传播

ChannelOutboundInvoker 负责触发 ChannelOutboundHandler 的方法,他们方法名相同,只是 ChannelOutboundInvoker 方法中少了 ChannelHandlerContext 参数。

同样,ChannelInboundInvoker 负责触发 ChannelInboundHandler 的方法,但 ChannelInboundInvoker 的方法名多了 fire,如 ChannelInboundInvoker#fireChannelRead 方法,触发 ChannelInboundHandler#channelRead。

ChannelPipeline 和*ChannelHandlerContext*都继承了这两个接口。

但他们作用不同,ChannelPipeline 是拦截器链,实际请求委托给 ChannelHandlerContext 处理。

ChannelHandlerContext 接口(即 ChannelHandler 上下文)维护了链表的上下节点,它作为 ChannelHandler 方法参数, 负责与 ChannelPipeline 及其他 ChannelHandler 互动。通过它可以动态修改 Channel 的属性,给 EventLoop 提交任务,也可以向下一个(上一个)ChannelHandler 传播事件。

例如,在 ChannelInboundHandler#channelRead 处理完数据后,可以通过 ChannelHandlerContext#write 将数据写到 Channel。

ChannelInboundHandler#handler 方法返回真正的 ChannelHandler,并使用该 ChannelHandler 执行实际操作。

通过 DefaultChannelPipeline#addFirst 等方法添加 ChannelHandler 时,Netty 会为 ChannelHandler 构造一个 DefaultChannelHandlerContext,handler 方法返回对应的 ChannelHandler。

HeadContext,TailContext 也实现了 AbstractChannelHandlerContext,handler 方法返回自身 this。


我们也可以通过 ChannelHandlerContext 给 EventLoop 提交异步任务

ctx.channel().eventLoop().execute(new Runnable() {	public void run() {		...	}});
复制代码

对于阻塞时间较长的操作,使用异步任务完成是不错的选择。


下面以 DefaultChannelPipeline#fireChannelRead 为例,看一下他们的事件传播过程。

DefaultChannelPipeline

public final ChannelPipeline fireChannelRead(Object msg) {	AbstractChannelHandlerContext.invokeChannelRead(head, msg);	return this;}
复制代码

使用 HeadContext 作为开始节点,调用 AbstractChannelHandlerContext#invokeChannelRead 方法开始调用拦截器链表。


AbstractChannelHandlerContext

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {	final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);	EventExecutor executor = next.executor();	if (executor.inEventLoop()) {		next.invokeChannelRead(m);	} else {		...	}}
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { // #1 ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); }}
复制代码

#1

handler 方法获取 AbstractChannelHandlerContext 真正的 Handler,再触发其 ChannelPipeline#channelRead 方法

由于 invokeChannelRead 方法在 HeadContext 中执行,handler()这里返回 HeadContext,这时会触发 HeadContext#channelRead


HeadContext#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {	ctx.fireChannelRead(msg);}
复制代码

HeadContext 方法调用ctx.fireChannelRead(msg),就是向下一个 ChannelInboundHandler 传播事件。


AbstractChannelHandlerContext#fireChannelRead

public ChannelHandlerContext fireChannelRead(final Object msg) {	invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);	return this;}
复制代码

AbstractChannelHandlerContext#fireChannelRead(final Object msg)方法主要负责找到下一个 ChannelInboundHandler,并触发其 channelRead 方法。


从 DefaultChannelPipeline#fireChannelRead 方法可以看到一个完整的调用链路:

#1 DefaultChannelPipeline 通过 HeadContext 开始调用

#2 ChannelInboundHandler 处理完当前逻辑后,调用ctx.fireChannelRead(msg)向后传播事件

#3 AbstractChannelHandlerContext 找到下一个 ChannelInboundHandler,并触发其 channelRead,从而保证拦截器链继续执行。


注意:对于 ChannelOutboundHandler 中的方法,DefaultChannelPipeline 从 TailContext 开始调用,并向前传播事件,与 ChannelInboundHandler 方向相反。

大家在阅读 Netty 源码时,对于 DefaultChannelPipeline 的方法,要注意该方法底层调用是 ChannelInboundHandler 还是 ChannelOutboundHandler 的方法,以及他们的传播方向。


如果我们定义一个 Http 回声程序,示意代码如下

new ServerBootstrap().group(parentGroup, childGroup)                .channel(NioServerSocketChannel.class)                .childHandler(new ChannelInitializer<SocketChannel>() {                    public void initChannel(SocketChannel ch) throws Exception {                        ChannelPipeline p = ch.pipeline();                        p.addLast(new HttpRequestDecoder());				        p.addLast(new HttpResponseEncoder());				        p.addLast(new LoggingHandler(LogLevel.INFO));				        p.addLast(new HttpEchoHandler());                    }                });
复制代码


其中 HttpEchoHandler 实现了 ChannelInboundHandler,并在 channelRead 方法中调用 ChannelHandlerContext#write 方法回传数据。

那么,数据流转如下所示

Socket.read() -> head#channelRead  -> HttpRequestDecoder#channelRead -> LoggingHandler#channelRead -> HttpEchoHandler#channelRead                                                                                                                 |                                                                                                                \|/Socket.write() <-   head#write     <- HttpResponseEncoder#write     <-     LoggingHandler#write   <-  ChannelHandlerContext#write
复制代码

ChannelHandlerContext#write 和 DefaultChannelPipeline#write 不同,前者从当前节点向前找到一个 ChannelOutboundHandler 开始调用,而后者则是从 tail 开始调用。


Read

前面文章《事件循环机制实现原理》中说过,NioEventLoop#processSelectedKey 中,通过 NioUnsafe#read 方法处理 accept 和 read 事件。下面来看一些 read 事件的处理。

NioByteUnsafe#read

public final void read() {	final ChannelConfig config = config();	if (shouldBreakReadReady(config)) {		clearReadPending();		return;	}	final ChannelPipeline pipeline = pipeline();	final ByteBufAllocator allocator = config.getAllocator();	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();	allocHandle.reset(config);
ByteBuf byteBuf = null; boolean close = false; try { do { // #1 byteBuf = allocHandle.allocate(allocator); // #2 allocHandle.lastBytesRead(doReadBytes(byteBuf)); // #3 if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; }
allocHandle.incMessagesRead(1); readPending = false; // #4 pipeline.fireChannelRead(byteBuf); byteBuf = null; // #5 } while (allocHandle.continueReading()); // #6 allocHandle.readComplete(); // #7 pipeline.fireChannelReadComplete();
if (close) { // #8 closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { ... }}
复制代码

#1 分配内存给 ByteBuf

#2 读取 Socket 数据到 ByteBuf,这里默认会尝试读取 1024 字节的数据。

#3 如果 lastBytesRead 方法返回-1,表示 Channel 已关闭,这时释放当前 ByteBuf 引用,准备关闭 Channel

#4 使用读取到的数据,触发 ChannelPipeline#fireChannelRead,通常我们在这里处理数据。

#5 判断是否需要继续读取数据。

默认条件是,如果读取到的数据大小等于尝试读取数据大小 1024 字节,则继续读取。

#6 预留方法,提供给 RecvByteBufAllocator 做一些扩展操作

#7 触发 ChannelPipeline#fireChannelReadComplete,例如将前面多次读取到的数据转换为一个对象。

#8 关闭 Channel


注意,ChannelPipeline#fireChannelRead 如果不再继续传播 channelRead 事件,就不会执行到 TailContext#channelRead 方法,这是我们需要自行释放对应的 ByteBuf。

可以通过继承 SimpleChannelInboundHandler 类实现,SimpleChannelInboundHandler#channelRead 保证最终释放 ByteBuf。


Write

我们需要调用 ChannelHandlerContext#write 方法触发 write 操作。

ChannelHandlerContext#write -> HeadContext#write -> AbstractUnsafe#write

public final void write(Object msg, ChannelPromise promise) {	assertEventLoop();	// #1	ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;	...
int size; try { // #2 msg = filterOutboundMessage(msg); // #3 size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // #4 outboundBuffer.addMessage(msg, size, promise);}
复制代码

#1 获取 AbstractUnsafe 中维护的 ChannelOutboundBuffer,该类负责缓存 write 的数据,等到 flush 再实际写数据。

#2 AbstractChannel 提供给子类的扩展方法,可以做一些 ByteBuf 检查,转化等操作。

#3 检查待写入数据量

#4 将数据添加到 ChannelOutboundBuffer 缓存中。

可以看到,write 并没有真正的写数据,而是将数据放到了一个缓冲对象 ChannelOutboundBuffer。

ChannelOutboundBuffer 中的数据要等到 ChannelHandlerContext#flush 时再写出。


ByteBuf 是 Netty 中负责与 Channel 交互的内存缓冲区,而 ByteBufAllocator,RecvByteBufAllocator 主要负责分配内存给 ByteBuf,后面有文章解析它们。

ChannelOutboundBuffer 主要是缓存 write 数据,等到 flush 时再一并写入 Channel。后面有文章解析它。


如果您觉得本文不错,欢迎关注我的微信公众号,您的关注是我坚持的动力!


发布于: 2020 年 10 月 24 日阅读数: 766
用户头像

binecy

关注

还未添加个人签名 2020.08.26 加入

还未添加个人简介

评论

发布
暂无评论
Netty源码解析 -- ChannelPipeline机制与读写过程