前言
本篇博文是《从 0 到 1 学习 Netty》中入门系列的第六篇博文,主要内容是介绍 Netty 中 ByteBuf 的基本使用,包含其组成、创建、写入和读取,通过源码分析和应用案例进行详细讲解,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
介绍
在 Netty 中,ByteBuf 是一个可扩展的字节容器。它是一个抽象类,其实现提供了对字节数据的高效访问。ByteBuf 可以像普通缓冲区一样进行读写操作,但与常规缓冲区不同的是,在进行读写操作时可以使用不同的指针,这使得 ByteBuf 的读写更加灵活。
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {}
public abstract class AbstractByteBuf extends ByteBuf { int readerIndex; int writerIndex; private int markedReaderIndex; private int markedWriterIndex;}
复制代码
ByteBuf 的内部实现采用了类似链表的数据结构,可以动态扩容和释放空间。由于它的实现方式不同于传统的字节数组,因此可以更好地适应现代计算机体系结构下的存储模式,具有更好的内存管理、并发性能等优势。
final void ensureWritable0(int minWritableBytes) { ensureAccessible(); if (minWritableBytes <= writableBytes()) { return; } final int writerIndex = writerIndex(); if (checkBounds) { if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } } // Normalize the current capacity to the power of 2. int minNewCapacity = writerIndex + minWritableBytes; int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
int fastCapacity = writerIndex + maxFastWritableBytes(); // Grow by a smaller amount if it will avoid reallocation if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) { newCapacity = fastCapacity; }
// Adjust to the new capacity. capacity(newCapacity); }
复制代码
通过源码可以获知,在 ensureWritable0() 方法中,如果当前可写空间小于指定的最小可写字节数,则需要进行扩容操作。首先会判断是否已经达到了 ByteBuf 实例的最大容量,如果是则抛出异常 IndexOutOfBoundsException;否则,通过 calculateNewCapacity() 方法计算出新的容量值,然后通过 capacity 方法进行扩容操作。calculateNewCapacity() 方法中会根据当前 ByteBuf 实例的容量和最大容量进行计算,以确定新的容量值。
另外,ByteBuf 还提供了方便的读写方法和一些高级功能,例如池化和零拷贝技术,以及支持多协议的编解码器等。这些功能都大大简化了网络编程和数据处理任务的实现过程,提高了性能和可靠性。
组成
ByteBuf 主要由以下四个部分组成:
废弃部分:指读指针之前的部分,表示已读空间;
可读部分:指读指针与写指针之间的部分,表示可读空间;
可写部分:指写指针与当前容量之间的部分,表示可写空间;
可扩容部分:指当前容量与最大容量之间的部分,表示可扩充空间;
相较于 ByteBuffer 的读写需要用 position 进行控制,ByteBuf 的读写分别由读指针和写指针两个指针控制,在读写操作时,无需进行模式的切换;
在构造 ByteBuf 时,可传入两个参数,分别代表初始容量 DEFAULT_INITIAL_CAPACITY 和最大容量 DEFAULT_MAX_CAPACITY,其中,初始容量默认为 256 字节,最大容量默认为 Integer.MAX_VALUE;
当 ByteBuf 容量无法容纳所有数据时,会进行扩容操作,若 超出最大容量,会抛出java.lang.IndexOutOfBoundsException 异常;
部分源码如下所示:
static final int DEFAULT_INITIAL_CAPACITY = 256; static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
@Override public ByteBuf directBuffer() { return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY); }
@Override public ByteBuf heapBuffer() { return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY); }
复制代码
创建
ByteBufAllocator 接口提供了一种分配 ByteBuf 实例的抽象方法,而 DEFAULT 静态成员则提供了该接口的默认实现,buffer() 方法分配了一个新的 ByteBuf 实例。部分源代码如下所示:
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
@Override public ByteBuf buffer() { if (directByDefault) { return directBuffer(); } return heapBuffer(); }
复制代码
测试代码:
public class TestByteBuf { public static void main(String[] args) { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); System.out.println(buf); StringBuilder sb = new StringBuilder(); for (int i = 0; i < 50; i++) { sb.append("sidiot"); } buf.writeBytes(sb.toString().getBytes()); System.out.println(buf); } }
复制代码
运行结果:
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
复制代码
这里观察运行结果发现,只是一些简单的数据显示,并没有 ByteBuf 中的详细内容,因此编写一个调试工具方法来帮助我们更为详细地查看 ByteBuf 中的内容:
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump; import static io.netty.util.internal.StringUtil.NEWLINE;
public static void log(ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4; StringBuilder buf = new StringBuilder(rows * 80 * 2) .append("read index:").append(buffer.readerIndex()) .append(" write index:").append(buffer.writerIndex()) .append(" capacity:").append(buffer.capacity()) .append(NEWLINE); appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); }
复制代码
运行结果:
写入
接下来,讲解一下几个有点意思的常用写入方法:
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
public abstract ByteBuf writeBoolean(boolean value); public abstract ByteBuf writeInt(int value); public abstract ByteBuf writeIntLE(int value);}
复制代码
writeBoolean 方法用一字节 01|00 代表 true|false,部分源码如下所示:
@Override public ByteBuf writeBoolean(boolean value) { writeByte(value ? 1 : 0); return this; } @Override public ByteBuf writeByte(int value) { ensureWritable0(1); _setByte(writerIndex++, value); return this; }
复制代码
编写一个测试方法 testWriteBoolean:
public static void testWriteBoolean(ByteBuf buf) { buf.writeBoolean(true); buf.writeBoolean(true); buf.writeBoolean(false); log(buf); }
复制代码
运行结果:
read index:0 write index:3 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 01 01 00 |... |+--------+-------------------------------------------------+----------------+
复制代码
部分源码如下所示:
@Override public ByteBuf writeInt(int value) { ensureWritable0(4); _setInt(writerIndex, value); writerIndex += 4; return this; } @Override public ByteBuf writeIntLE(int value) { ensureWritable0(4); _setIntLE(writerIndex, value); writerIndex += 4; return this; }
复制代码
编写一个测试方法 testWriteInt:
public static void testWriteInt(ByteBuf buf) { buf.writeInt(1314); log(buf); }
复制代码
编写一个测试方法 testWriteIntLE:
public static void testWriteIntLE(ByteBuf buf) { buf.writeIntLE(1314); log(buf); }
复制代码
运行结果:
testWriteInt - value: 1314read index:0 write index:4 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 00 00 05 22 |..." |+--------+-------------------------------------------------+----------------+
testWriteIntLE - value: 1314read index:0 write index:4 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 22 05 00 00 |"... |+--------+-------------------------------------------------+----------------+
复制代码
一般来说,小端字节序在计算机内部处理时更高效,因为计算都是从低位开始的;大端字节序在网络传输和文件存储时更方便,因为符号位在第一个字节,容易判断正负。
在计算机系统中,CPU 和操作系统的设计决定了字节序的采用方式。x86 架构的 CPU 采用小端字节序,因此大多数 PC 和手机等设备也都是采用小端字节序。
然而,在网络通信中,由于涉及到不同设备之间的数据交换,为了确保数据的正确传输和解析,需要使用一个固定的字节序。因此,网络通信协议一般规定采用大端字节序进行数据传输,例如 TCP/IP 协议中就采用了大端字节序。
在进行网络通信时,如果通信双方的字节序相同,则可以直接传输数据。但是,如果通信双方的字节序不同,则需要进行字节序转换(即将数据从一种字节序转换成另一种字节序)。为了确保数据传输的正确性和效率,字节序转换一般会在网络协议层完成,例如在 TCP/IP 协议栈中进行处理。
读取
1、获取当前可读取的字节数:可以通过调用 ByteBuf 的 readableBytes() 方法获取当前可读取的字节数。
readableBytes() 的源码如下:
@Override public int readableBytes() { return writerIndex - readerIndex; }
复制代码
通过源码可以获知,可读部分就是写指针与读指针之间的空间;
测试代码:
public static void testReadableBytes(ByteBuf buf) { buf.writeBytes(new byte[]{'s', 'i', 'd', 'i', 'o', 't'}); log(buf); System.out.println("当前可读取的字节数为" + buf.readableBytes()); buf.readByte(); log(buf); System.out.println("当前可读取的字节数为" + buf.readableBytes()); }
复制代码
运行结果:
read index:0 write index:6 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 |sidiot |+--------+-------------------------------------------------+----------------+当前可读取的字节数为6
read index:1 write index:6 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 69 64 69 6f 74 |idiot |+--------+-------------------------------------------------+----------------+当前可读取的字节数为5
复制代码
2、读取指定长度的字节数据:可以通过调用 ByteBuf 的 readBytes() 方法读取指定长度的字节数据并存储到一个字节数组中。
readBytes() 的源码如下:
@Override public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; return this; }
复制代码
通过源码可以获知,该方法会先调用 checkReadableBytes 方法检查是否有足够的可读字节数,如果不足则抛出异常;
然后调用 getBytes 方法将数据从 ByteBuf 中读取到目标字节数组中:从当前 ByteBuf 的读索引位置 readerIndex 开始,将指定长度的数据读取到目标字节数组 dst 的指定位置 dstIndex 开始的地方,并将读索引位置增加相应的长度;
测试代码:
public static void testReadBytes(ByteBuf buf) { buf.writeBytes(new byte[]{'s', 'i', 'd', 'i', 'o', 't'}); byte[] bytes = new byte[3]; buf.readBytes(bytes); System.out.println(new String(bytes)); log(buf); }
复制代码
运行结果:
sid
read index:3 write index:6 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 69 6f 74 |iot |+--------+-------------------------------------------------+----------------+
复制代码
3、读取单个字节数据:可以通过调用 ByteBuf 的 readByte() 方法读取单个字节数据。
readByte() 的源码如下:
@Override public byte readByte() { checkReadableBytes0(1); int i = readerIndex; byte b = _getByte(i); readerIndex = i + 1; return b; }
复制代码
通过源码可以获知,该方法会先调用 checkReadableBytes0(1) 方法检查可读字节数是否大于等于 1,如果小于 1,则抛出异常,然后从读指针 readerIndex 的当前位置读取一个字节,并将其作为返回值;
测试代码:
public static void testReadByte(ByteBuf buf) { buf.writeBytes(new byte[]{'s', 'i', 'd', 'i', 'o', 't'}); System.out.println((char)buf.readByte()); System.out.println((char)buf.readByte()); log(buf); }
复制代码
运行结果:
si
read index:2 write index:6 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 64 69 6f 74 |diot |+--------+-------------------------------------------------+----------------+
复制代码
4、读取基本类型数据:可以通过调用 ByteBuf 相应的读取方法读取基本类型数据。
例如:
readBoolean():读取布尔类型数据;
readShort():读取短整型数据;
readInt():读取整型数据;
readLong():读取长整型数据;
readFloat():读取浮点型数据;
readDouble():读取双精度浮点型数据;
readInt() 的源码如下:
@Override public int readInt() { checkReadableBytes0(4); int v = _getInt(readerIndex); readerIndex += 4; return v; }
复制代码
通过源码可以获知,该方法与上述的 readByte() 方法相近,因为在 Java 中 Int 是四个字节的,所以 checkReadableBytes0(4);;
测试代码:
public static void testReadInt(ByteBuf buf) { buf.writeInt(6); System.out.println(buf.readInt()); }
复制代码
运行结果:
在上述提到的读取方法中,但凡是被读取的数据都会进入废弃部分,不能被再次读取,如果需要重复读取,需要调用 ByteBuf 的 markReaderIndex() 对读指针进行标记,并通过 ByteBuf 的 resetReaderIndex() 将读指针恢复到 mark 标记的位置;
源码如下:
@Override public ByteBuf markReaderIndex() { markedReaderIndex = readerIndex; return this; } @Override public ByteBuf resetReaderIndex() { readerIndex(markedReaderIndex); return this; }
复制代码
通过源码可以获知,markReaderIndex() 方法是将当前的读指针位置 readerIndex 赋值给 mark 读指针标记 markedReaderIndex,在使用 resetReaderIndex() 方法时,将 markedReaderIndex 重新赋值给 readerIndex,以实现重复读取;
测试代码:
public static void testReadRepeat(ByteBuf buf) { buf.writeBytes(new byte[]{'s', 'i', 'd', 'i', 'o', 't'}); log(buf); buf.markReaderIndex(); System.out.println((char)buf.readByte()); System.out.println((char)buf.readByte()); log(buf); System.out.println("resetReaderIndex"); buf.resetReaderIndex(); log(buf); }
复制代码
运行结果:
read index:0 write index:6 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 |sidiot |+--------+-------------------------------------------------+----------------+si
read index:2 write index:6 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 64 69 6f 74 |diot |+--------+-------------------------------------------------+----------------+
resetReaderIndex
read index:0 write index:6 capacity:256 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 |sidiot |+--------+-------------------------------------------------+----------------+
复制代码
后记
以上就是 ByteBuf 的基本使用 的所有内容了,希望本篇博文对大家有所帮助!
参考:
评论