Netty 源码学习 7——netty 是如何发送数据的
一丶 Write 事件的产生和传播
在业务逻辑处理完毕后,需要调用 write 或者 writeAndFlush 方法
ChannelHandlerContext#write or writeAndFlush
方法会从当前 ChannelHandler 开始在 pipeline 中向前传播 write 事件直到 HeadContext。
ChannelHandlerContext.channel()#write or writeAndFlush
方法则会从 pipeline 的尾结点 TailContext 开始在 pipeline 中向前传播 write 事件直到 HeadContext 。
write 方法并不是真将数据写到 socket 缓存区,而是写道 Netty 的 ChannelOutBoundBuffer 中,调用 flush 方法才会真正调用 JDK SockectChannel 将数据写入。
如下是 pipeline#writeAndFlush,可以看到直接调用 TailContext#writeAndFlush 进行处理
关键源码如下:
可以看到 TailContext 会通过 findContextOutbound 方法在当前 ChannelHandler 的前边找到 ChannelOutboundHandler 类型并且覆盖实现 write 回调方法的 ChannelHandler 作为下一个要执行的对象。
然后如果当前执行的线程就是 EventLoop 线程,那么直接调用,反之提交一个异步任务,从而保证执行 write 的一定是 reactor 线程——保证线程安全性
如下是next.invokeWriteAndFlush
的源码
最终事件会传播到 HeadContext 进行处理(如果中间的 ChannelHandler 不截胡的话)
二丶 write 源码解析
write 事件最终会由 HeadContext 进行处理
可以看到 HeadContext#write 其实就是使用 Channel 的 Unsafe#write,其主要逻辑如下
ChannelOutboundBuffer#addMessage
ChannelOutboundBuffer 是 Netty 内部使用的一个数据结构,它用于存储待发送的出站数据。在 Netty 的网络框架中,当需要写数据到网络时,数据并不会立即被发送出去,而是首先被放入一个出站缓冲区中,即 ChannelOutboundBuffer。这个缓冲区负责管理和存储所有待写入通道的数据。
批量发送优化: ChannelOutboundBuffer 允许 Netty 批量地发送数据,而不是每次写操作都立即进行网络发送。这样可以减少系统调用次数,提高网络效率。
流量控制: 它有助于实现流量控制,防止数据发送过快,导致接收方处理不过来。
缓冲区管理: 可以有效地管理内存,当数据被写入网络后,及时释放相应的内存。
异步处理: Netty 是异步事件驱动的框架,使用 ChannelOutboundBuffer 可以将数据发送的异步化,提升处理性能
下面是向 ChannelOutboundBuffer 写入 messge 的源码
可与看到 ChannelOutboundBuffer 会将 msg 和 promise 包装为 Entry,然后改变 tailEntry,flushedEntry,unflushedEntry 指针的指向
然后 incrementPendingOutboundBytes 将记录下待写出数据 size,如果大于高水位还会触发 channelWritabilityChanged 事件
channelWritabilityChanged 会在 pipeline 上传播,并触发ChannelInboundHandler#channelWritabilityChanged
,我们可以实现此方法调用 flush 将数据写出
三丶 flush 源码解析
上面看了 write 将待发送的数据缓存到 ChannelOutboundBuffer 中,正真将数据写到 SocketChannel 中的是 flush 方法
1.addFlush
此方法只是负责更改 flushedEntry 和 unflushedEntry 指针指向
将 flushedEntry 指针指向 unflushedEntry 指针表示的第一个未被 flush 的 Entry 节点。并将 unflushedEntry 指针置为空,准备开始 flush 发送数据流程。
这样在 flushedEntry 与 tailEntry 之间的 Entry 节点即为本次 flush 操作需要发送的数据范围。
2.flush0
可以看到如果注册了 write 到 selector 上,那么不会进行 flush,
如下是 NioSockectChannel 发送数据的源码
可以看到
如果数据全部写完了,会调用 clearOpWrite 清除当前 Channel 在 Reactor 上注册的 OP_WRITE 事件
这意味着,不需要再监听 write 来触发 flush 了
写入的过程会写入多次,并控制自旋次数,做到雨露均沾
如上是写入的过程
如果 ByteBuffer 个数为 0,说明发送的是 FileRegion 类型,
case 0
的分支主要就是用于处理网络文件传输的情况
case1 和 default 则调用 jdk SocketChannel#write 进行数据发送,如果写入的数据小于等于 0,说明当前 Socket 发送缓冲区满了写不进去了,则注册 OP_WRITE 事件,等待 Socket 发送缓冲区可写时再写
触发 Write 后,再 Sockect 写缓冲区可写后,会触发对应事件,即可再 NioEventLoop 中进行处理,如下图中会直接调用 forceFlush
完成发送会调用 adjustMaxBytesPerGatheringWrite 进行调整
两个分支分别表示
期望写入和真正写入的相等,说明数据能全部写入到 Socket 的写缓冲区中了,那么下次 write loop 就应该尝试去写入更多的数据。
本次写入的数量 x2>maxBytesPerGatheringWrite 说明要写的数据很多,那么更新为本次 write loop 两倍的写入量大小
如果本次写入的数据还不及尝试写入数据的一半,说明 Socket 写缓冲区容量不多了,尝试缩容为一半
处理
四丶总结
这一节中我们学习了 netty 写入数据的流程,写入数据时出站事件,一般最终将有 HeadContext 进行处理
write 方法将写入的数据转换为 DirectByteBuf 包装到 ChannelOutboundBuffer 中,并且记录了对应的 Promise 实现异步驱动,还可以减少系统调用
flush 方法,调用 jdk SocketChannel#write 进行写入,使用自旋次数控制,让多个 Channel 的处理得到平衡,如果 Socket 缓冲区满无法在继续写入那么会 OP_WRITE 事件,等 Socket 缓冲区变的可写时,epoll 通知 EventLoop 线程继续发送。
Socket 缓冲区可写,写满 16 次但依然没有写完,这时候注册异步任务使用 EventLoop 线程进行异步发送。如果写的时 FileRegion 类型,那么会使用 transferTo 进行零拷贝写入。
文章转载自:Cuzzz
评论