Netty 如何高效接收网络数据?一文聊透 ByteBuffer 动态自适应扩缩容机制
本系列 Netty 源码解析文章基于 4.1.56.Final 版本
前文回顾
在前边的系列文章中,我们从内核如何收发网络数据开始以一个 C10K 的问题作为主线详细从内核角度阐述了网络 IO 模型的演变,最终在此基础上引出了 Netty 的网络 IO 模型如下图所示:
详细内容可回看《从内核角度看IO模型的演变》
后续我们又围绕着 Netty 的主从 Reactor 网络 IO 线程模型,在《Reactor模型在Netty中的实现》一文中详细阐述了 Netty 的主从 Reactor 模型的创建,以及介绍了 Reactor 模型的关键组件。搭建了 Netty 的核心骨架如下图所示:
在核心骨架搭建完毕之后,我们随后又在《详细图解Reactor启动全流程》一文中阐述了 Reactor 启动的全流程,一个非常重要的核心组件 NioServerSocketChannel 开始在这里初次亮相,承担着一个网络框架最重要的任务--高效接收网络连接。我们介绍了 NioServerSocketChannel 的创建,初始化,向 Main Reactor 注册并监听 OP_ACCEPT 事件的整个流程。在此基础上,Netty 得以整装待发,枕戈待旦开始迎接海量的客户端连接。
随后紧接着我们在《Netty如何高效接收网络连接》一文中详细介绍了 Netty 高效接收客户端网络连接的全流程,在这里 Netty 的核心重要组件 NioServerSocketChannel 开始正是登场,在 NioServerSocketChannel 中我们创建了客户端连接 NioSocketChannel,并详细介绍了 NioSocketChannel 的初始化过程,随后通过在 NioServerSocketChannel 的 pipeline 中触发 ChannelRead 事件,并最终在 ServerBootstrapAcceptor 中将客户端连接 NioSocketChannel 注册到 Sub Reactor 中开始监听客户端连接上的 OP_READ 事件,准备接收客户端发送的网络数据也就是本文的主题内容。
自此 Netty 的核心组件全部就绪并启动完毕,开始起飞~~~
之前文章中的主角是 Netty 中主 Reactor 组中的 Main Reactor 以及注册在 Main Reactor 上边的 NioServerSocketChannel,那么从本文开始,我们文章中的主角就切换为 Sub Reactor 以及注册在 SubReactor 上的 NioSocketChannel 了。
下面就让我们正式进入今天的主题,看一下 Netty 是如何处理 OP_READ 事件以及如何高效接收网络数据的。
1. Sub Reactor 处理 OP_READ 事件流程总览
客户端发起系统 IO 调用向服务端发送数据之后,当网络数据到达服务端的网卡并经过内核协议栈的处理,最终数据到达 Socket 的接收缓冲区之后,Sub Reactor 轮询到 NioSocketChannel 上的OP_READ事件
就绪,随后 Sub Reactor 线程就会从 JDK Selector 上的阻塞轮询 APIselector.select(timeoutMillis)
调用中返回。转而去处理 NioSocketChannel 上的OP_READ事件
。
注意这里的 Reactor 为负责处理客户端连接的 Sub Reactor。连接的类型为 NioSocketChannel,处理的事件为 OP_READ 事件。
在之前的文章中笔者已经多次强调过了,Reactor 在处理 Channel 上的 IO 事件入口函数为NioEventLoop#processSelectedKey
。
这里需要重点强调的是,当前的执行线程现在已经变成了 Sub Reactor,而 Sub Reactor 上注册的正是 netty 客户端 NioSocketChannel 负责处理连接上的读写事件。
所以这里入口函数的参数AbstractNioChannel ch
则是 IO 就绪的客户端连接NioSocketChannel
。
开头通过ch.unsafe()
获取到的 NioUnsafe 操作类正是 NioSocketChannel 中对底层 JDK NIO SocketChannel 的 Unsafe 底层操作类。实现类型为NioByteUnsafe
定义在下图继承结构中的AbstractNioByteChannel
父类中。
下面我们到NioByteUnsafe#read
方法中来看下 Netty 对OP_READ事件
的具体处理过程:
2. Netty 接收网络数据流程总览
我们直接按照老规矩,先从整体上把整个 OP_READ 事件的逻辑处理框架提取出来,让大家先总体俯视下流程全貌,然后在针对每个核心点位进行各个击破。
流程中相关置灰的步骤为 Netty 处理连接关闭时的逻辑,和本文主旨无关,我们这里暂时忽略,等后续笔者介绍连接关闭时,会单独开一篇文章详细为大家介绍。
从上面这张 Netty 接收网络数据总体流程图可以看出 NioSocketChannel 在接收网络数据的整个流程和我们在上篇文章《Netty如何高效接收网络连接》中介绍的 NioServerSocketChannel 在接收客户端连接时的流程在总体框架上是一样的。
NioSocketChannel 在接收网络数据的过程处理中,也是通过在一个do{....}while(...)
循环 read loop 中不断的循环读取连接 NioSocketChannel 上的数据。
同样在 NioSocketChannel 读取连接数据的 read loop 中也是受最大读取次数的限制。默认配置最多只能读取 16 次,超过 16 次无论此时 NioSocketChannel 中是否还有数据可读都不能在进行读取了。
这里 read loop 循环最大读取次数可在启动配置类 ServerBootstrap 中通过ChannelOption.MAX_MESSAGES_PER_READ
选项设置,默认为 16。
**Netty 这里为什么非得限制 read loop 的最大读取次数呢?**为什么不在 read loop 中一次性把数据读取完呢?
这时候就是考验我们大局观的时候了,在前边的文章介绍中我们提到 Netty 的 IO 模型为主从 Reactor 线程组模型,在 Sub Reactor Group 中包含了多个 Sub Reactor 专门用于监听处理客户端连接上的 IO 事件。
为了能够高效有序的处理全量客户端连接上的读写事件,Netty 将服务端承载的全量客户端连接分摊到多个 Sub Reactor 中处理,同时也能保证Channel上IO处理的线程安全性
。
其中一个 Channel 只能分配给一个固定的 Reactor。一个 Reactor 负责处理多个 Channel 上的 IO 就绪事件,Reactor 与 Channel 之间的对应关系如下图所示:
而一个 Sub Reactor 上注册了多个 NioSocketChannel,Netty 不可能在一个 NioSocketChannel 上无限制的处理下去,要将读取数据的机会均匀分摊给其他 NioSocketChannel,所以需要限定每个 NioSocketChannel 上的最大读取次数。
此外,Sub Reactor 除了需要监听处理所有注册在它上边的 NioSocketChannel 中的 IO 就绪事件之外,还需要腾出事件来处理有用户线程提交过来的异步任务。从这一点看,Netty 也不会一直停留在 NioSocketChannel 的 IO 处理上。所以限制 read loop 的最大读取次数是非常必要的。
关于 Reactor 的整体运转架构,对细节部分感兴趣的同学可以回看下笔者的《一文聊透Netty核心引擎Reactor的运转架构》这篇文章。
所以基于这个原因,我们需要在 read loop 循环中,每当通过doReadBytes
方法从 NioSocketChannel 中读取到数据时(方法返回值会大于 0,并记录在 allocHandle.lastBytesRead 中),都需要通过allocHandle.incMessagesRead(1)
方法统计已经读取的次数。当达到 16 次时不管 NioSocketChannel 是否还有数据可读,都需要在 read loop 末尾退出循环。转去执行 Sub Reactor 上的异步任务。以及其他 NioSocketChannel 上的 IO 就绪事件。平均分配,雨露均沾!!
本次 read loop 读取到的数据大小会记录在allocHandle.lastBytesRead
中
lastBytesRead < 0
:表示客户端主动发起了连接关闭流程,Netty 开始连接关闭处理流程。这个和本文的主旨无关,我们先不用管。后面笔者会专门用一篇文章来详解关闭流程。lastBytesRead = 0
:表示当前 NioSocketChannel 上的数据已经全部读取完毕,没有数据可读了。本次 OP_READ 事件圆满处理完毕,可以开开心心的退出 read loop。当
lastBytesRead > 0
:表示在本次 read loop 中从 NioSocketChannel 中读取到了数据,会在 NioSocketChannel 的 pipeline 中触发 ChannelRead 事件。进而在 pipeline 中负责 IO 处理的 ChannelHandelr 中响应,处理网络请求。
最后会在 read loop 循环的末尾调用allocHandle.continueReading()
判断是否结束本次 read loop 循环。这里的结束循环条件的判断会比我们在介绍 NioServerSocketChannel 接收连接时的判断条件复杂很多,笔者会将这个判断条件的详细解析放在文章后面细节部分为大家解读,这里大家只需要把握总体核心流程,不需要关注太多细节。
总体上在 NioSocketChannel 中读取网络数据的 read loop 循环结束条件需要满足以下几点:
当前 NioSocketChannel 中的数据已经全部读取完毕,则退出循环。
本轮 read loop 如果没有读到任何数据,则退出循环。
read loop 的读取次数达到 16 次,退出循环。
当满足这里的 read loop 退出条件之后,Sub Reactor 线程就会退出循环,随后会调用allocHandle.readComplete()
方法根据本轮 read loop 总共读取到的字节数totalBytesRead
来决定是否对用于接收下一轮 OP_READ 事件数据的 ByteBuffer 进行扩容或者缩容。
最后在 NioSocketChannel 的 pipeline 中触发ChannelReadComplete事件
,通知 ChannelHandler 本次 OP_READ 事件已经处理完毕。
2.1 ChannelRead 与 ChannelReadComplete 事件的区别
有些小伙伴可能对 Netty 中的一些传播事件触发的时机,或者事件之间的区别理解的不是很清楚,概念容易混淆。在后面的文章中笔者也会从源码的角度出发给大家说清楚 Netty 中定义的所有异步事件,以及这些事件之间的区别和联系和触发时机,传播机制。
这里我们主要探讨本文主题中涉及到的两个事件:ChannelRead 事件与 ChannelReadComplete 事件。
从上述介绍的 Netty 接收网络数据流程总览中我们可以看出ChannelRead事件
和ChannelReadComplete事件
是不一样的,但是对于刚接触 Netty 的小伙伴来说从命名上乍一看感觉又差不多。
下面我们来看这两个事件之间的差别:
Netty 服务端对于一次 OP_READ 事件的处理,会在一个do{}while()
循环 read loop 中分多次从客户端 NioSocketChannel 中读取网络数据。每次读取我们分配的 ByteBuffer 容量大小,初始容量为 2048。
ChanneRead事件
:一次循环读取一次数据,就触发一次ChannelRead事件
。本次最多读取在 read loop 循环开始分配的 DirectByteBuffer 容量大小。这个容量会动态调整,文章后续笔者会详细介绍。ChannelReadComplete事件
:当读取不到数据或者不满足continueReading
的任意一个条件就会退出 read loop,这时就会触发ChannelReadComplete事件
。表示本次OP_READ事件
处理完毕。
这里需要特别注意下触发
ChannelReadComplete事件
并不代表 NioSocketChannel 中的数据已经读取完了,只能说明本次OP_READ事件
处理完毕。因为有可能是客户端发送的数据太多,Netty 读了16次
还没读完,那就只能等到下次OP_READ事件
到来的时候在进行读取了。
以上内容就是 Netty 在接收客户端发送网络数据的全部核心逻辑。目前为止我们还未涉及到这部分的主干核心源码,笔者想的是先给大家把核心逻辑讲解清楚之后,这样理解起来核心主干源码会更加清晰透彻。
经过前边对网络数据接收的核心逻辑介绍,笔者在把这张流程图放出来,大家可以结合这张图在来回想下主干核心逻辑。
下面笔者会结合这张流程图,给大家把这部分的核心主干源码框架展现出来,大家可以将我们介绍过的核心逻辑与主干源码做个一一对应,还是那句老话,我们要从主干框架层面把握整体处理流程,不需要读懂每一行代码,文章后续笔者会将这个过程中涉及到的核心点位给大家拆开来各个击破!!
3. 源码核心框架总览
这里再次强调下当前执行线程为 Sub Reactor 线程,处理连接数据读取逻辑是在 NioSocketChannel 中。
首先通过config()
获取客户端 NioSocketChannel 的 Channel 配置类 NioSocketChannelConfig。
通过pipeline()
获取 NioSocketChannel 的 pipeline。我们在《详细图解Netty Reactor启动全流程》一文中提到的 Netty 服务端模板所举的示例中,NioSocketChannelde pipeline 中只有一个 EchoChannelHandler。
3.1 分配 DirectByteBuffer 接收网络数据
Sub Reactor 在接收 NioSocketChannel 上的 IO 数据时,都会分配一个 ByteBuffer 用来存放接收到的 IO 数据。
这里大家可能觉得比较奇怪,为什么在 NioSocketChannel 接收数据这里会有两个 ByteBuffer 分配器呢?一个是 ByteBufAllocator,另一个是 RecvByteBufAllocator。
这两个 ByteBuffer 又有什么区别和联系呢?
在上篇文章《抓到Netty一个Bug,顺带来透彻地聊一下Netty是如何高效接收网络连接》中,笔者为了阐述上篇文章中提到的 Netty 在接收网络连接时的 Bug 时,简单和大家介绍了下这个 RecvByteBufAllocator。
在上篇文章提到的 NioServerSocketChannelConfig 中,这里的 RecvByteBufAllocator 类型为 ServerChannelRecvByteBufAllocator。
还记得这个 ServerChannelRecvByteBufAllocator 类型在 4.1.69.final 版本引入是为了解决笔者在上篇文章中提到的那个 Bug 吗?在 4.1.69.final 版本之前,NioServerSocketChannelConfig 中的 RecvByteBufAllocator 类型为 AdaptiveRecvByteBufAllocator。
而在本文中 NioSocketChannelConfig 中的 RecvByteBufAllocator 类型为 AdaptiveRecvByteBufAllocator。
所以这里recvBufAllocHandle()
获得到的 RecvByteBufAllocator 为 AdaptiveRecvByteBufAllocator。顾名思义,这个类型的 RecvByteBufAllocator 可以根据 NioSocketChannel 上每次到来的 IO 数据大小来自适应动态调整 ByteBuffer 的容量。
对于客户端 NioSocketChannel 来说,它里边包含的 IO 数据时客户端发送来的网络数据,长度是不定的,所以才会需要这样一个可以根据每次 IO 数据的大小来自适应动态调整容量的 ByteBuffer 来接收。
如果我们把用于接收数据用的 ByteBuffer 看做一个桶的话,那么小数据用大桶装或者大数据用小桶装肯定是不合适的,所以我们需要根据接收数据的大小来动态调整桶的容量。而 AdaptiveRecvByteBufAllocator 的作用正是用来根据每次接收数据的容量大小来动态调整 ByteBuffer 的容量的。
现在 RecvByteBufAllocator 笔者为大家解释清楚了,接下来我们继续看 ByteBufAllocator。
大家这里需要注意的是 AdaptiveRecvByteBufAllocator 并不会真正的去分配 ByteBuffer,它只是负责动态调整分配 ByteBuffer 的大小。
而真正具体执行内存分配动作的是这里的 ByteBufAllocator 类型为 PooledByteBufAllocator。它会根据 AdaptiveRecvByteBufAllocator 动态调整出来的大小去真正的申请内存分配 ByteBuffer。
PooledByteBufAllocator 为 Netty 中的内存池,用来管理堆外内存 DirectByteBuffer。
AdaptiveRecvByteBufAllocator 中的 allocHandle 在上篇文章中我们也介绍过了,它的实际类型为 MaxMessageHandle。
在 MaxMessageHandle 中包含了用于动态调整 ByteBuffer 容量的统计指标。
在每轮 read loop 开始之前,都会调用allocHandle.reset(config)
重置清空上一轮 read loop 的统计指标。
在每次开始从 NioSocketChannel 中读取数据之前,需要利用PooledByteBufAllocator
在内存池中为 ByteBuffer 分配内存,默认初始化大小为2048
,这个容量由guess()方法
决定。
在每次通过doReadBytes
从 NioSocketChannel 中读取到数据后,都会调用allocHandle.lastBytesRead(doReadBytes(byteBuf))
记录本次读取了多少字节数据,并统计本轮 read loop 目前总共读取了多少字节。
每次循环从 NioSocketChannel 中读取数据之后,都会调用allocHandle.incMessagesRead(1)
。统计当前已经读取了多少次。如果超过了最大读取限制此时 16 次,就需要退出 read loop。去处理其他 NioSocketChannel 上的 IO 事件。
在每次 read loop 循环的末尾都需要通过调用allocHandle.continueReading()
来判断是否继续 read loop 循环读取 NioSocketChannel 中的数据。
attemptedBytesRead :
表示当前 ByteBuffer 预计尝试要写入的字节数。lastBytesRead :
表示本次 read loop 真实读取到了多少个字节。
defaultMaybeMoreSupplier
用于判断经过本次 read loop 读取数据后,ByteBuffer 是否满载而归。如果是满载而归的话(attemptedBytesRead == lastBytesRead),表明可能 NioSocketChannel 里还有数据。如果不是满载而归,表明 NioSocketChannel 里没有数据了已经。
是否继续进行 read loop 需要同时满足以下几个条件:
totalMessages < maxMessagePerRead
当前读取次数是否已经超过16次
,如果超过,就退出do(...)while()
循环。进行下一轮OP_READ事件
的轮询。因为每个 Sub Reactor 管理了多个 NioSocketChannel,不能在一个 NioSocketChannel 上占用太多时间,要将机会均匀地分配给 Sub Reactor 所管理的所有 NioSocketChannel。totalBytesRead > 0
本次OP_READ事件
处理是否读取到了数据,如果已经没有数据可读了,那么就直接退出 read loop。!respectMaybeMoreData || maybeMoreDataSupplier.get()
这个条件比较复杂,它其实就是通过respectMaybeMoreData
字段来控制 NioSocketChannel 中可能还有数据可读的情况下该如何处理。maybeMoreDataSupplier.get()
:true 表示本次读取从 NioSocketChannel 中读取数据,ByteBuffer 满载而归。说明可能 NioSocketChannel 中还有数据没读完。fasle 表示 ByteBuffer 还没有装满,说明 NioSocketChannel 中已经没有数据可读了。respectMaybeMoreData = true
表示要对可能还有更多数据进行处理的这种情况要respect
认真对待,如果本次循环读取到的数据已经装满ByteBuffer
,表示后面可能还有数据,那么就要进行读取。如果ByteBuffer
还没装满表示已经没有数据可读了那么就退出循环。respectMaybeMoreData = false
表示对可能还有更多数据的这种情况不认真对待not respect
。不管本次循环读取数据ByteBuffer
是否满载而归,都要继续进行读取,直到读取不到数据在退出循环,属于无脑读取。
同时满足以上三个条件,那么 read loop 继续进行。继续从 NioSocketChannel 中读取数据,直到读取不到或者不满足三个条件中的任意一个为止。
3.2 从 NioSocketChannel 中读取数据
这里会直接调用底层 JDK NIO 的SocketChannel#read
方法将数据读取到 DirectByteBuffer 中。读取数据大小为本次分配的 DirectByteBuffer 容量,初始为 2048。
4. ByteBuffer 动态自适应扩缩容机制
由于我们一开始并不知道客户端会发送多大的网络数据,所以这里先利用PooledByteBufAllocator
分配一个初始容量为2048
的 DirectByteBuffer 用于接收数据。
这就好比我们需要拿着一个桶去排队装水,但是第一次去装的时候,我们并不知道管理员会给我们分配多少水,桶拿大了也不合适拿小了也不合适,于是我们就先预估一个差不多容量大小的桶,如果分配的多了,我们下次就拿更大一点的桶,如果分配少了,下次我们就拿一个小点的桶。
在这种场景下,我们需要 ByteBuffer 可以自动根据每次网络数据的大小来动态自适应调整自己的容量。
而 ByteBuffer 动态自适应扩缩容机制依赖于 AdaptiveRecvByteBufAllocator 类的实现。让我们先回到 AdaptiveRecvByteBufAllocator 类的创建起点开始说起~~
4.1 AdaptiveRecvByteBufAllocator 的创建
在前文《Netty是如何高效接收网络连接》中我们提到,当 Main Reactor 监听到 OP_ACCPET 事件活跃后,会在 NioServerSocketChannel 中 accept 完成三次握手的客户端连接。并创建 NioSocketChannel,伴随着 NioSocketChannel 的创建其对应的配置类 NioSocketChannelConfig 类也会随之创建。
最终会在 NioSocketChannelConfig 的父类DefaultChannelConfig
的构造器中创建AdaptiveRecvByteBufAllocator
。并保存在RecvByteBufAllocator rcvBufAllocator
字段中。
在new AdaptiveRecvByteBufAllocator()
创建 AdaptiveRecvByteBufAllocator 类实例的时候会先触发 AdaptiveRecvByteBufAllocator 类的初始化。
我们先来看下 AdaptiveRecvByteBufAllocator 类的初始化都做了些什么事情:
4.2 AdaptiveRecvByteBufAllocator 类的初始化
AdaptiveRecvByteBufAllocator 主要的作用就是为接收数据的ByteBuffer
进行扩容缩容,那么每次怎么扩容?扩容多少?怎么缩容?缩容多少呢??
这四个问题将是本小节笔者要为大家解答的内容~~~
Netty 中定义了一个int型
的数组SIZE_TABLE
来存储每个扩容单位对应的容量大小。建立起扩缩容的容量索引表。每次扩容多少,缩容多少全部记录在这个容量索引表中。
在 AdaptiveRecvByteBufAllocatorl 类初始化的时候会在static{}
静态代码块中对扩缩容索引表SIZE_TABLE
进行初始化。
从源码中我们可以看出SIZE_TABLE
的初始化分为两个部分:
当索引容量小于
512
时,SIZE_TABLE
中定义的容量索引是从16开始
按16
递增。
当索引容量大于
512
时,SIZE_TABLE
中定义的容量索引是按前一个索引容量的 2 倍递增。
4.3 扩缩容逻辑
现在扩缩容索引表SIZE_TABLE
已经初始化完毕了,那么当我们需要对ByteBuffer
进行扩容或者缩容的时候如何根据SIZE_TABLE
决定扩容多少或者缩容多少呢??
这就用到了在 AdaptiveRecvByteBufAllocator 类中定义的扩容步长INDEX_INCREMENT = 4
,缩容步长INDEX_DECREMENT = 1
了。
我们就以上面两副扩缩容容量索引表SIZE_TABLE
中的容量索引展示截图为例,来介绍下扩缩容逻辑,假设我们当前ByteBuffer
的容量索引为33
,对应的容量为2048
。
4.3.1 扩容
当对容量为2048
的 ByteBuffer 进行扩容时,根据当前的容量索引index = 33
加上 扩容步长INDEX_INCREMENT = 4
计算出扩容后的容量索引为37
,那么扩缩容索引表SIZE_TABLE
下标37
对应的容量就是本次 ByteBuffer 扩容后的容量SIZE_TABLE[37] = 32768
4.3.1 缩容
同理对容量为2048
的 ByteBuffer 进行缩容时,我们就需要用当前容量索引index = 33
减去 缩容步长INDEX_DECREMENT = 1
计算出缩容后的容量索引32
,那么扩缩容索引表SIZE_TABLE
下标32
对应的容量就是本次 ByteBuffer 缩容后的容量SIZE_TABLE[32] = 1024
4.4 扩缩容时机
在每轮 read loop 结束之后,我们都会调用allocHandle.readComplete()
来根据在 allocHandle 中统计的在本轮 read loop 中读取字节总大小,来决定在下一轮 read loop 中是否对 DirectByteBuffer 进行扩容或者缩容。
我们以当前 ByteBuffer 容量为2048
,容量索引index = 33
为例,对allocHandle
的扩容缩容规则进行说明。
扩容步长
INDEX_INCREMENT = 4
,缩容步长INDEX_DECREMENT = 1
。
4.4.1 缩容
如果本次
OP_READ事件
实际读取到的总字节数actualReadBytes
在 SIZE_TABLE[index - INDEX_DECREMENT]与 SIZE_TABLE[index]之间的话,也就是如果本轮 read loop 结束之后总共读取的字节数在[1024,2048]
之间。说明此时分配的ByteBuffer
容量正好,不需要进行缩容也不需要进行扩容。比如本次actualReadBytes = 2000
,正好处在1024
与2048
之间。说明2048
的容量正好。如果
actualReadBytes
小于等于 SIZE_TABLE[index - INDEX_DECREMENT],也就是如果本轮 read loop 结束之后总共读取的字节数小于等于1024
。表示本次读取到的字节数比当前 ByteBuffer 容量的下一级容量还要小,说明当前 ByteBuffer 的容量分配的有些大了,设置缩容标识decreaseNow = true
。当下次OP_READ事件
继续满足缩容条件的时候,开始真正的进行缩容。缩容后的容量为 SIZE_TABLE[index - INDEX_DECREMENT],但不能小于 SIZE_TABLE[minIndex]。
注意需要满足两次缩容条件才会进行缩容,且缩容步长为 1,缩容比较谨慎
4.4.2 扩容
如果本次OP_READ事件
处理总共读取的字节数actualReadBytes
大于等于 当前 ByteBuffer 容量(nextReceiveBufferSize)时,说明 ByteBuffer 分配的容量有点小了,需要进行扩容。扩容后的容量为 SIZE_TABLE[index + INDEX_INCREMENT],但不能超过 SIZE_TABLE[maxIndex]。
满足一次扩容条件就进行扩容,并且扩容步长为 4, 扩容比较奔放
4.5 AdaptiveRecvByteBufAllocator 类的实例化
AdaptiveRecvByteBufAllocator 类的实例化主要是确定 ByteBuffer 的初始容量,以及最小容量和最大容量在扩缩容索引表SIZE_TABLE
中的下标:minIndex
和maxIndex
。
AdaptiveRecvByteBufAllocator 定义了三个关于 ByteBuffer 容量的字段:
DEFAULT_MINIMUM
:表示 ByteBuffer 最小的容量,默认为64
,也就是无论 ByteBuffer 在怎么缩容,容量也不会低于64
。DEFAULT_INITIAL
:表示 ByteBuffer 的初始化容量。默认为2048
。DEFAULT_MAXIMUM
:表示 ByteBuffer 的最大容量,默认为65536
,也就是无论 ByteBuffer 在怎么扩容,容量也不会超过65536
。
接下来的事情就是确定最小容量 DEFAULT_MINIMUM 在 SIZE_TABLE 中的下标minIndex
,以及最大容量 DEFAULT_MAXIMUM 在 SIZE_TABLE 中的下标maxIndex
。
从 AdaptiveRecvByteBufAllocator 类初始化的过程中,我们可以看出 SIZE_TABLE 中存储的数据特征是一个有序的集合。
我们可以通过二分查找在 SIZE_TABLE 中找出第一个
容量大于等于 DEFAULT_MINIMUM 的容量索引minIndex
。
同理通过二分查找在 SIZE_TABLE 中找出最后一个
容量小于等于 DEFAULT_MAXIMUM 的容量索引maxIndex
。
根据上一小节关于SIZE_TABLE
中容量数据分布的截图,我们可以看出minIndex = 3
,maxIndex = 38
4.5.1 二分查找容量索引下标
经常刷 LeetCode 的小伙伴肯定一眼就看出这个是二分查找的模板了。
它的目的就是根据给定容量,在扩缩容索引表SIZE_TABLE
中,通过二分查找找到最贴近
给定 size 的容量的索引下标(第一个大于等于 size 的容量)
4.6 RecvByteBufAllocator.Handle
前边我们提到最终动态调整 ByteBuffer 容量的是由 AdaptiveRecvByteBufAllocator 中的Handler
负责的,我们来看下这个allocHandle
的创建过程。
从 allocHandle 的获取过程我们看到最 allocHandle 的创建是由AdaptiveRecvByteBufAllocator#newHandle
方法执行的。
这里我们看到 Netty 中用于动态调整 ByteBuffer 容量的allocHandle
的实际类型为MaxMessageHandle
。
下面我们来介绍下HandleImpl
中的核心字段,它们都和 ByteBuffer 的容量有关:
minIndex
:最小容量在扩缩容索引表SIZE_TABE
中的 index。默认是3
。maxIndex
:最大容量在扩缩容索引表SIZE_TABE
中的 index。默认是38
。index
:当前容量在扩缩容索引表SIZE_TABE
中的 index。初始是33
。nextReceiveBufferSize
:预计下一次分配 buffer 的容量,初始为2048
。在每次申请内存分配 ByteBuffer 的时候,采用nextReceiveBufferSize
的值指定容量。decreaseNow :
是否需要进行缩容。
5. 使用堆外内存为 ByteBuffer 分配内存
AdaptiveRecvByteBufAllocator 类只是负责动态调整 ByteBuffer 的容量,而具体为 ByteBuffer 申请内存空间的是由PooledByteBufAllocator
负责。
5.1 类名前缀 Pooled 的来历
在我们使用 Java 进行日常开发过程中,在为对象分配内存空间的时候我们都会选择在 JVM 堆中为对象分配内存,这样做对我们 Java 开发者特别的友好,我们只管使用就好而不必过多关心这块申请的内存如何回收,因为 JVM 堆完全受 Java 虚拟机控制管理,Java 虚拟机会帮助我们回收不再使用的内存。
但是 JVM 在进行垃圾回收时候的stop the world
会对我们应用程序的性能造成一定的影响。
除此之外我们在《聊聊Netty那些事儿之从内核角度看IO模型》一文中介绍 IO 模型的时候提到,当数据达到网卡时,网卡会通过 DMA 的方式将数据拷贝到内核空间中,这是第一次拷贝
。当用户线程在用户空间发起系统 IO 调用时,CPU 会将内核空间的数据再次拷贝到用户空间。这是第二次拷贝
。
于此不同的是当我们在 JVM 中发起 IO 调用时,比如我们使用 JVM 堆内存读取Socket接收缓冲区
中的数据时,会多一次内存拷贝,CPU 在第二次拷贝
中将数据从内核空间拷贝到用户空间时,此时的用户空间站在 JVM 角度是堆外内存
,所以还需要将堆外内存中的数据拷贝到堆内内存
中。这就是第三次内存拷贝
。
同理当我们在 JVM 中发起 IO 调用向Socket发送缓冲区
写入数据时,JVM 会将 IO 数据先拷贝
到堆外内存
,然后才能发起系统 IO 调用。
那为什么操作系统不直接使用 JVM 的堆内内存
进行IO操作
呢?
因为 JVM 的内存布局和操作系统分配的内存是不一样的,操作系统不可能按照 JVM 规范来读写数据,所以就需要第三次拷贝
中间做个转换将堆外内存中的数据拷贝到 JVM 堆中。
所以基于上述内容,在使用 JVM 堆内内存时会产生以下两点性能影响:
JVM 在垃圾回收堆内内存时,会发生
stop the world
导致应用程序卡顿。在进行 IO 操作的时候,会多产生一次由堆外内存到堆内内存的拷贝。
基于以上两点使用JVM堆内内存
对性能造成的影响,于是对性能有卓越追求的 Netty 采用堆外内存
也就是DirectBuffer
来为 ByteBuffer 分配内存空间。
采用堆外内存为 ByteBuffer 分配内存的好处就是:
堆外内存直接受操作系统的管理,不会受 JVM 的管理,所以 JVM 垃圾回收对应用程序的性能影响就没有了。
网络数据到达之后直接在
堆外内存
上接收,进程读取网络数据时直接在堆外内存中读取,所以就避免了第三次内存拷贝
。
所以 Netty 在进行 I/O 操作时都是使用的堆外内存,可以避免数据从 JVM 堆内存到堆外内存的拷贝。但是由于堆外内存不受 JVM 的管理,所以就需要额外关注对内存的使用和释放,稍有不慎就会造成内存泄露,于是 Netty 就引入了内存池对堆外内存
进行统一管理。
PooledByteBufAllocator 类的这个前缀Pooled
就是内存池
的意思,这个类会使用 Netty 的内存池为 ByteBuffer 分配堆外内存
。
5.2 PooledByteBufAllocator 的创建
创建时机
在服务端 NioServerSocketChannel 的配置类 NioServerSocketChannelConfig 以及客户端 NioSocketChannel 的配置类 NioSocketChannelConfig 实例化的时候会触发 PooledByteBufAllocator 的创建。
创建出来的 PooledByteBufAllocator 实例保存在DefaultChannelConfig类
中的ByteBufAllocator allocator
字段中。
创建过程
从 ByteBufUtil 类的初始化过程我们可以看出,在为 ByteBuffer 分配内存的时候是否使用内存池在 Netty 中是可以配置的。
通过系统变量
-D io.netty.allocator.type
可以配置是否使用内存池为 ByteBuffer 分配内存。默认情况下是需要使用内存池的。但是在安卓系统中默认是不使用内存池的。通过
PooledByteBufAllocator.DEFAULT
获取内存池 ByteBuffer 分配器。
由于本文的主线是介绍 Sub Reactor 处理
OP_READ事件
的完整过程,所以这里只介绍主线相关的内容,这里只是简单介绍下在接收数据的时候为什么会用PooledByteBufAllocator
来为ByteBuffer
分配内存。而内存池的架构设计比较复杂,所以笔者后面会单独写一篇关于 Netty 内存管理的文章。
总结
本文介绍了 Sub Reactor 线程在处理 OP_READ 事件的整个过程。并深入剖析了 AdaptiveRecvByteBufAllocator 类动态调整 ByteBuffer 容量的原理。
同时也介绍了 Netty 为什么会使用堆外内存来为 ByteBuffer 分配内存,并由此引出了 Netty 的内存池分配器 PooledByteBufAllocator 。
在介绍 AdaptiveRecvByteBufAllocator 类和 PooledByteBufAllocator 一起组合实现动态地为 ByteBuffer 分配容量的时候,笔者不禁想起了多年前看过的《Effective Java》中第 16 条 复合优先于继承
。
Netty 在这里也遵循了这条军规,首先两个类设计的都是单一的功能。
AdaptiveRecvByteBufAllocator 类只负责动态的调整 ByteBuffer 容量,并不管具体的内存分配。
PooledByteBufAllocator 类负责具体的内存分配,用内存池的方式。
这样设计的就比较灵活,具体内存分配的工作交给具体的ByteBufAllocator
,可以使用内存池的分配方式PooledByteBufAllocator
,也可以不使用内存池的分配方式UnpooledByteBufAllocator
。具体的内存可以采用 JVM 堆内内存(HeapBuffer),也可以使用堆外内存(DirectBuffer)。
而AdaptiveRecvByteBufAllocator
只需要关注调整它们的容量工作就可以了,而并不需要关注它们具体的内存分配方式。
最后通过io.netty.channel.RecvByteBufAllocator.Handle#allocate
方法灵活组合不同的内存分配方式。这也是装饰模式
的一种应用。
好了,今天的内容就到这里,我们下篇文章见~~~~
版权声明: 本文为 InfoQ 作者【bin的技术小屋】的原创文章。
原文链接:【http://xie.infoq.cn/article/5f5d62546d5c3124281a4be29】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论