写点什么

Netty 如何高效接收网络数据?一文聊透 ByteBuffer 动态自适应扩缩容机制

  • 2022 年 2 月 23 日
  • 本文字数:20321 字

    阅读完需:约 67 分钟

Netty如何高效接收网络数据?一文聊透ByteBuffer动态自适应扩缩容机制

本系列 Netty 源码解析文章基于 4.1.56.Final 版本


前文回顾

在前边的系列文章中,我们从内核如何收发网络数据开始以一个 C10K 的问题作为主线详细从内核角度阐述了网络 IO 模型的演变,最终在此基础上引出了 Netty 的网络 IO 模型如下图所示:



详细内容可回看《从内核角度看IO模型的演变》


后续我们又围绕着 Netty 的主从 Reactor 网络 IO 线程模型,在《Reactor模型在Netty中的实现》一文中详细阐述了 Netty 的主从 Reactor 模型的创建,以及介绍了 Reactor 模型的关键组件。搭建了 Netty 的核心骨架如下图所示:



在核心骨架搭建完毕之后,我们随后又在《详细图解Reactor启动全流程》一文中阐述了 Reactor 启动的全流程,一个非常重要的核心组件 NioServerSocketChannel 开始在这里初次亮相,承担着一个网络框架最重要的任务--高效接收网络连接。我们介绍了 NioServerSocketChannel 的创建,初始化,向 Main Reactor 注册并监听 OP_ACCEPT 事件的整个流程。在此基础上,Netty 得以整装待发,枕戈待旦开始迎接海量的客户端连接。



随后紧接着我们在《Netty如何高效接收网络连接》一文中详细介绍了 Netty 高效接收客户端网络连接的全流程,在这里 Netty 的核心重要组件 NioServerSocketChannel 开始正是登场,在 NioServerSocketChannel 中我们创建了客户端连接 NioSocketChannel,并详细介绍了 NioSocketChannel 的初始化过程,随后通过在 NioServerSocketChannel 的 pipeline 中触发 ChannelRead 事件,并最终在 ServerBootstrapAcceptor 中将客户端连接 NioSocketChannel 注册到 Sub Reactor 中开始监听客户端连接上的 OP_READ 事件,准备接收客户端发送的网络数据也就是本文的主题内容。



自此 Netty 的核心组件全部就绪并启动完毕,开始起飞~~~



之前文章中的主角是 Netty 中主 Reactor 组中的 Main Reactor 以及注册在 Main Reactor 上边的 NioServerSocketChannel,那么从本文开始,我们文章中的主角就切换为 Sub Reactor 以及注册在 SubReactor 上的 NioSocketChannel 了。


下面就让我们正式进入今天的主题,看一下 Netty 是如何处理 OP_READ 事件以及如何高效接收网络数据的。

1. Sub Reactor 处理 OP_READ 事件流程总览



客户端发起系统 IO 调用向服务端发送数据之后,当网络数据到达服务端的网卡并经过内核协议栈的处理,最终数据到达 Socket 的接收缓冲区之后,Sub Reactor 轮询到 NioSocketChannel 上的OP_READ事件就绪,随后 Sub Reactor 线程就会从 JDK Selector 上的阻塞轮询 APIselector.select(timeoutMillis)调用中返回。转而去处理 NioSocketChannel 上的OP_READ事件


注意这里的 Reactor 为负责处理客户端连接的 Sub Reactor。连接的类型为 NioSocketChannel,处理的事件为 OP_READ 事件。


在之前的文章中笔者已经多次强调过了,Reactor 在处理 Channel 上的 IO 事件入口函数为NioEventLoop#processSelectedKey


public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ..............省略.................
try { int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { ..............处理OP_CONNECT事件................. }

if ((readyOps & SelectionKey.OP_WRITE) != 0) { ..............处理OP_WRITE事件................. }

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //本文重点处理OP_ACCEPT事件 unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
}
复制代码


这里需要重点强调的是,当前的执行线程现在已经变成了 Sub Reactor,而 Sub Reactor 上注册的正是 netty 客户端 NioSocketChannel 负责处理连接上的读写事件。


所以这里入口函数的参数AbstractNioChannel ch则是 IO 就绪的客户端连接NioSocketChannel


开头通过ch.unsafe()获取到的 NioUnsafe 操作类正是 NioSocketChannel 中对底层 JDK NIO SocketChannel 的 Unsafe 底层操作类。实现类型为NioByteUnsafe定义在下图继承结构中的AbstractNioByteChannel父类中。



下面我们到NioByteUnsafe#read方法中来看下 Netty 对OP_READ事件的具体处理过程:

2. Netty 接收网络数据流程总览

我们直接按照老规矩,先从整体上把整个 OP_READ 事件的逻辑处理框架提取出来,让大家先总体俯视下流程全貌,然后在针对每个核心点位进行各个击破。



流程中相关置灰的步骤为 Netty 处理连接关闭时的逻辑,和本文主旨无关,我们这里暂时忽略,等后续笔者介绍连接关闭时,会单独开一篇文章详细为大家介绍。


从上面这张 Netty 接收网络数据总体流程图可以看出 NioSocketChannel 在接收网络数据的整个流程和我们在上篇文章《Netty如何高效接收网络连接》中介绍的 NioServerSocketChannel 在接收客户端连接时的流程在总体框架上是一样的。


NioSocketChannel 在接收网络数据的过程处理中,也是通过在一个do{....}while(...)循环 read loop 中不断的循环读取连接 NioSocketChannel 上的数据。


同样在 NioSocketChannel 读取连接数据的 read loop 中也是受最大读取次数的限制。默认配置最多只能读取 16 次,超过 16 次无论此时 NioSocketChannel 中是否还有数据可读都不能在进行读取了。


