写点什么

【Netty】「源码解析」(一)ByteBuf 的动态扩容策略与实现原理

作者:sidiot
  • 2023-06-20
    浙江
  • 本文字数:5723 字

    阅读完需:约 19 分钟

前言


本篇博文是《从 0 到 1 学习 Netty》中源码系列的第一篇博文,主要内容是通过源码逐步讲解 Netty 中 ByteBuf 的动态扩容机制,并结合应用案例加以验证,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;


介绍


在我们写入新数据时,如果 ByteBuf 的内部空间不足以容纳新数据,它会自动进行扩容。一般 ByteBuf 会使用 ensureWritable0 方法进行扩容,ensureWritable0 的大致流程如下所示:



其中,可写部分可扩容部分的相关内容在博文 ByteBuf 的基本使用 中进行了详细介绍,这里就不再赘述。


ensureWritable0 的源码如下所示:


final void ensureWritable0(int minWritableBytes) {    // 判断部分    ensureAccessible();    if (minWritableBytes <= writableBytes()) {        return;    }    final int writerIndex = writerIndex();    if (checkBounds) {        if (minWritableBytes > maxCapacity - writerIndex) {            throw new IndexOutOfBoundsException(String.format(                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",                    writerIndex, minWritableBytes, maxCapacity, this));        }    }
// 计算部分 int minNewCapacity = writerIndex + minWritableBytes; int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity); int fastCapacity = writerIndex + maxFastWritableBytes();
if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) { newCapacity = fastCapacity; }
capacity(newCapacity);}
复制代码


接下来将对源码进行讲解与分析,主要分为判断策略与扩容实现;


判断策略


判断策略主要有三个,分别是判断 ByteBuf 对象是否可以访问,是否需要扩容以及是否写入溢出,如果一切符合策略要求,将会进入扩容实现阶段;




第一个判断:判断 ByteBuf 对象是否可以访问


源码如下所示:


/** * Should be called by every method that tries to access the buffers content to check * if the buffer was released before. */protected final void ensureAccessible() {    if (checkAccessible && !isAccessible()) {        throw new IllegalReferenceCountException(0);    }}
/** * Used internally by {@link AbstractByteBuf#ensureAccessible()} to try to guard * against using the buffer after it was released (best-effort). */boolean isAccessible() { return refCnt() != 0;}
复制代码


在上述源码中,通过 ensureAccessible(); 方法来检查 ByteBuf 对象是否被销毁,如果 checkAccessible 标志位为 true,表示需要检查缓冲区是否可访问。如果缓冲区已经被释放(即引用计数为 0,通过 refCnt() 方法获取当前缓冲区的引用计数),则会抛出 IllegalReferenceCountException 异常,表示缓冲区已经无法访问。


这是一个用于检测已经释放的缓冲区的最佳努力实现,它可以提高性能并允许更好的内联优化,每个尝试访问缓冲区内容的方法都应调用该方法,以检查缓冲区之前是否已释放,防止在释放缓冲区后使用缓冲区。




第二个判断:判断 ByteBuf 对象是否需要扩容


源码如下所示:


if (minWritableBytes <= writableBytes()) {      return;  }
@Override public int writableBytes() { return capacity() - writerIndex; }
复制代码


在上述源码中,writableBytes() 方法返回缓冲区中还剩余多少可写入的字节数量,即缓冲区的当前容量减去已经写入的字节数,capacity() 返回缓冲区的当前容量,而 writerIndex 返回下一次写入的索引位置;


通过当前可写部分的长度 writableBytes() 与等待写入的字节数量 minWritableBytes 进行比较来判断 ByteBuf 对象是否需要扩容,如果 minWritableBytes <= writableBytes(),那么 ByteBuf 就不需要进行扩容,直接返回调用该函数的上层函数或者退出当前函数的执行,否则,程序会继续向下执行。




第三个判断:判断 ByteBuf 对象是否写入溢出


源码如下所示:


