前言
本篇博文是《从 0 到 1 学习 Netty》中入门系列的第六篇博文,主要内容是介绍 Netty 中 ByteBuf 的基本使用,包含其组成、创建、写入和读取,通过源码分析和应用案例进行详细讲解,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
介绍
在 Netty 中,ByteBuf 是一个可扩展的字节容器。它是一个抽象类,其实现提供了对字节数据的高效访问。ByteBuf 可以像普通缓冲区一样进行读写操作,但与常规缓冲区不同的是,在进行读写操作时可以使用不同的指针,这使得 ByteBuf 的读写更加灵活。
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {}
public abstract class AbstractByteBuf extends ByteBuf {
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
}
复制代码
ByteBuf 的内部实现采用了类似链表的数据结构,可以动态扩容和释放空间。由于它的实现方式不同于传统的字节数组,因此可以更好地适应现代计算机体系结构下的存储模式,具有更好的内存管理、并发性能等优势。
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
if (minWritableBytes <= writableBytes()) {
return;
}
final int writerIndex = writerIndex();
if (checkBounds) {
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
}
// Normalize the current capacity to the power of 2.
int minNewCapacity = writerIndex + minWritableBytes;
int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
int fastCapacity = writerIndex + maxFastWritableBytes();
// Grow by a smaller amount if it will avoid reallocation
if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
newCapacity = fastCapacity;
}
// Adjust to the new capacity.
capacity(newCapacity);
}
复制代码
通过源码可以获知,在 ensureWritable0()
方法中,如果当前可写空间小于指定的最小可写字节数,则需要进行扩容操作。首先会判断是否已经达到了 ByteBuf 实例的最大容量,如果是则抛出异常 IndexOutOfBoundsException
;否则,通过 calculateNewCapacity()
方法计算出新的容量值,然后通过 capacity
方法进行扩容操作。calculateNewCapacity()
方法中会根据当前 ByteBuf 实例的容量和最大容量进行计算,以确定新的容量值。
另外,ByteBuf 还提供了方便的读写方法和一些高级功能,例如池化和零拷贝技术,以及支持多协议的编解码器等。这些功能都大大简化了网络编程和数据处理任务的实现过程,提高了性能和可靠性。
组成
ByteBuf 主要由以下四个部分组成:
废弃部分:指读指针之前的部分,表示已读空间;
可读部分:指读指针与写指针之间的部分,表示可读空间;
可写部分:指写指针与当前容量之间的部分,表示可写空间;
可扩容部分:指当前容量与最大容量之间的部分,表示可扩充空间;
相较于 ByteBuffer 的读写需要用 position
进行控制,ByteBuf 的读写分别由读指针和写指针两个指针控制,在读写操作时,无需进行模式的切换;
在构造 ByteBuf 时,可传入两个参数,分别代表初始容量 DEFAULT_INITIAL_CAPACITY
和最大容量 DEFAULT_MAX_CAPACITY
,其中,初始容量默认为 256 字节,最大容量默认为 Integer.MAX_VALUE
;
当 ByteBuf 容量无法容纳所有数据时,会进行扩容操作,若 超出最大容量,会抛出java.lang.IndexOutOfBoundsException
异常;
部分源码如下所示:
static final int DEFAULT_INITIAL_CAPACITY = 256;
static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
@Override
public ByteBuf directBuffer() {
return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}
@Override
public ByteBuf heapBuffer() {
return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}
复制代码
创建
ByteBufAllocator
接口提供了一种分配 ByteBuf
实例的抽象方法,而 DEFAULT
静态成员则提供了该接口的默认实现,buffer()
方法分配了一个新的 ByteBuf
实例。部分源代码如下所示:
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
@Override
public ByteBuf buffer() {
if (directByDefault) {
return directBuffer();
}
return heapBuffer();
}
复制代码
测试代码:
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 50; i++) {
sb.append("sidiot");
}
buf.writeBytes(sb.toString().getBytes());
System.out.println(buf);
}
}
复制代码
运行结果:
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
复制代码
这里观察运行结果发现,只是一些简单的数据显示,并没有 ByteBuf 中的详细内容,因此编写一个调试工具方法来帮助我们更为详细地查看 ByteBuf 中的内容:
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
复制代码
运行结果:
写入
接下来,讲解一下几个有点意思的常用写入方法:
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
public abstract ByteBuf writeBoolean(boolean value);
public abstract ByteBuf writeInt(int value);
public abstract ByteBuf writeIntLE(int value);
}
复制代码
writeBoolean
方法用一字节 01|00
代表 true|false
,部分源码如下所示:
@Override
public ByteBuf writeBoolean(boolean value) {
writeByte(value ? 1 : 0);
return this;
}
@Override
public ByteBuf writeByte(int value) {
ensureWritable0(1);
_setByte(writerIndex++, value);
return this;
}
复制代码
编写一个测试方法 testWriteBoolean
:
public static void testWriteBoolean(ByteBuf buf) {
buf.writeBoolean(true);
buf.writeBoolean(true);
buf.writeBoolean(false);
log(buf);
}
复制代码
运行结果:
read index:0 write index:3 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 01 00 |... |
+--------+-------------------------------------------------+----------------+
复制代码
部分源码如下所示:
@Override
public ByteBuf writeInt(int value) {
ensureWritable0(4);
_setInt(writerIndex, value);
writerIndex += 4;
return this;
}
@Override
public ByteBuf writeIntLE(int value) {
ensureWritable0(4);
_setIntLE(writerIndex, value);
writerIndex += 4;
return this;
}
复制代码
编写一个测试方法 testWriteInt
:
public static void testWriteInt(ByteBuf buf) {
buf.writeInt(1314);
log(buf);
}
复制代码
编写一个测试方法 testWriteIntLE
:
public static void testWriteIntLE(ByteBuf buf) {
buf.writeIntLE(1314);
log(buf);
}
复制代码
运行结果:
testWriteInt - value: 1314
read index:0 write index:4 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 05 22 |..." |
+--------+-------------------------------------------------+----------------+
testWriteIntLE - value: 1314
read index:0 write index:4 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 22 05 00 00 |"... |
+--------+-------------------------------------------------+----------------+
复制代码
一般来说,小端字节序在计算机内部处理时更高效,因为计算都是从低位开始的;大端字节序在网络传输和文件存储时更方便,因为符号位在第一个字节,容易判断正负。
在计算机系统中,CPU 和操作系统的设计决定了字节序的采用方式。x86 架构的 CPU 采用小端字节序,因此大多数 PC 和手机等设备也都是采用小端字节序。
然而,在网络通信中,由于涉及到不同设备之间的数据交换,为了确保数据的正确传输和解析,需要使用一个固定的字节序。因此,网络通信协议一般规定采用大端字节序进行数据传输,例如 TCP/IP 协议中就采用了大端字节序。
在进行网络通信时,如果通信双方的字节序相同,则可以直接传输数据。但是,如果通信双方的字节序不同,则需要进行字节序转换(即将数据从一种字节序转换成另一种字节序)。为了确保数据传输的正确性和效率,字节序转换一般会在网络协议层完成,例如在 TCP/IP 协议栈中进行处理。
读取
1、获取当前可读取的字节数:可以通过调用 ByteBuf 的 readableBytes()
方法获取当前可读取的字节数。
readableBytes()
的源码如下:
@Override
public int readableBytes() {
return writerIndex - readerIndex;
}
复制代码
通过源码可以获知,可读部分就是写指针与读指针之间的空间;
测试代码:
public static void testReadableBytes(ByteBuf buf) {
buf.writeBytes(new byte[]{'s', 'i', 'd', 'i', 'o', 't'});
log(buf);
System.out.println("当前可读取的字节数为" + buf.readableBytes());
buf.readByte();
log(buf);
System.out.println("当前可读取的字节数为" + buf.readableBytes());
}
复制代码
运行结果:
read index:0 write index:6 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 |sidiot |
+--------+-------------------------------------------------+----------------+
当前可读取的字节数为6
read index:1 write index:6 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 64 69 6f 74 |idiot |
+--------+-------------------------------------------------+----------------+
当前可读取的字节数为5
复制代码
2、读取指定长度的字节数据:可以通过调用 ByteBuf 的 readBytes()
方法读取指定长度的字节数据并存储到一个字节数组中。
readBytes()
的源码如下:
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
复制代码
通过源码可以获知,该方法会先调用 checkReadableBytes
方法检查是否有足够的可读字节数,如果不足则抛出异常;
然后调用 getBytes
方法将数据从 ByteBuf
中读取到目标字节数组中:从当前 ByteBuf
的读索引位置 readerIndex
开始,将指定长度的数据读取到目标字节数组 dst
的指定位置 dstIndex
开始的地方,并将读索引位置增加相应的长度;
测试代码:
public static void testReadBytes(ByteBuf buf) {
buf.writeBytes(new byte[]{'s', 'i', 'd', 'i', 'o', 't'});
byte[] bytes = new byte[3];
buf.readBytes(bytes);
System.out.println(new String(bytes));
log(buf);
}
复制代码
运行结果:
sid
read index:3 write index:6 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 6f 74 |iot |
+--------+-------------------------------------------------+----------------+
复制代码
3、读取单个字节数据:可以通过调用 ByteBuf 的 readByte()
方法读取单个字节数据。
readByte()
的源码如下:
@Override
public byte readByte() {
checkReadableBytes0(1);
int i = readerIndex;
byte b = _getByte(i);
readerIndex = i + 1;
return b;
}
复制代码
通过源码可以获知,该方法会先调用 checkReadableBytes0(1)
方法检查可读字节数是否大于等于 1,如果小于 1,则抛出异常,然后从读指针 readerIndex
的当前位置读取一个字节,并将其作为返回值;
测试代码:
public static void testReadByte(ByteBuf buf) {
buf.writeBytes(new byte[]{'s', 'i', 'd', 'i', 'o', 't'});
System.out.println((char)buf.readByte());
System.out.println((char)buf.readByte());
log(buf);
}
复制代码
运行结果:
s
i
read index:2 write index:6 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 69 6f 74 |diot |
+--------+-------------------------------------------------+----------------+
复制代码
4、读取基本类型数据:可以通过调用 ByteBuf 相应的读取方法读取基本类型数据。
例如:
readBoolean()
:读取布尔类型数据;
readShort()
:读取短整型数据;
readInt()
:读取整型数据;
readLong()
:读取长整型数据;
readFloat()
:读取浮点型数据;
readDouble()
:读取双精度浮点型数据;
readInt()
的源码如下:
@Override
public int readInt() {
checkReadableBytes0(4);
int v = _getInt(readerIndex);
readerIndex += 4;
return v;
}
复制代码
通过源码可以获知,该方法与上述的 readByte()
方法相近,因为在 Java 中 Int 是四个字节的,所以 checkReadableBytes0(4);
;
测试代码:
public static void testReadInt(ByteBuf buf) {
buf.writeInt(6);
System.out.println(buf.readInt());
}
复制代码
运行结果:
在上述提到的读取方法中,但凡是被读取的数据都会进入废弃部分,不能被再次读取,如果需要重复读取,需要调用 ByteBuf 的 markReaderIndex()
对读指针进行标记,并通过 ByteBuf 的 resetReaderIndex()
将读指针恢复到 mark 标记的位置;
源码如下:
@Override
public ByteBuf markReaderIndex() {
markedReaderIndex = readerIndex;
return this;
}
@Override
public ByteBuf resetReaderIndex() {
readerIndex(markedReaderIndex);
return this;
}
复制代码
通过源码可以获知,markReaderIndex()
方法是将当前的读指针位置 readerIndex
赋值给 mark 读指针标记 markedReaderIndex
,在使用 resetReaderIndex()
方法时,将 markedReaderIndex
重新赋值给 readerIndex
,以实现重复读取;
测试代码:
public static void testReadRepeat(ByteBuf buf) {
buf.writeBytes(new byte[]{'s', 'i', 'd', 'i', 'o', 't'});
log(buf);
buf.markReaderIndex();
System.out.println((char)buf.readByte());
System.out.println((char)buf.readByte());
log(buf);
System.out.println("resetReaderIndex");
buf.resetReaderIndex();
log(buf);
}
复制代码
运行结果:
read index:0 write index:6 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 |sidiot |
+--------+-------------------------------------------------+----------------+
s
i
read index:2 write index:6 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 69 6f 74 |diot |
+--------+-------------------------------------------------+----------------+
resetReaderIndex
read index:0 write index:6 capacity:256
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 |sidiot |
+--------+-------------------------------------------------+----------------+
复制代码
后记
以上就是 ByteBuf 的基本使用 的所有内容了,希望本篇博文对大家有所帮助!
参考:
评论