这里 read loop 循环最大读取次数可在启动配置类 ServerBootstrap 中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置,默认为 16。


ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)  .channel(NioServerSocketChannel.class)  .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)
复制代码


**Netty 这里为什么非得限制 read loop 的最大读取次数呢?**为什么不在 read loop 中一次性把数据读取完呢?


这时候就是考验我们大局观的时候了,在前边的文章介绍中我们提到 Netty 的 IO 模型为主从 Reactor 线程组模型,在 Sub Reactor Group 中包含了多个 Sub Reactor 专门用于监听处理客户端连接上的 IO 事件。


为了能够高效有序的处理全量客户端连接上的读写事件,Netty 将服务端承载的全量客户端连接分摊到多个 Sub Reactor 中处理,同时也能保证Channel上IO处理的线程安全性


其中一个 Channel 只能分配给一个固定的 Reactor。一个 Reactor 负责处理多个 Channel 上的 IO 就绪事件,Reactor 与 Channel 之间的对应关系如下图所示:



而一个 Sub Reactor 上注册了多个 NioSocketChannel,Netty 不可能在一个 NioSocketChannel 上无限制的处理下去,要将读取数据的机会均匀分摊给其他 NioSocketChannel,所以需要限定每个 NioSocketChannel 上的最大读取次数。


此外,Sub Reactor 除了需要监听处理所有注册在它上边的 NioSocketChannel 中的 IO 就绪事件之外,还需要腾出事件来处理有用户线程提交过来的异步任务。从这一点看,Netty 也不会一直停留在 NioSocketChannel 的 IO 处理上。所以限制 read loop 的最大读取次数是非常必要的。


关于 Reactor 的整体运转架构,对细节部分感兴趣的同学可以回看下笔者的《一文聊透Netty核心引擎Reactor的运转架构》这篇文章。


所以基于这个原因,我们需要在 read loop 循环中,每当通过doReadBytes方法从 NioSocketChannel 中读取到数据时(方法返回值会大于 0,并记录在 allocHandle.lastBytesRead 中),都需要通过allocHandle.incMessagesRead(1)方法统计已经读取的次数。当达到 16 次时不管 NioSocketChannel 是否还有数据可读,都需要在 read loop 末尾退出循环。转去执行 Sub Reactor 上的异步任务。以及其他 NioSocketChannel 上的 IO 就绪事件。平均分配,雨露均沾!!


public abstract class MaxMessageHandle implements ExtendedHandle {
//read loop总共读取了多少次 private int totalMessages;
@Override public final void incMessagesRead(int amt) { totalMessages += amt; }
}
复制代码


本次 read loop 读取到的数据大小会记录在allocHandle.lastBytesRead


public abstract class MaxMessageHandle implements ExtendedHandle {
//本次read loop读取到的字节数 private int lastBytesRead; //整个read loop循环总共读取的字节数 private int totalBytesRead;
@Override public void lastBytesRead(int bytes) { lastBytesRead = bytes; if (bytes > 0) { totalBytesRead += bytes; } }}
复制代码


  • lastBytesRead < 0:表示客户端主动发起了连接关闭流程,Netty 开始连接关闭处理流程。这个和本文的主旨无关,我们先不用管。后面笔者会专门用一篇文章来详解关闭流程。

  • lastBytesRead = 0:表示当前 NioSocketChannel 上的数据已经全部读取完毕,没有数据可读了。本次 OP_READ 事件圆满处理完毕,可以开开心心的退出 read loop。

  • lastBytesRead > 0:表示在本次 read loop 中从 NioSocketChannel 中读取到了数据,会在 NioSocketChannel 的 pipeline 中触发 ChannelRead 事件。进而在 pipeline 中负责 IO 处理的 ChannelHandelr 中响应,处理网络请求。



public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { .......处理网络请求,比如解码,反序列化等操作....... }}
复制代码


最后会在 read loop 循环的末尾调用allocHandle.continueReading()判断是否结束本次 read loop 循环。这里的结束循环条件的判断会比我们在介绍 NioServerSocketChannel 接收连接时的判断条件复杂很多,笔者会将这个判断条件的详细解析放在文章后面细节部分为大家解读,这里大家只需要把握总体核心流程,不需要关注太多细节。


总体上在 NioSocketChannel 中读取网络数据的 read loop 循环结束条件需要满足以下几点:


  • 当前 NioSocketChannel 中的数据已经全部读取完毕,则退出循环。

  • 本轮 read loop 如果没有读到任何数据,则退出循环。

  • read loop 的读取次数达到 16 次,退出循环。


当满足这里的 read loop 退出条件之后,Sub Reactor 线程就会退出循环,随后会调用allocHandle.readComplete()方法根据本轮 read loop 总共读取到的字节数totalBytesRead来决定是否对用于接收下一轮 OP_READ 事件数据的 ByteBuffer 进行扩容或者缩容。


最后在 NioSocketChannel 的 pipeline 中触发ChannelReadComplete事件,通知 ChannelHandler 本次 OP_READ 事件已经处理完毕。




public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { .......处理网络请求,比如解码,反序列化等操作....... }
@Override public void channelReadComplete(ChannelHandlerContext ctx) { ......本次OP_READ事件处理完毕....... ......决定是否向客户端响应处理结果...... }}
复制代码

2.1 ChannelRead 与 ChannelReadComplete 事件的区别

有些小伙伴可能对 Netty 中的一些传播事件触发的时机,或者事件之间的区别理解的不是很清楚,概念容易混淆。在后面的文章中笔者也会从源码的角度出发给大家说清楚 Netty 中定义的所有异步事件,以及这些事件之间的区别和联系和触发时机,传播机制。


