前言
本篇博文是《从 0 到 1 学习 Netty》中 NIO 系列的第四篇博文,主要内容是介绍如何处理消息边界以及通过可写事件解决写入内容过多的问题,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
消息边界
将缓冲区的大小设置为 4 个字节,发送以下消息时:
sc.write(Charset.defaultCharset().encode("你好,sidiot!"));
复制代码
运行结果:
这是因为 UTF-8 字符集下,1 个汉字占用 3 个字节,此时缓冲区大小为 4 个字节,一次读时间无法处理完通道中的所有数据,所以会触发多次读事件。这导致其他几个中文字符被拆分开来发送,因此解码时就会出现如上问题。
一般的解决思路有以下三种:
固定消息长度,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致,缺点是浪费带宽;
按分隔符拆分,缺点是效率低,需要一个一个字符地去匹配分隔符;
TLV 格式,即 Type 类型、Length 长度、Value 数据,也就是在消息开头用一些空间存放后面数据的长度,如 HTTP 请求头中的 Content-Type 与 Content-Length。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 的吞吐量;
Http 1.1 是 TLV 格式;
Http 2.0 是 LTV 格式;
接下来通过按分隔符拆分的方式来处理消息边界问题;
先编写一个 split() 函数,用于处理将 buffer 的内容按分隔符进行拆分,代码如下:
private static void split(ByteBuffer buffer) { buffer.flip(); for(int i=0; i<buffer.limit(); i++) { if (buffer.get(i) == '\n') { int length = i + 1 - buffer.position(); ByteBuffer target = ByteBuffer.allocate(length); for(int j=0; j<length; j++) { target.put(buffer.get()); } debugAll(target); } } buffer.compact();}
复制代码
然后再看到 ByteBuffer 类,虽然 ByteBuffer 是线程安全的,但是它并不是设计用于多线程并发访问,如果多个线程同时访问同一个 ByteBuffer 对象,那么可能会出现数据竞争和一致性问题,因此,我们需要确保每个 Channel 都有自己的 ByteBuffer 对象,来避免共享;
这时就要看到 register 函数:
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
复制代码
这第三个参数 att 表示附件的意思,即可以向其中放入一个 Object 类型的对象,该对象会与登记的 Channel 以及其对应的 SelectionKey 绑定,可以从 SelectionKey 获取到对应 Channel 的附件,我们可以将 buffer 放入其中:
ByteBuffer buffer = ByteBuffer.allocate(16);channel.register(selector, SelectionKey.OP_READ, buffer);
复制代码
之后可以通过 SelectionKey 的 attachment() 方法获得附件:
ByteBuffer buf = (ByteBuffer) key.attachment();
复制代码
此外,还要注意 buffer 的大小,如果发送内容的大小要大于 buffer 的大小,则会出现消息丢失的情况,比如要发送 "Hello, World! --sid10t.\n",由于 buffer 为 16,最后接收到的只有 Hello, World! --,但是因为采用了按分隔符拆分,控制台不会输出任何字符;
因此,需要对 buffer 进行动态扩容,代码如下:
if (buf.position() == buf.limit()) { ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() * 2); buf.flip(); newBuf.put(buf); key.attach(newBuf);}
复制代码
上述代码是考虑到 split() 函数中使用的是 compact() 方法,因此当 position 与 limit 相等时,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大至原先的两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,并调用 SelectionKey 的 attach() 方法,将新的缓冲区作为新的附件放入 SelectionKey 中;
整体代码如下所示:
@Slf4jpublic class MSGBoundary {
public static void main(String[] args) { try { Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT); log.debug("Register Key: {}", sscKey);
ssc.bind(new InetSocketAddress(7999));
while (true) { selector.select();
Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> iter = keySet.iterator();// log.debug("count: {}", keySet.size());
while (iter.hasNext()) { SelectionKey key = iter.next(); log.debug("Selection Key: {}", key);
if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); sc.register(selector, SelectionKey.OP_READ, buffer); log.debug("sc Key: {}", sc); iter.remove(); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment();
try { int read = channel.read(buf); log.debug("read: {}", read); if (read <= 0) { key.cancel(); channel.close(); } else { split(buf); if (buf.position() == buf.limit()) { ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() * 2); buf.flip(); newBuf.put(buf); key.attach(newBuf); } } } catch (IOException e) { e.printStackTrace(); key.cancel(); } finally { iter.remove(); } } } } } catch (IOException e) { e.printStackTrace(); } }
private static void split(ByteBuffer buffer) { buffer.flip(); for(int i=0; i<buffer.limit(); i++) { if (buffer.get(i) == '\n') { int length = i + 1 - buffer.position(); ByteBuffer target = ByteBuffer.allocate(length); for(int j=0; j<length; j++) { target.put(buffer.get()); } debugAll(target); } } buffer.compact(); }}
复制代码
运行结果:
22:18:16 [DEBUG] [main] c.s.n.c.MSGBoundary - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=022:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=1622:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - sc Key: java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604]22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=122:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - read: 1622:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=122:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - read: 8
+--------+-------------------- all ------------------------+----------------+position: [24], limit: [24] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 48 65 6c 6c 6f 2c 20 57 6f 72 6c 64 21 20 2d 2d |Hello, World! --||00000010| 73 69 64 31 30 74 2e 0a |sid10t.. |+--------+-------------------------------------------------+----------------+
复制代码
这里还需要考虑一个问题,就是 Bytebuffer 的大小,ByteBuffer 不能太大,比如一个 ByteBuffer 的大小为 1MB 的话,如果要支持百万连接就要 1TB 内存,因此需要设计大小可变的 ByteBuffer:
可写事件
服务器通过 Buffer 向通道中写入数据时,可能会遇到通道容量小于 Buffer 中的数据大小,导致无法一次性将 Buffer 中的数据全部写入到 Channel 中,这时便需要分多次写入,通过 hasRemaining() 方法来判断 Buffer 中是否还有数据,代码如下:
StringBuilder sb = new StringBuilder(); for (int i = 0; i < 5000000; i++) { sb.append("sidiot"); } ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); while (buffer.hasRemaining()) { int write = sc.write(buffer); System.out.println(write); }
复制代码
客户端通过循环来接收数据,代码如下:
int cnt = 0; while (true) { ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); cnt += sc.read(buffer); System.out.println(cnt); buffer.clear(); }
复制代码
运行结果:
# WriteServer4718556301463340632010471855602490349002621420026214200262142002621420...509025
复制代码
上述结果出现 0 是因为缓冲区还没消费完, 无法进行写入,这样子会导致滞留在此,性能低下;
接下来,优化一下代码,通过 Selector 进行处理,提高效率:
while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); SelectionKey scKey = sc.register(selector, 0, null); scKey.interestOps(SelectionKey.OP_READ);
// 1. 向客户端发送大量数据 StringBuilder sb = new StringBuilder(); for (int i = 0; i < 5000000; i++) { sb.append("sidiot"); } ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 2. 返回值代表实际写入的字节数 int write = sc.write(buffer); System.out.println(write);
// 3. 判断是否有剩余内存 if (buffer.hasRemaining()) { // 4. 关注可写事件 1+4 scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE); // 5. 将未写完的数据挂到 scKey 上 scKey.attach(buffer); } } else if (key.isWritable()) { ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); int write = sc.write(buffer); System.out.println(write); // 6. 清理操作 if (!buffer.hasRemaining()) { key.attach(null); key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 不在关注写事件 } }}
复制代码
注意,这里需要使用组合事件类型,即 scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);,如果只有 SelectionKey.OP_WRITE 意味着写事件覆盖原先的读事件;
运行结果:
# WriteServer209713626214203014633380105968156923932132228207393213222820752428401164380
# WriteClient131071262142393213524284...29622046297531172988418830000000
复制代码
后记
以上就是 消息边界与可写事件 的所有内容了,希望本篇博文对大家有所帮助!
参考:
📝 上篇精讲:「NIO」(三)剖析 Selector
💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;
👍 创作不易,请多多支持;
🔥 系列专栏:探索 Netty:源码解析与应用案例分享
评论