大厂 Offer 收割机:Netty 处理写事件之连环四问,你能抗住吗?
该系列已分别介绍了服务端、客户端的启动流程、网络读事件处理流程,本文将重点剖析 Netty 是如何封装 NIO 的写事件。
温馨提示:本文虽然是源码分析,但强烈建议精读,因为根据源码阐述其背后的设计哲学,也用黑体进行了标注,请特别留意。
在阅读本篇文章之前,请稍微思考如下几个问题:
写事件需要先注册才能往通道中写入数据?
什么时候需要向通道注册写事件呢?
业务线程池执行业务逻辑后,是如何通过 IO 线程将数据写入到网络中的呢?
Netty 中如何针对写限流
1、写事件概述
写事件,顾名思义,就是将数据写入网络,通过网络传输给接收端,通常我们知道业务都会在专属的业务线程池中执行,那数据是如何通过 IO 线程写入网络中的呢?
正如上图中的线程模型,业务线程池是如何将数据通过 IO 线程写入网络中的呢?
接下来我们将带着问题,尝试看 Netty 是如何封装 NIO 的写事件。
2、写事件处理流程分析
在 Netty 中,写事件的处理入口为 NioEventLoop 的 processSelectedKey 方法:
根据调用链,最终调用 AbstractChannel 的内部类 AbstractUnsafe 的 flush0 方法。
AbstractUnsafe#flush0
该方法有三个实现要点:
获取写缓存队列,如果写缓存队列为空,则跳过本次写事件。每一个通道独享一个写缓存队列,写事件触发执行的动作就是要将写缓存中的数据写入到网络中。
如果通道处于未激活状态,需要清理写缓存区
通过调用 doWrite 方法将写缓存中的数据写入网络通道中。
技巧:该方法的触发点是事件选择器,即 NIO 中的 Selector 事件线程,需要写入的数据是该通道对应的通道缓存,那写缓存中的数据从哪里来呢?通常业务线程如何将数据写入到通道的写缓存呢?
2.1 NIO 写事件的优雅封装
首先,我们将写缓存区当成一个黑盒,先重点看一下 doWrite 方法的实现,窥探一下 Netty 是如何基于 NIO 来处理网络写入的。
Step1:如果写缓存区中没有可写的数据,取消注册写事件。我们来看一下取消写事件的经典实现技巧:
首先判断一下注册键是否有效,然后通过位运算,取消写事件。
思考题:问题来了,取消写事件,从系统层面就无法继续触发写操作了,那后续如何触发写事件呢?
写事件的处理要考虑如下几个问题:
本次写缓存区中数据是否写完?
如果底层网络 Socket 缓存区积压,导致写缓冲区未写完如何处理?
如果网络缓存区数据特别大又如何处理?
Netty 给出的解决方案如下:
通过底层 NIO 的 SocketChannel 的 write 方法将数据写入到 Socket 缓存区,如果返回值为0,表示 Socket 缓冲区已满,需要暂停写入,具体做法,注册写事件,等待下次继续写入。
如果写缓存区的数据全部处理完毕,可取消注册写事件,避免毫无意义的写事件就绪。
如果写缓存区中数据很大,为了避免单个通道对其他通道的影响,默认设置单次写事件最多调用底层 NIO SocketChannel 的 write 方法次数,默认为 16。
写事件的核心处理要点就介绍到这里了。
2.2 通道写缓冲区详解
在 Netty 中调用通道的 write 方法并不会立即将数据写入到底层网络 Socket 中,而是写入到“写缓存区”,为应用级别的缓存区,即 ChannelOutboundBuffer,这是 Netty 实现写操作最重要的一个数据结构。
2.2.1 类图
ChannelOutboundBuffer 的核心类图如下:
核心属性与方法简介:
FastThreadLocal
NIO_BUFFERS
可以看出是线程本地变量 ThreadLocal 的优化版本,存储一个一个 ByteBuffer 数组。
Channel channel 该写缓存区所属的通道,每一个 Channel 独享一个写缓冲区。
Entry flushedEntry 表示第一个被刷新的 Entry,在写入时,从该 Entry 开始写。
Entry unflushedEntry 在链表中第一个未刷新的节点(未刷新链表中第一个节点)。
Entry tailEntry 在链表中尾部的节点。
int flushed 待写入的 entry 个数,这个数据代表在执行一次真正的 flush(flush0),将会有多少个 entry 中的内容会被写入到通道。
接下来我们按照写事件对待写入缓存区方法调用的顺序来讲解一下该方法的核心实现逻辑。
2.2.2 size 方法详解
返回本次写缓存区可期望刷新的消息个数(Entry)。在 NioSocketChannel 的 doWriter 方法中,如果 isEmpty 返回 true,直接结束本次写入操作,更加准确的是结束本次 flush 操作。
flushed 该字段代表的当时待写入的 Entry,如果为 0,表示没有待 flush 的 Entry,但不代表 ChannelOutboundBuffer 中没有 Entry 存在,比如调用 Channel.writer 方法,会往 ChannelOutboundBuffer 增加 Entry,但在没有调用 addFlush 方法之前,ChannelOutboundBuffer 中的 flushed 字段的值不会增加。
2.2.3 addMessage 方法详解
向写缓存中添加消息,方法本身的实现非常简单,因为 ChannelOutboundBuffer 其内部数据结构为一个链表,这是一个往链表中添加消息的过程,这里的关键点是该方法的调用入口为 Channel 的 write 方法,即调用通道的 write 方法只是将数据写入到写缓存,并不会触发真正的往网络中写消息。
该方法会调用 incrementPendingOutboundBytes,我们简单看一下该方法的实现细节:
该方法蕴含了 Netty 一个非常重要的机制,写操作限流,高低水位线机制。当缓存区中存储的数据超过了设置的高水位线(阔值),则会设置为不可写,并向通道传播写状态变更事件。
2.2.4 addFlush 方法详解
该方法并没有真正的执行刷新动作,而是计算可刷写的 Entry 个数,一次刷新动作,会将 unfluedEntry 开始,一直扫描到 tailEntry。
同样这里和 Netty 的写限流有关,将数据刷写后,会减少缓存区中的大小,如果低于设置的低水位线,会将缓存区恢复到可写状态。
该方法的调用入口为下图:
即在调用通道的 flush 方法时会先计算本次看刷写到 Socket 缓存区中的数据,然后执行 flush0 方法执行真正的网络写,该方法在第一部分中已详细介绍。
评论