这里我们主要探讨本文主题中涉及到的两个事件:ChannelRead 事件与 ChannelReadComplete 事件。


从上述介绍的 Netty 接收网络数据流程总览中我们可以看出ChannelRead事件ChannelReadComplete事件是不一样的,但是对于刚接触 Netty 的小伙伴来说从命名上乍一看感觉又差不多。


下面我们来看这两个事件之间的差别:


Netty 服务端对于一次 OP_READ 事件的处理,会在一个do{}while()循环 read loop 中分多次从客户端 NioSocketChannel 中读取网络数据。每次读取我们分配的 ByteBuffer 容量大小,初始容量为 2048。


  • ChanneRead事件:一次循环读取一次数据,就触发一次ChannelRead事件。本次最多读取在 read loop 循环开始分配的 DirectByteBuffer 容量大小。这个容量会动态调整,文章后续笔者会详细介绍。

  • ChannelReadComplete事件:当读取不到数据或者不满足continueReading 的任意一个条件就会退出 read loop,这时就会触发ChannelReadComplete事件。表示本次OP_READ事件处理完毕。


这里需要特别注意下触发ChannelReadComplete事件并不代表 NioSocketChannel 中的数据已经读取完了,只能说明本次OP_READ事件处理完毕。因为有可能是客户端发送的数据太多,Netty 读了16次还没读完,那就只能等到下次OP_READ事件到来的时候在进行读取了。




以上内容就是 Netty 在接收客户端发送网络数据的全部核心逻辑。目前为止我们还未涉及到这部分的主干核心源码,笔者想的是先给大家把核心逻辑讲解清楚之后,这样理解起来核心主干源码会更加清晰透彻。


经过前边对网络数据接收的核心逻辑介绍,笔者在把这张流程图放出来,大家可以结合这张图在来回想下主干核心逻辑。



下面笔者会结合这张流程图,给大家把这部分的核心主干源码框架展现出来,大家可以将我们介绍过的核心逻辑与主干源码做个一一对应,还是那句老话,我们要从主干框架层面把握整体处理流程,不需要读懂每一行代码,文章后续笔者会将这个过程中涉及到的核心点位给大家拆开来各个击破!!


3. 源码核心框架总览

        @Override        public final void read() {            final ChannelConfig config = config();
...............处理半关闭相关代码省略............... //获取NioSocketChannel的pipeline final ChannelPipeline pipeline = pipeline(); //PooledByteBufAllocator 具体用于实际分配ByteBuf的分配器 final ByteBufAllocator allocator = config.getAllocator(); //自适应ByteBuf分配器 AdaptiveRecvByteBufAllocator ,用于动态调节ByteBuf容量 //需要与具体的ByteBuf分配器配合使用 比如这里的PooledByteBufAllocator final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); //allocHandler用于统计每次读取数据的大小,方便下次分配合适大小的ByteBuf //重置清除上次的统计指标 allocHandle.reset(config);
ByteBuf byteBuf = null; boolean close = false; try { do { //利用PooledByteBufAllocator分配合适大小的byteBuf 初始大小为2048 byteBuf = allocHandle.allocate(allocator); //记录本次读取了多少字节数 allocHandle.lastBytesRead(doReadBytes(byteBuf)); //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询 if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { ......表示客户端发起连接关闭..... } break; }
//read loop读取数据次数+1 allocHandle.incMessagesRead(1); //客户端NioSocketChannel的pipeline中触发ChannelRead事件 pipeline.fireChannelRead(byteBuf); //解除本次读取数据分配的ByteBuffer引用,方便下一轮read loop分配 byteBuf = null; } while (allocHandle.continueReading());//判断是否应该继续read loop
//根据本次read loop总共读取的字节数,决定下次是否扩容或者缩容 allocHandle.readComplete(); //在NioSocketChannel的pipeline中触发ChannelReadComplete事件,表示一次read事件处理完毕 //但这并不表示 客户端发送来的数据已经全部读完,因为如果数据太多的话,这里只会读取16次,剩下的会等到下次read事件到来后在处理 pipeline.fireChannelReadComplete();
.........省略连接关闭流程处理......... } catch (Throwable t) { ...............省略............... } finally { ...............省略............... } } }
复制代码


这里再次强调下当前执行线程为 Sub Reactor 线程,处理连接数据读取逻辑是在 NioSocketChannel 中。


首先通过config()获取客户端 NioSocketChannel 的 Channel 配置类 NioSocketChannelConfig。


通过pipeline()获取 NioSocketChannel 的 pipeline。我们在《详细图解Netty Reactor启动全流程》一文中提到的 Netty 服务端模板所举的示例中,NioSocketChannelde pipeline 中只有一个 EchoChannelHandler。


3.1 分配 DirectByteBuffer 接收网络数据

Sub Reactor 在接收 NioSocketChannel 上的 IO 数据时,都会分配一个 ByteBuffer 用来存放接收到的 IO 数据。


这里大家可能觉得比较奇怪,为什么在 NioSocketChannel 接收数据这里会有两个 ByteBuffer 分配器呢?一个是 ByteBufAllocator,另一个是 RecvByteBufAllocator。


    final ByteBufAllocator allocator = config.getAllocator();    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
复制代码


这两个 ByteBuffer 又有什么区别和联系呢?


在上篇文章《抓到Netty一个Bug,顺带来透彻地聊一下Netty是如何高效接收网络连接》中,笔者为了阐述上篇文章中提到的 Netty 在接收网络连接时的 Bug 时,简单和大家介绍了下这个 RecvByteBufAllocator。