if (checkBounds) {    if (minWritableBytes > maxCapacity - writerIndex) {        throw new IndexOutOfBoundsException(String.format(                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",                writerIndex, minWritableBytes, maxCapacity, this));    }}
复制代码


在上述源码中,checkBounds 用于判断是否需要进行边界检查,然后再计算出 ByteBuf 可写入的最大字节数量,即 maxCapacity - writerIndex


将计算出来的最大字节数量与等待写入的字节数量 minWritableBytes 比较,若 minWritableBytes 大于可写入的最大字节数量,则说明缓冲区剩余空间不足以容纳要写入的数据,于是抛出一个 IndexOutOfBoundsException 异常,表示写入操作越界,否则,将会进入扩容阶段;


下面将详细讲解扩容实现相关内容;


扩容实现


ByteBuf 扩容的实现主要是计算出 minNewCapacitynewCapacityfastCapacity 这三个值,然后从中选出比较合理的值作为 ByteBuf 当前容量进行扩容;




计算 minNewCapacity


源码如下所示:


int minNewCapacity = writerIndex + minWritableBytes;
复制代码


在上述源码中,最小需要的新容量 minNewCapacity 就是当前写位置 writerIndex 加上等待写入的字节数量 minWritableBytes




计算 newCapacity


源码如下所示:


int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
@Overridepublic int calculateNewCapacity(int minNewCapacity, int maxCapacity) { checkPositiveOrZero(minNewCapacity, "minNewCapacity"); if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) { return threshold; }
// If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; }
// Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; }
return Math.min(newCapacity, maxCapacity);}
复制代码


在上述源码中,主要作用就是将当前容量规范化为 2 的幂次方,第一行代码使用分配器 alloc() 计算出一个新的容量值 newCapacity,不过要注意的是,这个值有可能会大于 maxCapacity


接下来看到这里的关键函数 calculateNewCapacity(int minNewCapacity, int maxCapacity),该函数接收两个参数:minNewCapacitymaxCapacity,分别代表最小需要的新容量和最大容量,然后函数中会进行一些逻辑操作,返回一个新的合理的容量大小。


首先,检查 minNewCapacity 是否为正,并且是否小于等于 maxCapacity。如果不满足条件,则会抛出 IllegalArgumentException 异常。


然后,定义一个阈值 threshold,其值为 4MB:


static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
复制代码


如果 minNewCapacity 等于 threshold,则直接返回 threshold。如果 minNewCapacity 大于 threshold,则将 minNewCapacity 除以 threshold 得到一个整数,再将该整数乘以 threshold 得到当前容量的第一个整数倍值,然后加上 threshold。此时,如果新计算得到的容量值已经超过了 maxCapacity-threshold,则返回 maxCapacity,否则实际容量值要再加上一个 threshold


举个例子,比如说当前的 minNewCapacity=7threshold=4,那么 newCapacity = minNewCapacity / threshold * threshold = 7 / 4 * 4 = 4,因此,最后得到的 newCapacity 就是 8,即 newCapacity += threshold


最后,若 minNewCapacity 小于等于 threshold,则将容量大小从 64 开始连续翻倍,直到达到 minNewCapacity 或者超过 threshold 后停止。如果翻倍后的容量大小超过了 maxCapacity,则返回 maxCapacity。如果没有超过,则返回翻倍后的容量大小。




计算 fastCapacity


源码如下所示:


int fastCapacity = writerIndex + maxFastWritableBytes();
@Overridepublic int maxFastWritableBytes() { return Math.min(maxLength, maxCapacity()) - writerIndex;}
复制代码


上述源码中,fastCapacity 是当前写位置 writerIndex 加上一个最大快速可写字节数 maxFastWritableBytes() 得到的结果。


其中,maxLength 值是根据你定义的 ByteBuf 的空间大小决定的,它会是 16 的倍数,比如 ByteBuf 的空间大小为 9,那么 maxLength 值为 16;ByteBuf 的空间大小为 65,那么 maxLength 值为 80。




选择合理的容量并进行扩容


源码如下所示:


if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {      newCapacity = fastCapacity;  }    capacity(newCapacity);
复制代码


上述源码中,如果新容量值 newCapacity 大于 fastCapacity 且最小需要的新容量 minNewCapacity 小于或等于 fastCapacity,则选择较小的 fastCapacity 作为新的容量值,以避免不必要的重新分配。


实战验证


现在,我们自定义一个空间大小为 36 的 ByteBuf,然后向其中写入 60 个字节的数据,以触发 ByteBuf 的动态扩容机制,测试代码如下:


ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(36);  log(buf);  
StringBuilder sb = new StringBuilder(); for (int i = 0; i < 10; i++) { sb.append("sidiot"); }
buf.writeBytes(sb.toString().getBytes()); log(buf);
复制代码


根据 int minNewCapacity = writerIndex + minWritableBytes; 计算出 minNewCapacity 的值为 0 + 60 = 60;


接着调用 calculateNewCapacity 函数,由于 minNewCapacity 的值为 60 小于 64,所以 newCapacity 的值为 64;



由于我们定义的 ByteBuf 的空间大小为 36,因此 maxLength 的值为 48,fastCapacity 的值也是 48:



最终,我们获得了 minNewCapacitynewCapacityfastCapacity 这三个变量的数值,分别为 60,64,48:



由于 minNewCapacity > fastCapacity,因此 if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) 条件不成立,所以 newCapacity 依旧是 64,即最后 ByteBuf 的空间扩容为 64:



运行结果:


read index:0 write index:0 capacity:36
read index:0 write index:60 capacity:64 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 |sidiotsidiotsidi||00000010| 6f 74 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 |otsidiotsidiotsi||00000020| 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 6f 74 |diotsidiotsidiot||00000030| 73 69 64 69 6f 74 73 69 64 69 6f 74 |sidiotsidiot |+--------+-------------------------------------------------+----------------+
复制代码




那如果将 ByteBuf 的空间大小设置为 69,向其中写入 72 个字节的数据,最终 ByteBuf 的空间大小会扩容至多少呢?


想必小伙伴都知道答案了,是的,80!


暂时没有算出来的小伙伴也不要气馁,我们接着往下分析;


minNewCapacity 毋庸置疑的是 72,然后在计算 newCapacity 时,由于 128 > 72 > 64,因此 newCapacity 的大小为 128:



注意,newCapacity <<= 1; 相当于 newCapacity *= 2,但是位运算速度会快一点;


由于我们将 ByteBuf 的空间大小设置为 69,因此 fastCapacity 的大小就是 5 * 16 = 80


最终,我们获得了 minNewCapacitynewCapacityfastCapacity 这三个变量的数值,72,128,80:



而这三个变量值又符合逻辑判断 if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity),所以 newCapacity = fastCapacity = 80


因此,最终 ByteBuf 的空间扩容为 80:



运行结果:


read index:0 write index:0 capacity:69
read index:0 write index:72 capacity:80 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 |sidiotsidiotsidi||00000010| 6f 74 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 |otsidiotsidiotsi||00000020| 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 6f 74 |diotsidiotsidiot||00000030| 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 |sidiotsidiotsidi||00000040| 6f 74 73 69 64 69 6f 74 |otsidiot |+--------+-------------------------------------------------+----------------+
复制代码


后记


总之,ByteBuf 的动态扩容策略是一种非常高效的内存管理方法,使得 ByteBuf 的使用更加灵活方便,并且在实现上也非常巧妙。通过了解它的实现原理,我们可以更好地掌握如何使用它来优化我们的程序性能,避免出现一些潜在的问题。当然,在使用过程中需要结合具体场景进行调整,以最大限度地发挥其优势,同时提高我们代码的可读性和可维护性。


以上就是 ByteBuf 的动态扩容策略与实现原理 的所有内容了,希望本篇博文对大家有所帮助!


参考:



发布于: 2023-06-20阅读数: 28
用户头像

sidiot

关注

还未添加个人签名 2023-06-04 加入

还未添加个人简介

评论

发布
暂无评论
【Netty】「源码解析」(一)ByteBuf 的动态扩容策略与实现原理_Java_sidiot_InfoQ写作社区