写点什么

Netty 源码学习 7——netty 是如何发送数据的

  • 2023-12-04
    福建
  • 本文字数:3843 字

    阅读完需:约 13 分钟

一丶 Write 事件的产生和传播


在业务逻辑处理完毕后,需要调用 write 或者 writeAndFlush 方法


  • ChannelHandlerContext#write or writeAndFlush方法会从当前 ChannelHandler 开始在 pipeline 中向前传播 write 事件直到 HeadContext。


  • ChannelHandlerContext.channel()#write or writeAndFlush 方法则会从 pipeline 的尾结点 TailContext 开始在 pipeline 中向前传播 write 事件直到 HeadContext 。


write 方法并不是真将数据写到 socket 缓存区,而是写道 Netty 的 ChannelOutBoundBuffer 中,调用 flush 方法才会真正调用 JDK SockectChannel 将数据写入。


如下是 pipeline#writeAndFlush,可以看到直接调用 TailContext#writeAndFlush 进行处理



关键源码如下:


private void write(Object msg, boolean flush, ChannelPromise promise) {   // 省略 部分	    //flush 表示是否需要flush,调用writeAndFlush的时候为true    // 找到下一个ChannelHandlerContext    final AbstractChannelHandlerContext next = findContextOutbound(flush ?            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);    final Object m = pipeline.touch(msg, next);    EventExecutor executor = next.executor();    // 在eventLoop 中    if (executor.inEventLoop()) {        // 需要flush 那么调用invokeWriteAndFlush        if (flush) {            next.invokeWriteAndFlush(m, promise);        } else {            next.invokeWrite(m, promise);        }    } else {        // 在eventLoop 中 那么提交一个WriteTask到eventLoop中        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);        if (!safeExecute(executor, task, promise, m, !flush)) {            task.cancel();        }    }}
复制代码


可以看到 TailContext 会通过 findContextOutbound 方法在当前 ChannelHandler 的前边找到 ChannelOutboundHandler 类型并且覆盖实现 write 回调方法的 ChannelHandler 作为下一个要执行的对象。


然后如果当前执行的线程就是 EventLoop 线程,那么直接调用,反之提交一个异步任务,从而保证执行 write 的一定是 reactor 线程——保证线程安全性


如下是next.invokeWriteAndFlush的源码



最终事件会传播到 HeadContext 进行处理(如果中间的 ChannelHandler 不截胡的话)



二丶 write 源码解析


write 事件最终会由 HeadContext 进行处理



可以看到 HeadContext#write 其实就是使用 Channel 的 Unsafe#write,其主要逻辑如下



ChannelOutboundBuffer#addMessage


ChannelOutboundBuffer 是 Netty 内部使用的一个数据结构,它用于存储待发送的出站数据。在 Netty 的网络框架中,当需要写数据到网络时,数据并不会立即被发送出去,而是首先被放入一个出站缓冲区中,即 ChannelOutboundBuffer。这个缓冲区负责管理和存储所有待写入通道的数据。


  • 批量发送优化: ChannelOutboundBuffer 允许 Netty 批量地发送数据,而不是每次写操作都立即进行网络发送。这样可以减少系统调用次数,提高网络效率。


  • 流量控制: 它有助于实现流量控制,防止数据发送过快,导致接收方处理不过来。


  • 缓冲区管理: 可以有效地管理内存,当数据被写入网络后,及时释放相应的内存。


  • 异步处理: Netty 是异步事件驱动的框架,使用 ChannelOutboundBuffer 可以将数据发送的异步化,提升处理性能


下面是向 ChannelOutboundBuffer 写入 messge 的源码



可与看到 ChannelOutboundBuffer 会将 msg 和 promise 包装为 Entry,然后改变 tailEntry,flushedEntry,unflushedEntry 指针的指向



然后 incrementPendingOutboundBytes 将记录下待写出数据 size,如果大于高水位还会触发 channelWritabilityChanged 事件



channelWritabilityChanged 会在 pipeline 上传播,并触发ChannelInboundHandler#channelWritabilityChanged,我们可以实现此方法调用 flush 将数据写出


三丶 flush 源码解析


上面看了 write 将待发送的数据缓存到 ChannelOutboundBuffer 中,正真将数据写到 SocketChannel 中的是 flush 方法



1.addFlush


此方法只是负责更改 flushedEntry 和 unflushedEntry 指针指向



将 flushedEntry 指针指向 unflushedEntry 指针表示的第一个未被 flush 的 Entry 节点。并将 unflushedEntry 指针置为空,准备开始 flush 发送数据流程。


这样在 flushedEntry 与 tailEntry 之间的 Entry 节点即为本次 flush 操作需要发送的数据范围。


public void addFlush() {        Entry entry = unflushedEntry;        if (entry != null) {            if (flushedEntry == null) {                flushedEntry = entry;            }            do {                flushed ++;                //如果当前entry对应的write操作被用户取消,则释放msg,并降低channelOutboundBuffer水位线                if (!entry.promise.setUncancellable()) {                                      int pending = entry.cancel();                    decrementPendingOutboundBytes(pending, false, true);                }                entry = entry.next;            } while (entry != null);
// All flushed so reset unflushedEntry unflushedEntry = null; } }
复制代码


2.flush0



可以看到如果注册了 write 到 selector 上,那么不会进行 flush,


如下是 NioSockectChannel 发送数据的源码


@Override    protected void doWrite(ChannelOutboundBuffer in) throws Exception {        //获取jdk nio底层socketChannel        SocketChannel ch = javaChannel();        //最大写入次数 默认为16 ,因为EventLoop可能单线程处理多Channel,需要雨露均沾        int writeSpinCount = config().getWriteSpinCount();        do {            if (in.isEmpty()) {                // 如果全部数据已经写完 则移除OP_WRITE事件并直接退出writeLoop                clearOpWrite();                             return;            }
// 获取单次发送最大字节数 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); //Netty的DirectBuffer底层就是JDK的DirectByteBuffer // 将ChannelOutboundBuffer中缓存的DirectBuffer转换成JDK的ByteBuffer, ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); // ChannelOutboundBuffer中总共的DirectBuffer数 int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) { // 真正进行发送 //java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)进行写回 } } while (writeSpinCount > 0); // 处理Socket可写但已经写满16次还没写完的情况 incompleteWrite(writeSpinCount < 0); }
复制代码


可以看到


  • 如果数据全部写完了,会调用 clearOpWrite 清除当前 Channel 在 Reactor 上注册的 OP_WRITE 事件



这意味着,不需要再监听 write 来触发 flush 了


  • 写入的过程会写入多次,并控制自旋次数,做到雨露均沾



如上是写入的过程


  • 如果 ByteBuffer 个数为 0,说明发送的是 FileRegion 类型,case 0的分支主要就是用于处理网络文件传输的情况



  • case1 和 default 则调用 jdk SocketChannel#write 进行数据发送,如果写入的数据小于等于 0,说明当前 Socket 发送缓冲区满了写不进去了,则注册 OP_WRITE 事件,等待 Socket 发送缓冲区可写时再写



触发 Write 后,再 Sockect 写缓冲区可写后,会触发对应事件,即可再 NioEventLoop 中进行处理,如下图中会直接调用 forceFlush



  • 完成发送会调用 adjustMaxBytesPerGatheringWrite 进行调整


两个分支分别表示


  • 期望写入和真正写入的相等,说明数据能全部写入到 Socket 的写缓冲区中了,那么下次 write loop 就应该尝试去写入更多的数据。


  • 本次写入的数量 x2>maxBytesPerGatheringWrite 说明要写的数据很多,那么更新为本次 write loop 两倍的写入量大小


  • 如果本次写入的数据还不及尝试写入数据的一半,说明 Socket 写缓冲区容量不多了,尝试缩容为一半


  • 处理


protected final void incompleteWrite(boolean setOpWrite) {            if (setOpWrite) {            //socket缓冲区已满写不进去的情况 注册write事件            setOpWrite();        } else {            //处理socket缓冲区依然可写,但是写了16次还没写完,提交flushTask异步写            clearOpWrite();            eventLoop().execute(flushTask);
}
复制代码


四丶总结

这一节中我们学习了 netty 写入数据的流程,写入数据时出站事件,一般最终将有 HeadContext 进行处理


  • write 方法将写入的数据转换为 DirectByteBuf 包装到 ChannelOutboundBuffer 中,并且记录了对应的 Promise 实现异步驱动,还可以减少系统调用


  • flush 方法,调用 jdk SocketChannel#write 进行写入,使用自旋次数控制,让多个 Channel 的处理得到平衡,如果 Socket 缓冲区满无法在继续写入那么会 OP_WRITE 事件,等 Socket 缓冲区变的可写时,epoll 通知 EventLoop 线程继续发送。


  • Socket 缓冲区可写,写满 16 次但依然没有写完,这时候注册异步任务使用 EventLoop 线程进行异步发送。如果写的时 FileRegion 类型,那么会使用 transferTo 进行零拷贝写入。


文章转载自:Cuzzz

原文链接:https://www.cnblogs.com/cuzzz/p/17873524.html

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Netty源码学习7——netty是如何发送数据的_学习_不在线第一只蜗牛_InfoQ写作社区