在上篇文章提到的 NioServerSocketChannelConfig 中,这里的 RecvByteBufAllocator 类型为 ServerChannelRecvByteBufAllocator。



还记得这个 ServerChannelRecvByteBufAllocator 类型在 4.1.69.final 版本引入是为了解决笔者在上篇文章中提到的那个 Bug 吗?在 4.1.69.final 版本之前,NioServerSocketChannelConfig 中的 RecvByteBufAllocator 类型为 AdaptiveRecvByteBufAllocator。


而在本文中 NioSocketChannelConfig 中的 RecvByteBufAllocator 类型为 AdaptiveRecvByteBufAllocator。



所以这里recvBufAllocHandle()获得到的 RecvByteBufAllocator 为 AdaptiveRecvByteBufAllocator。顾名思义,这个类型的 RecvByteBufAllocator 可以根据 NioSocketChannel 上每次到来的 IO 数据大小来自适应动态调整 ByteBuffer 的容量。


对于客户端 NioSocketChannel 来说,它里边包含的 IO 数据时客户端发送来的网络数据,长度是不定的,所以才会需要这样一个可以根据每次 IO 数据的大小来自适应动态调整容量的 ByteBuffer 来接收。


如果我们把用于接收数据用的 ByteBuffer 看做一个桶的话,那么小数据用大桶装或者大数据用小桶装肯定是不合适的,所以我们需要根据接收数据的大小来动态调整桶的容量。而 AdaptiveRecvByteBufAllocator 的作用正是用来根据每次接收数据的容量大小来动态调整 ByteBuffer 的容量的。


现在 RecvByteBufAllocator 笔者为大家解释清楚了,接下来我们继续看 ByteBufAllocator。


大家这里需要注意的是 AdaptiveRecvByteBufAllocator 并不会真正的去分配 ByteBuffer,它只是负责动态调整分配 ByteBuffer 的大小。


而真正具体执行内存分配动作的是这里的 ByteBufAllocator 类型为 PooledByteBufAllocator。它会根据 AdaptiveRecvByteBufAllocator 动态调整出来的大小去真正的申请内存分配 ByteBuffer。


PooledByteBufAllocator 为 Netty 中的内存池,用来管理堆外内存 DirectByteBuffer。


AdaptiveRecvByteBufAllocator 中的 allocHandle 在上篇文章中我们也介绍过了,它的实际类型为 MaxMessageHandle。


public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
@Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); } private final class HandleImpl extends MaxMessageHandle { .................省略................ }}
复制代码


