写点什么

netty 系列之: 基于流的数据传输

发布于: 4 小时前

简介我们知道由两种数据的传输方式,分别是字符流和字节流,字符流的意思是传输的对象就是字符串,格式已经被设置好了,发送方和接收方按照特定的格式去读取就行了,而字节流是指将数据作为最原始的二进制字节来进行传输。


今天给大家介绍一下在 netty 中的基于流的数据传输。


package 和 byte 熟悉 TCP/IP 协议的同学应该知道,在 TCP/IP 中,因为底层协议有支持的数据包的最大值,所以对于大数据传输来说,需要对数据进行拆分和封包处理,并将这些拆分组装过的包进行发送,最后在接收方对这些包进行组合。在各个包中有固定的结构,所以接收方可以很清楚的知道到底应该组合多少个包作为最终的结果。


那么对于 netty 来说,channel 中传输的是 ByteBuf,实际上最最最底层的就是 byte 数组。对于这种 byte 数组来说,接收方并不知道到底应该组合多少个 byte 来合成原来的消息,所以需要在接收端对收到的 byte 进行组合,从而生成最终的数据。


那么对于 netty 中的 byte 数据流应该怎么组合呢?我们接下来看两种组合方法。


手动组合这种组合的方式的基本思路是构造一个目标大小的 ByteBuf,然后将接收到的 byte 通过调用 ByteBuf 的 writeBytes 方法写入到 ByteBuf 中。最后从 ByteBuf 中读取对应的数据。


比如我们想从服务端发送一个 int 数字给客户端,一般来说 int 是 32bits,然后一个 byte 是 8bits,那么一个 int 就需要 4 个 bytes 组成。


在 server 端,可以建立一个 byte 的数组,数组中包含 4 个元素。将 4 个元素的 byte 发送给客户端,那么客户端该如何处理呢?


首先我们需要建立一个 clientHander,这个 handler 应该继承 ChannelInboundHandlerAdapter,并且在其 handler 被添加到 ChannelPipeline 的时候初始化一个包含 4 个 byte 的 byteBuf。


handler 被添加的时候会触发一个 handlerAdded 事件,所以我们可以这样写:


private ByteBuf buf;
@Overridepublic void handlerAdded(ChannelHandlerContext ctx) { //创建一个4个byte的缓冲器 buf = ctx.alloc().buffer(4); }
复制代码


上例中,我们从 ctx 分配了一个 4 个字节的缓冲器,并将其赋值给 handler 中的私有变量 buf。


当 handler 执行完毕,从 ChannelPipeline 中删除的时候,会触发 handlerRemoved 事件,在这个事件中,我们可以对分配的 Bytebuf 进行清理,通常来说,可以调用其 release 方法,如下所示:


public void handlerRemoved(ChannelHandlerContext ctx) {    buf.release(); // 释放buf    buf = null;}
复制代码


然后最关键的一步就是从 channel 中读取 byte 并将其放到 4 个字节的 byteBuf 中。在之前的文章中我们提到了,可以在 channelRead 方法中,处理消息读取的逻辑。


public void channelRead(ChannelHandlerContext ctx, Object msg) {    ByteBuf m = (ByteBuf) msg;    buf.writeBytes(m); // 写入一个byte    m.release();
if (buf.readableBytes() >= 4) { // 已经凑够4个byte,将4个byte组合称为一个int long result = buf.readUnsignedInt(); ctx.close(); }}
复制代码


每次触发 channelRead 方法,都会将读取到的一个字节的 byte 通过调用 writeBytes 方法写入 buf 中。当 buf 的可读 byte 大于等于 4 个的时候就说明 4 个字节已经读满了,可以对其进行操作了。


这里我们将 4 个字节组合成一个 unsignedInt,并使用 readUnsignedInt 方法从 buf 中读取出来组合称为一个 int 数字。


上面的例子虽然可以解决 4 个字节的 byte 问题,但是如果数据结构再负责一点,上面的方式就会力不从心,需要考虑太多的数据组合问题。接下来我们看另外一种方式。


Byte 的转换类 netty 提供了一个 ByteToMessageDecoder 的转换类,可以方便的对 Byte 转换为其他的类型。


我们只需要重新其中的 decode 方法,就可以实现对 ByteBuf 的转换:


   public class SquareDecoder extends ByteToMessageDecoder {        @Override       public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)               throws Exception {           out.add(in.readBytes(in.readableBytes()));       }   }
复制代码


上面的例子将 byte 从 input 转换到 output 中,当然,你还可以在上面的方法中进行格式转换,如下所示:


public class TimeDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}


    out.add(in.readBytes(4)); }
复制代码


}上面的例子会先判断 in 中是否有 4 个 byte,如果有就将其读出来放到 out 中去。那么有同学会问了,输入不是一个 byte 一个 byte 来的吗?为什么这里可以一次读取到 4 个 byte?这是因为 ByteToMessageDecoder 内置了一个缓存装置,所以这里的 in 实际上是一个缓存集合。


ReplayingDecodernetty 还提供了一个更简单的转换 ReplayingDecoder,如果使用 ReplayingDecoder 重新上面的逻辑就是这样的:


public class TimeDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {out.add(in.readBytes(4));}}只需要一行代码即可。


事实上 ReplayingDecoder 是 ByteToMessageDecoder 的子类,是在 ByteToMessageDecoder 上丰富了一些功能的结果。


他们两的区别在于 ByteToMessageDecoder 还需要通过调用 readableBytes 来判断是否有足够的可以读 byte,而使用 ReplayingDecoder 直接读取即可,它假设的是所有的 bytes 都已经接受成功了。


比如下面使用 ByteToMessageDecoder 的代码:


public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {


  @Override protected void decode(ChannelHandlerContext ctx,                         ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) { return; }
buf.markReaderIndex(); int length = buf.readInt();
if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; }
out.add(buf.readBytes(length)); }
复制代码


}


上例假设在 byte 的头部是一个 int 大小的数组,代表着 byte 数组的长度,需要先读取 int 值,然后再根据 int 值来读取对应的 byte 数据。


和下面的代码是等价的:


public class IntegerHeaderFrameDecoderextends ReplayingDecoder<Void> {


 protected void decode(ChannelHandlerContext ctx,                         ByteBuf buf, List<Object> out) throws Exception {
out.add(buf.readBytes(buf.readInt())); }
复制代码


}


上面代码少了判断的步骤。


那么这是怎么实现的呢?


事实上 ReplayingDecoder 会传递一个会抛出 Error 的 ByteBuf , 当 ByteBuf 读取的 byte 个数不满足要求的时候,会抛出异常,当 ReplayingDecoder 捕获到这个异常之后,会重置 buffer 的 readerIndex 到最初的状态,然后等待后续的数据进来,然后再次调用 decode 方法。


所以,ReplayingDecoder 的效率会比较低,为了解决这个问题,netty 提供了 checkpoint() 方法。这是一个保存点,当报错的时候,可以不会退到最初的状态,而是回退到 checkpoint() 调用时候保存的状态,从而可以减少不必要的浪费。


总结本文介绍了在 netty 中进行 stream 操作和变换的几种方式,希望大家能够喜欢。


本文已收录于 http://www.flydean.com/07-netty-stream-based-transport/


最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!


欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 4 小时前阅读数: 3
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:基于流的数据传输