写点什么

netty 系列之:netty 中的核心 MessageToByte 编码器

作者:程序那些事
  • 2022 年 4 月 02 日
  • 本文字数:3532 字

    阅读完需:约 12 分钟

netty系列之:netty中的核心MessageToByte编码器

简介

之前的文章中,我们讲解了 netty 中从一个 message 转换成为另外一个 message 的框架叫做 MessageToMessage 编码器。但是 message to message 只考虑了 channel 中消息在处理过程中的转换,但是我们知道 channel 中最终传输的数据一定是 ByteBuf,所以我们还需要一个 message 和 ByteBuf 相互转换的框架,这个框架就叫做 MessageToByte。


注意,这里的 byte 指的是 ByteBuf 而不是 byte 这个字节类型。

MessageToByte 框架简介

为了方便扩展和用户的自定义,netty 封装了一套 MessageToByte 框架,这个框架中有三个核心的类,分别是 MessageToByteEncoder,ByteToMessageDecoder 和 ByteToMessageCodec。


我们分别看一下这三个核心类的定义:


public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter
复制代码


public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter 
复制代码


public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler 
复制代码


这三个类分别继承自 ChannelOutboundHandlerAdapter,ChannelInboundHandlerAdapter 和 ChannelDuplexHandler,分别表示的是向 channel 中写消息,从 channel 中读消息和一个向 channel 中读写消息的双向操作。


这三个类都是抽象类,接下来我们会详细分析这三个类的具体实现逻辑。

MessageToByteEncoder

先来看 encoder,如果你对比 MessageToByteEncoder 和 MessageToMessageEncoder 的源码实现,可以发现他们有诸多相似之处。


首先在 MessageToByteEncoder 中定义了一个用作消息类型匹配的 TypeParameterMatcher。


这个 matcher 用来匹配收到的消息类型,如果类型匹配则进行消息的转换操作,否则直接将消息写入 channel 中。


和 MessageToMessageEncoder 不同的是,MessageToByteEncoder 多了一个 preferDirect 字段,这个字段表示消息转换成为 ByteBuf 的时候是使用 diret Buf 还是 heap Buf。


这个字段的使用情况如下:


    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,                               boolean preferDirect) throws Exception {        if (preferDirect) {            return ctx.alloc().ioBuffer();        } else {            return ctx.alloc().heapBuffer();        }    }
复制代码


最后来看一下它的核心方法 write:


    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        ByteBuf buf = null;        try {            if (acceptOutboundMessage(msg)) {                @SuppressWarnings("unchecked")                I cast = (I) msg;                buf = allocateBuffer(ctx, cast, preferDirect);                try {                    encode(ctx, cast, buf);                } finally {                    ReferenceCountUtil.release(cast);                }
if (buf.isReadable()) { ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release(); } } }
复制代码


上面我们已经提到了,write 方法首先通过 matcher 来判断是否是要接受的消息类型,如果是的话就调用 encode 方法,将消息对象转换成为 ByteBuf,如果不是,则直接将消息写入 channel 中。


和 MessageToMessageEncoder 不同的是,encode 方法需要传入一个 ByteBuf 对象,而不是 CodecOutputList。


MessageToByteEncoder 有一个需要实现的抽象方法 encode 如下,


    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
复制代码

ByteToMessageDecoder

ByteToMessageDecoder 用来将 channel 中的 ByteBuf 消息转换成为特定的消息类型,其中 Decoder 中最重要的方法就是好 channelRead 方法,如下所示:


    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof ByteBuf) {            CodecOutputList out = CodecOutputList.newInstance();            try {                first = cumulation == null;                cumulation = cumulator.cumulate(ctx.alloc(),                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);                callDecode(ctx, cumulation, out);            } catch (DecoderException e) {                throw e;            } catch (Exception e) {                throw new DecoderException(e);            } finally {                try {                    if (cumulation != null && !cumulation.isReadable()) {                        numReads = 0;                        cumulation.release();                        cumulation = null;                    } else if (++numReads >= discardAfterReads) {                        numReads = 0;                        discardSomeReadBytes();                    }
int size = out.size(); firedChannelRead |= out.insertSinceRecycled(); fireChannelRead(ctx, out, size); } finally { out.recycle(); } } } else { ctx.fireChannelRead(msg); } }
复制代码


channelRead 接收要进行消息读取的 Object 对象,因为这里只接受 ByteBuf 消息,所以在方法内部调用了msg instanceof ByteBuf 来判断消息的类型,如果不是 ByteBuf 类型的消息则不进行消息的转换。


输出的对象是 CodecOutputList,在将 ByteBuf 转换成为 CodecOutputList 之后,调用 fireChannelRead 方法将 out 对象传递下去。


这里的关键就是如何将接收到的 ByteBuf 转换成为 CodecOutputList。


转换的方法叫做 callDecode,它接收一个叫做 cumulation 的参数,在上面的方法中,我们还看到一个和 cumulation 非常类似的名称叫做 cumulator。那么他们两个有什么区别呢?


在 ByteToMessageDecoder 中 cumulation 是一个 ByteBuf 对象,而 Cumulator 是一个接口,这个接口定义了一个 cumulate 方法:


    public interface Cumulator {        ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);    }
复制代码


Cumulator 用来将传入的 ByteBuf 合并成为一个新的 ByteBuf。


ByteToMessageDecoder 中定义了两种 Cumulator,分别是 MERGE_CUMULATOR 和 COMPOSITE_CUMULATOR。


MERGE_CUMULATOR 是将传入的 ByteBuf 通过 memory copy 的方式拷贝到目标 ByteBuf cumulation 中。


而 COMPOSITE_CUMULATOR 则是将 ByteBuf 添加到一个 CompositeByteBuf 的结构中,并不做 memory copy,因为目标的结构比较复杂,所以速度会比直接进行 memory copy 要慢。


用户要扩展的方法就是 decode 方法,用来将一个 ByteBuf 转换成为其他对象:


    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
复制代码

ByteToMessageCodec

最后要介绍的类是 ByteToMessageCodec,ByteToMessageCodec 表示的是 message 和 ByteBuf 之间的互相转换,它里面的 encoder 和 decoder 分别就是上面讲到的 MessageToByteEncoder 和 ByteToMessageDecoder。


用户可以继承 ByteToMessageCodec 来同时实现 encode 和 decode 的功能,所以需要实现 encode 和 decode 这两个方法:


    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
复制代码


ByteToMessageCodec 的本质就是封装了 MessageToByteEncoder 和 ByteToMessageDecoder,然后实现了编码和解码的功能。

总结

如果想实现 ByteBuf 和用户自定义消息的直接转换,那么选择 netty 提供的上面三个编码器是一个很好的选择。


本文已收录于 http://www.flydean.com/14-0-2-netty-codec-msg-to-bytebuf/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 刚刚阅读数: 2
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:netty中的核心MessageToByte编码器_Java_程序那些事_InfoQ写作平台