在 MaxMessageHandle 中包含了用于动态调整 ByteBuffer 容量的统计指标。


   public abstract class MaxMessageHandle implements ExtendedHandle {        private ChannelConfig config;
//用于控制每次read loop里最大可以循环读取的次数,默认为16次 //可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置。 private int maxMessagePerRead;
//用于统计read loop中总共接收的连接个数,NioSocketChannel中表示读取数据的次数 //每次read loop循环后会调用allocHandle.incMessagesRead增加记录接收到的连接个数 private int totalMessages;
//用于统计在read loop中总共接收到客户端连接上的数据大小 private int totalBytesRead;
//表示本次read loop 尝试读取多少字节,byteBuffer剩余可写的字节数 private int attemptedBytesRead;
//本次read loop读取到的字节数 private int lastBytesRead; //预计下一次分配buffer的容量,初始:2048 private int nextReceiveBufferSize; ...........省略.............}
复制代码


在每轮 read loop 开始之前,都会调用allocHandle.reset(config)重置清空上一轮 read loop 的统计指标。


        @Override        public void reset(ChannelConfig config) {            this.config = config;            //默认每次最多读取16次            maxMessagePerRead = maxMessagesPerRead();            totalMessages = totalBytesRead = 0;        }
复制代码


在每次开始从 NioSocketChannel 中读取数据之前,需要利用PooledByteBufAllocator在内存池中为 ByteBuffer 分配内存,默认初始化大小为2048,这个容量由guess()方法决定。


        byteBuf = allocHandle.allocate(allocator);
复制代码


        @Override        public ByteBuf allocate(ByteBufAllocator alloc) {            return alloc.ioBuffer(guess());        }
@Override public int guess() { //预计下一次分配buffer的容量,一开始为2048 return nextReceiveBufferSize; }
复制代码


在每次通过doReadBytes从 NioSocketChannel 中读取到数据后,都会调用allocHandle.lastBytesRead(doReadBytes(byteBuf))记录本次读取了多少字节数据,并统计本轮 read loop 目前总共读取了多少字节。


        @Override        public void lastBytesRead(int bytes) {            lastBytesRead = bytes;            if (bytes > 0) {                totalBytesRead += bytes;            }        }
复制代码


每次循环从 NioSocketChannel 中读取数据之后,都会调用allocHandle.incMessagesRead(1)。统计当前已经读取了多少次。如果超过了最大读取限制此时 16 次,就需要退出 read loop。去处理其他 NioSocketChannel 上的 IO 事件。


        @Override        public final void incMessagesRead(int amt) {            totalMessages += amt;        }
复制代码


在每次 read loop 循环的末尾都需要通过调用allocHandle.continueReading()来判断是否继续 read loop 循环读取 NioSocketChannel 中的数据。


        @Override        public boolean continueReading() {            return continueReading(defaultMaybeMoreSupplier);        }
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { @Override public boolean get() { //判断本次读取byteBuffer是否满载而归 return attemptedBytesRead == lastBytesRead; } };
@Override public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0; }
复制代码


  • attemptedBytesRead :表示当前 ByteBuffer 预计尝试要写入的字节数。

  • lastBytesRead :表示本次 read loop 真实读取到了多少个字节。


defaultMaybeMoreSupplier 用于判断经过本次 read loop 读取数据后,ByteBuffer 是否满载而归。如果是满载而归的话(attemptedBytesRead == lastBytesRead),表明可能 NioSocketChannel 里还有数据。如果不是满载而归,表明 NioSocketChannel 里没有数据了已经。


是否继续进行 read loop 需要同时满足以下几个条件:


  • totalMessages < maxMessagePerRead 当前读取次数是否已经超过16次,如果超过,就退出do(...)while()循环。进行下一轮OP_READ事件的轮询。因为每个 Sub Reactor 管理了多个 NioSocketChannel,不能在一个 NioSocketChannel 上占用太多时间,要将机会均匀地分配给 Sub Reactor 所管理的所有 NioSocketChannel。

  • totalBytesRead > 0 本次OP_READ事件处理是否读取到了数据,如果已经没有数据可读了,那么就直接退出 read loop。

  • !respectMaybeMoreData || maybeMoreDataSupplier.get() 这个条件比较复杂,它其实就是通过respectMaybeMoreData字段来控制 NioSocketChannel 中可能还有数据可读的情况下该如何处理。

  • maybeMoreDataSupplier.get():true 表示本次读取从 NioSocketChannel 中读取数据,ByteBuffer 满载而归。说明可能 NioSocketChannel 中还有数据没读完。fasle 表示 ByteBuffer 还没有装满,说明 NioSocketChannel 中已经没有数据可读了。

  • respectMaybeMoreData = true表示要对可能还有更多数据进行处理的这种情况要respect认真对待,如果本次循环读取到的数据已经装满ByteBuffer,表示后面可能还有数据,那么就要进行读取。如果ByteBuffer还没装满表示已经没有数据可读了那么就退出循环。


  • respectMaybeMoreData = false表示对可能还有更多数据的这种情况不认真对待 not respect。不管本次循环读取数据ByteBuffer是否满载而归,都要继续进行读取,直到读取不到数据在退出循环,属于无脑读取。


同时满足以上三个条件,那么 read loop 继续进行。继续从 NioSocketChannel 中读取数据,直到读取不到或者不满足三个条件中的任意一个为止。

3.2 从 NioSocketChannel 中读取数据

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }}
复制代码


这里会直接调用底层 JDK NIO 的SocketChannel#read方法将数据读取到 DirectByteBuffer 中。读取数据大小为本次分配的 DirectByteBuffer 容量,初始为 2048。

4. ByteBuffer 动态自适应扩缩容机制

由于我们一开始并不知道客户端会发送多大的网络数据,所以这里先利用PooledByteBufAllocator分配一个初始容量为2048的 DirectByteBuffer 用于接收数据。


  byteBuf = allocHandle.allocate(allocator);
复制代码


这就好比我们需要拿着一个桶去排队装水,但是第一次去装的时候,我们并不知道管理员会给我们分配多少水,桶拿大了也不合适拿小了也不合适,于是我们就先预估一个差不多容量大小的桶,如果分配的多了,我们下次就拿更大一点的桶,如果分配少了,下次我们就拿一个小点的桶。


在这种场景下,我们需要 ByteBuffer 可以自动根据每次网络数据的大小来动态自适应调整自己的容量。


而 ByteBuffer 动态自适应扩缩容机制依赖于 AdaptiveRecvByteBufAllocator 类的实现。让我们先回到 AdaptiveRecvByteBufAllocator 类的创建起点开始说起~~

4.1 AdaptiveRecvByteBufAllocator 的创建

在前文《Netty是如何高效接收网络连接》中我们提到,当 Main Reactor 监听到 OP_ACCPET 事件活跃后,会在 NioServerSocketChannel 中 accept 完成三次握手的客户端连接。并创建 NioSocketChannel,伴随着 NioSocketChannel 的创建其对应的配置类 NioSocketChannelConfig 类也会随之创建。


    public NioSocketChannel(Channel parent, SocketChannel socket) {        super(parent, socket);        config = new NioSocketChannelConfig(this, socket.socket());    }
复制代码


最终会在 NioSocketChannelConfig 的父类DefaultChannelConfig的构造器中创建AdaptiveRecvByteBufAllocator 。并保存在RecvByteBufAllocator rcvBufAllocator字段中。


public class DefaultChannelConfig implements ChannelConfig {
//用于Channel接收数据用的buffer分配器 AdaptiveRecvByteBufAllocator private volatile RecvByteBufAllocator rcvBufAllocator;
public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); }
}
复制代码


new AdaptiveRecvByteBufAllocator()创建 AdaptiveRecvByteBufAllocator 类实例的时候会先触发 AdaptiveRecvByteBufAllocator 类的初始化。


我们先来看下 AdaptiveRecvByteBufAllocator 类的初始化都做了些什么事情:

4.2 AdaptiveRecvByteBufAllocator 类的初始化

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
//扩容步长 private static final int INDEX_INCREMENT = 4; //缩容步长 private static final int INDEX_DECREMENT = 1;
//RecvBuf分配容量表(扩缩容索引表)按照表中记录的容量大小进行扩缩容 private static final int[] SIZE_TABLE;
static { //初始化RecvBuf容量分配表 List<Integer> sizeTable = new ArrayList<Integer>(); //当分配容量小于512时,扩容单位为16递增 for (int i = 16; i < 512; i += 16) { sizeTable.add(i); }
//当分配容量大于512时,扩容单位为一倍 for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); }
//初始化RecbBuf扩缩容索引表 SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } }}
复制代码


AdaptiveRecvByteBufAllocator 主要的作用就是为接收数据的ByteBuffer进行扩容缩容,那么每次怎么扩容?扩容多少?怎么缩容?缩容多少呢??


这四个问题将是本小节笔者要为大家解答的内容~~~


Netty 中定义了一个int型的数组SIZE_TABLE 来存储每个扩容单位对应的容量大小。建立起扩缩容的容量索引表。每次扩容多少,缩容多少全部记录在这个容量索引表中。


在 AdaptiveRecvByteBufAllocatorl 类初始化的时候会在static{}静态代码块中对扩缩容索引表SIZE_TABLE 进行初始化。


从源码中我们可以看出SIZE_TABLE 的初始化分为两个部分:


  • 当索引容量小于512时,SIZE_TABLE 中定义的容量索引是从16开始16递增。



  • 当索引容量大于512时,SIZE_TABLE 中定义的容量索引是按前一个索引容量的 2 倍递增。


4.3 扩缩容逻辑

现在扩缩容索引表SIZE_TABLE 已经初始化完毕了,那么当我们需要对ByteBuffer进行扩容或者缩容的时候如何根据SIZE_TABLE 决定扩容多少或者缩容多少呢??


这就用到了在 AdaptiveRecvByteBufAllocator 类中定义的扩容步长INDEX_INCREMENT = 4,缩容步长INDEX_DECREMENT = 1了。


我们就以上面两副扩缩容容量索引表SIZE_TABLE 中的容量索引展示截图为例,来介绍下扩缩容逻辑,假设我们当前ByteBuffer的容量索引为33,对应的容量为2048

4.3.1 扩容

当对容量为2048的 ByteBuffer 进行扩容时,根据当前的容量索引index = 33 加上 扩容步长INDEX_INCREMENT = 4计算出扩容后的容量索引为37,那么扩缩容索引表SIZE_TABLE下标37对应的容量就是本次 ByteBuffer 扩容后的容量SIZE_TABLE[37] = 32768

4.3.1 缩容

同理对容量为2048的 ByteBuffer 进行缩容时,我们就需要用当前容量索引index = 33 减去 缩容步长INDEX_DECREMENT = 1计算出缩容后的容量索引32,那么扩缩容索引表SIZE_TABLE下标32对应的容量就是本次 ByteBuffer 缩容后的容量SIZE_TABLE[32] = 1024

4.4 扩缩容时机

public abstract class AbstractNioByteChannel extends AbstractNioChannel {        @Override        public final void read() {            .........省略......            try {                do {                      .........省略......                } while (allocHandle.continueReading());
//根据本次read loop总共读取的字节数,决定下次是否扩容或者缩容 allocHandle.readComplete();
.........省略.........
} catch (Throwable t) { ...............省略............... } finally { ...............省略............... } }}
复制代码


在每轮 read loop 结束之后,我们都会调用allocHandle.readComplete()来根据在 allocHandle 中统计的在本轮 read loop 中读取字节总大小,来决定在下一轮 read loop 中是否对 DirectByteBuffer 进行扩容或者缩容。


public abstract class MaxMessageHandle implements ExtendedHandle {
@Override public void readComplete() { //是否对recvbuf进行扩容缩容 record(totalBytesRead()); }
private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) { if (decreaseNow) { index = max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } } }
复制代码


我们以当前 ByteBuffer 容量为2048,容量索引index = 33为例,对allocHandle的扩容缩容规则进行说明。


扩容步长INDEX_INCREMENT = 4,缩容步长INDEX_DECREMENT = 1


4.4.1 缩容

  • 如果本次OP_READ事件实际读取到的总字节数actualReadBytes在 SIZE_TABLE[index - INDEX_DECREMENT]与 SIZE_TABLE[index]之间的话,也就是如果本轮 read loop 结束之后总共读取的字节数在[1024,2048]之间。说明此时分配的ByteBuffer容量正好,不需要进行缩容也不需要进行扩容。比如本次actualReadBytes = 2000,正好处在10242048之间。说明2048的容量正好。

  • 如果actualReadBytes 小于等于 SIZE_TABLE[index - INDEX_DECREMENT],也就是如果本轮 read loop 结束之后总共读取的字节数小于等于1024。表示本次读取到的字节数比当前 ByteBuffer 容量的下一级容量还要小,说明当前 ByteBuffer 的容量分配的有些大了,设置缩容标识decreaseNow = true。当下次OP_READ事件继续满足缩容条件的时候,开始真正的进行缩容。缩容后的容量为 SIZE_TABLE[index - INDEX_DECREMENT],但不能小于 SIZE_TABLE[minIndex]。


注意需要满足两次缩容条件才会进行缩容,且缩容步长为 1,缩容比较谨慎

4.4.2 扩容

如果本次OP_READ事件处理总共读取的字节数actualReadBytes 大于等于 当前 ByteBuffer 容量(nextReceiveBufferSize)时,说明 ByteBuffer 分配的容量有点小了,需要进行扩容。扩容后的容量为 SIZE_TABLE[index + INDEX_INCREMENT],但不能超过 SIZE_TABLE[maxIndex]。


满足一次扩容条件就进行扩容,并且扩容步长为 4, 扩容比较奔放

4.5 AdaptiveRecvByteBufAllocator 类的实例化

AdaptiveRecvByteBufAllocator 类的实例化主要是确定 ByteBuffer 的初始容量,以及最小容量和最大容量在扩缩容索引表SIZE_TABLE中的下标:minIndex maxIndex


AdaptiveRecvByteBufAllocator 定义了三个关于 ByteBuffer 容量的字段:


  • DEFAULT_MINIMUM :表示 ByteBuffer 最小的容量,默认为64,也就是无论 ByteBuffer 在怎么缩容,容量也不会低于64

  • DEFAULT_INITIAL :表示 ByteBuffer 的初始化容量。默认为2048

  • DEFAULT_MAXIMUM :表示 ByteBuffer 的最大容量,默认为65536,也就是无论 ByteBuffer 在怎么扩容,容量也不会超过65536


public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_INITIAL = 2048; static final int DEFAULT_MAXIMUM = 65536;
public AdaptiveRecvByteBufAllocator() { this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM); }
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { .................省略异常检查逻辑.............
//计算minIndex maxIndex //在SIZE_TABLE中二分查找最小 >= minimum的容量索引 :3 int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; }
//在SIZE_TABLE中二分查找最大 <= maximum的容量索引 :38 int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; }
this.initial = initial; }}
复制代码


接下来的事情就是确定最小容量 DEFAULT_MINIMUM 在 SIZE_TABLE 中的下标minIndex,以及最大容量 DEFAULT_MAXIMUM 在 SIZE_TABLE 中的下标maxIndex


从 AdaptiveRecvByteBufAllocator 类初始化的过程中,我们可以看出 SIZE_TABLE 中存储的数据特征是一个有序的集合。


我们可以通过二分查找在 SIZE_TABLE 中找出第一个容量大于等于 DEFAULT_MINIMUM 的容量索引minIndex


同理通过二分查找在 SIZE_TABLE 中找出最后一个容量小于等于 DEFAULT_MAXIMUM 的容量索引maxIndex


根据上一小节关于SIZE_TABLE中容量数据分布的截图,我们可以看出minIndex = 3maxIndex = 38

4.5.1 二分查找容量索引下标

    private static int getSizeTableIndex(final int size) {        for (int low = 0, high = SIZE_TABLE.length - 1;;) {            if (high < low) {                return low;            }            if (high == low) {                return high;            }
int mid = low + high >>> 1;//无符号右移,高位始终补0 int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1; } } }
复制代码


经常刷 LeetCode 的小伙伴肯定一眼就看出这个是二分查找的模板了。


它的目的就是根据给定容量,在扩缩容索引表SIZE_TABLE中,通过二分查找找到最贴近给定 size 的容量的索引下标(第一个大于等于 size 的容量)

4.6 RecvByteBufAllocator.Handle

前边我们提到最终动态调整 ByteBuffer 容量的是由 AdaptiveRecvByteBufAllocator 中的Handler负责的,我们来看下这个allocHandle的创建过程。


protected abstract class AbstractUnsafe implements Unsafe {
private RecvByteBufAllocator.Handle recvHandle;
@Override public RecvByteBufAllocator.Handle recvBufAllocHandle() { if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }
}
复制代码


从 allocHandle 的获取过程我们看到最 allocHandle 的创建是由AdaptiveRecvByteBufAllocator#newHandle方法执行的。


public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
@Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); }
private final class HandleImpl extends MaxMessageHandle { //最小容量在扩缩容索引表中的index private final int minIndex; //最大容量在扩缩容索引表中的index private final int maxIndex; //当前容量在扩缩容索引表中的index 初始33 对应容量2048 private int index; //预计下一次分配buffer的容量,初始:2048 private int nextReceiveBufferSize; //是否缩容 private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex;
//在扩缩容索引表中二分查找到最小大于等于initial 的容量 index = getSizeTableIndex(initial); //2048 nextReceiveBufferSize = SIZE_TABLE[index]; }
.......................省略................... }
}
复制代码


这里我们看到 Netty 中用于动态调整 ByteBuffer 容量的allocHandle 的实际类型为MaxMessageHandle


下面我们来介绍下HandleImpl 中的核心字段,它们都和 ByteBuffer 的容量有关:


  • minIndex :最小容量在扩缩容索引表SIZE_TABE中的 index。默认是3

  • maxIndex :最大容量在扩缩容索引表SIZE_TABE中的 index。默认是38

  • index :当前容量在扩缩容索引表SIZE_TABE中的 index。初始是33

  • nextReceiveBufferSize :预计下一次分配 buffer 的容量,初始为2048。在每次申请内存分配 ByteBuffer 的时候,采用nextReceiveBufferSize的值指定容量。

  • decreaseNow : 是否需要进行缩容。

5. 使用堆外内存为 ByteBuffer 分配内存

AdaptiveRecvByteBufAllocator 类只是负责动态调整 ByteBuffer 的容量,而具体为 ByteBuffer 申请内存空间的是由PooledByteBufAllocator负责。

5.1 类名前缀 Pooled 的来历

在我们使用 Java 进行日常开发过程中,在为对象分配内存空间的时候我们都会选择在 JVM 堆中为对象分配内存,这样做对我们 Java 开发者特别的友好,我们只管使用就好而不必过多关心这块申请的内存如何回收,因为 JVM 堆完全受 Java 虚拟机控制管理,Java 虚拟机会帮助我们回收不再使用的内存。


但是 JVM 在进行垃圾回收时候的stop the world会对我们应用程序的性能造成一定的影响。


除此之外我们在《聊聊Netty那些事儿之从内核角度看IO模型》一文中介绍 IO 模型的时候提到,当数据达到网卡时,网卡会通过 DMA 的方式将数据拷贝到内核空间中,这是第一次拷贝。当用户线程在用户空间发起系统 IO 调用时,CPU 会将内核空间的数据再次拷贝到用户空间。这是第二次拷贝


于此不同的是当我们在 JVM 中发起 IO 调用时,比如我们使用 JVM 堆内存读取Socket接收缓冲区中的数据时,会多一次内存拷贝,CPU 在第二次拷贝中将数据从内核空间拷贝到用户空间时,此时的用户空间站在 JVM 角度是堆外内存,所以还需要将堆外内存中的数据拷贝到堆内内存中。这就是第三次内存拷贝


同理当我们在 JVM 中发起 IO 调用向Socket发送缓冲区写入数据时,JVM 会将 IO 数据先拷贝堆外内存,然后才能发起系统 IO 调用。


那为什么操作系统不直接使用 JVM 的堆内内存进行IO操作呢?


因为 JVM 的内存布局和操作系统分配的内存是不一样的,操作系统不可能按照 JVM 规范来读写数据,所以就需要第三次拷贝中间做个转换将堆外内存中的数据拷贝到 JVM 堆中。




所以基于上述内容,在使用 JVM 堆内内存时会产生以下两点性能影响:


  1. JVM 在垃圾回收堆内内存时,会发生stop the world导致应用程序卡顿。

  2. 在进行 IO 操作的时候,会多产生一次由堆外内存到堆内内存的拷贝。


基于以上两点使用JVM堆内内存对性能造成的影响,于是对性能有卓越追求的 Netty 采用堆外内存也就是DirectBuffer来为 ByteBuffer 分配内存空间。


采用堆外内存为 ByteBuffer 分配内存的好处就是:


  • 堆外内存直接受操作系统的管理,不会受 JVM 的管理,所以 JVM 垃圾回收对应用程序的性能影响就没有了。

  • 网络数据到达之后直接在堆外内存上接收,进程读取网络数据时直接在堆外内存中读取,所以就避免了第三次内存拷贝


所以 Netty 在进行 I/O 操作时都是使用的堆外内存,可以避免数据从 JVM 堆内存到堆外内存的拷贝。但是由于堆外内存不受 JVM 的管理,所以就需要额外关注对内存的使用和释放,稍有不慎就会造成内存泄露,于是 Netty 就引入了内存池堆外内存进行统一管理。


PooledByteBufAllocator 类的这个前缀Pooled就是内存池的意思,这个类会使用 Netty 的内存池为 ByteBuffer 分配堆外内存

5.2 PooledByteBufAllocator 的创建

创建时机

在服务端 NioServerSocketChannel 的配置类 NioServerSocketChannelConfig 以及客户端 NioSocketChannel 的配置类 NioSocketChannelConfig 实例化的时候会触发 PooledByteBufAllocator 的创建。


public class DefaultChannelConfig implements ChannelConfig {    //PooledByteBufAllocator    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
..........省略......}
复制代码


创建出来的 PooledByteBufAllocator 实例保存在DefaultChannelConfig类中的ByteBufAllocator allocator字段中。

创建过程

public interface ByteBufAllocator {
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; ..................省略............}
复制代码


public final class ByteBufUtil {
static final ByteBufAllocator DEFAULT_ALLOCATOR;
static { String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); }
DEFAULT_ALLOCATOR = alloc; ...................省略.................. }}
复制代码


从 ByteBufUtil 类的初始化过程我们可以看出,在为 ByteBuffer 分配内存的时候是否使用内存池在 Netty 中是可以配置的。


  • 通过系统变量-D io.netty.allocator.type 可以配置是否使用内存池为 ByteBuffer 分配内存。默认情况下是需要使用内存池的。但是在安卓系统中默认是不使用内存池的。

  • 通过PooledByteBufAllocator.DEFAULT获取内存池 ByteBuffer 分配器


   public static final PooledByteBufAllocator DEFAULT =            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
复制代码


由于本文的主线是介绍 Sub Reactor 处理OP_READ事件的完整过程,所以这里只介绍主线相关的内容,这里只是简单介绍下在接收数据的时候为什么会用PooledByteBufAllocator来为ByteBuffer分配内存。而内存池的架构设计比较复杂,所以笔者后面会单独写一篇关于 Netty 内存管理的文章。



总结

本文介绍了 Sub Reactor 线程在处理 OP_READ 事件的整个过程。并深入剖析了 AdaptiveRecvByteBufAllocator 类动态调整 ByteBuffer 容量的原理。


同时也介绍了 Netty 为什么会使用堆外内存来为 ByteBuffer 分配内存,并由此引出了 Netty 的内存池分配器 PooledByteBufAllocator 。


在介绍 AdaptiveRecvByteBufAllocator 类和 PooledByteBufAllocator 一起组合实现动态地为 ByteBuffer 分配容量的时候,笔者不禁想起了多年前看过的《Effective Java》中第 16 条 复合优先于继承


Netty 在这里也遵循了这条军规,首先两个类设计的都是单一的功能。


  • AdaptiveRecvByteBufAllocator 类只负责动态的调整 ByteBuffer 容量,并不管具体的内存分配。

  • PooledByteBufAllocator 类负责具体的内存分配,用内存池的方式。


这样设计的就比较灵活,具体内存分配的工作交给具体的ByteBufAllocator,可以使用内存池的分配方式PooledByteBufAllocator,也可以不使用内存池的分配方式UnpooledByteBufAllocator。具体的内存可以采用 JVM 堆内内存(HeapBuffer),也可以使用堆外内存(DirectBuffer)。


AdaptiveRecvByteBufAllocator只需要关注调整它们的容量工作就可以了,而并不需要关注它们具体的内存分配方式。


最后通过io.netty.channel.RecvByteBufAllocator.Handle#allocate方法灵活组合不同的内存分配方式。这也是装饰模式的一种应用。


byteBuf = allocHandle.allocate(allocator);
复制代码


好了,今天的内容就到这里,我们下篇文章见~~~~

发布于: 刚刚阅读数: 3
用户头像

微信公众号:bin的技术小屋,专注源码解析 2018.01.31 加入

专注源码解析系列原创技术文章,分享自己的技术感悟

评论

发布
暂无评论
Netty如何高效接收网络数据?一文聊透ByteBuffer动态自适应扩缩容机制