写点什么

netty 系列之:netty 中的自动解码器 ReplayingDecoder

作者:程序那些事
  • 2022 年 4 月 11 日
  • 本文字数:3625 字

    阅读完需:约 12 分钟

netty系列之:netty中的自动解码器ReplayingDecoder

简介

netty 提供了一个从 ByteBuf 到用户自定义的 message 的解码器叫做 ByteToMessageDecoder,要使用这个 decoder,我们需要继承这个 decoder,并实现 decode 方法,从而在这个方法中实现 ByteBuf 中的内容到用户自定义 message 对象的转换。


那么在使用 ByteToMessageDecoder 的过程中会遇到什么问题呢?为什么又会有一个 ReplayingDecoder 呢?带着这个问题我们一起来看看吧。

ByteToMessageDecoder 可能遇到的问题

要想实现自己的解码器将 ByteBuf 转换成为自己的消息对象,可以继承 ByteToMessageDecoder,然后实现其中的 decode 方法即可,先来看下 decode 方法的定义:


     protected void decode(ChannelHandlerContext ctx,                             ByteBuf buf, List<Object> out) throws Exception 
复制代码


输入的参数中 buf 是要解码的 ByteBuf,out 是解码过后的对象列表,我们需要把 ByteBuf 中的数据转换成为我们自己的对象加入 out 的 list 中。


那么这里可能会遇到一个问题,因为我们在调用 decode 方法的时候 buf 中的数据可能还没有准备好,比如我们需要一个 Integer,但是 buf 中的数据不够一个整数,那么就需要一些 buf 中数据逻辑的判断,我们以一个带有消息长度的 Buf 对象来描述一下这个过程。


所谓带有消息长度的 Buf 对象,就是说 Buf 消息中的前 4 位,构成了一个整数,这个整数表示的是 buf 中后续消息的长度。


所以我们读取消息进行转换的流程是,先读取前面 4 个字节,得到消息的长度,然后再读取该长度的字节,这就是我们真正要获取的消息内容。


来看一下如果是继承自 ByteToMessageDecoder 应该怎么实现这个逻辑呢?


   public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {        @Override     protected void decode(ChannelHandlerContext ctx,                             ByteBuf buf, List<Object> out) throws Exception {         if (buf.readableBytes() < 4) {          return;       }         buf.markReaderIndex();       int length = buf.readInt();         if (buf.readableBytes() < length) {          buf.resetReaderIndex();          return;       }         out.add(buf.readBytes(length));     }   }
复制代码


在 decode 中,我们首先需要判断 buf 中可读的字节有没有 4 个,没有的话直接返回。如果有,则先读取这 4 个字节的长度,然后再判断 buf 中的可读字节是否小于应该读取的长度,如果小于,则说明数据还没有准备好,需要调用 resetReaderIndex 进行重置。


最后,如果所有的条件都满足,才真正进行读取工作。


有没有一个办法可以不提前进行判断,可以直接按照自己想要的内容来读取 buf 的方式呢?答案就是 ReplayingDecoder。


我们先来看一下上面的例子用 ReplayingDecoder 重写是什么情况:


   public class IntegerHeaderFrameDecoder        extends ReplayingDecoder<Void> {       protected void decode(ChannelHandlerContext ctx,                             ByteBuf buf, List<Object> out) throws Exception {         out.add(buf.readBytes(buf.readInt()));     }   }
复制代码


使用 ReplayingDecoder,我们可以忽略 buf 是否已经接收到了足够的可读数据,直接读取即可。


相比之下 ReplayingDecoder 非常的简单。接下来,我们来探究一下 ReplayingDecoder 的实现原理。

ReplayingDecoder 的实现原理

ReplayingDecoder 实际上是 ByteToMessageDecoder 的一个子类,它的定义如下:


public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder 
复制代码


在 ByteToMessageDecoder 中,最重要的方法是 channelRead,在这个方法中实际调用了callDecode(ctx, cumulation, out);来实现 cumulation 到 out 的解码过程。


ReplayingDecoder 的秘密就在于对这个方法的重写,我们来看下这个方法的具体实现:


   protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {        replayable.setCumulation(in);        try {            while (in.isReadable()) {                int oldReaderIndex = checkpoint = in.readerIndex();                int outSize = out.size();                if (outSize > 0) {                    fireChannelRead(ctx, out, outSize);                    out.clear();                    if (ctx.isRemoved()) {                        break;                    }                    outSize = 0;                }                S oldState = state;                int oldInputLength = in.readableBytes();                try {                    decodeRemovalReentryProtection(ctx, replayable, out);                    if (ctx.isRemoved()) {                        break;                    }                    if (outSize == out.size()) {                        if (oldInputLength == in.readableBytes() && oldState == state) {                            throw new DecoderException(                                    StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +                                    "data or change its state if it did not decode anything.");                        } else {                            continue;                        }                    }                } catch (Signal replay) {                    replay.expect(REPLAY);                    if (ctx.isRemoved()) {                        break;                    }
// Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { } break; } if (oldReaderIndex == in.readerIndex() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
复制代码


这里的实现和 ByteToMessageDecoder 不同的是 ReplayingDecoder 中定义了一个 checkpoint,这个 checkpint 是在尝试进行数据解码之初设置的:


int oldReaderIndex = checkpoint = in.readerIndex();
复制代码


如果是在解码的过程中出现了异常,则使用 checkpoint 重置 index:


    int checkpoint = this.checkpoint;         if (checkpoint >= 0) {            in.readerIndex(checkpoint);        } else {    }
复制代码


这里捕获的异常是 Signal,Signal 是什么呢?


Signal 是一个 Error 对象:


public final class Signal extends Error implements Constant<Signal> 
复制代码


这个异常是从 replayable 中抛出来的。


replayable 是一个特有的 ByteBuf 对象,叫做 ReplayingDecoderByteBuf:


final class ReplayingDecoderByteBuf extends ByteBuf
复制代码


在 ReplayingDecoderByteBuf 中定义了 Signal 属性:


    private static final Signal REPLAY = ReplayingDecoder.REPLAY;
复制代码


这个 Signal 异常是从 ReplayingDecoderByteBuf 中的 get 方法中抛出的,这里以 getInt 为例,看一下异常是如何抛出的:


    public int getInt(int index) {        checkIndex(index, 4);        return buffer.getInt(index);    }
复制代码


getInt 方法首先会去调用 checkIndex 方法进行 buff 中的长度检测,如果小于要读取的长度,则会抛出异常 REPLAY:


    private void checkIndex(int index, int length) {        if (index + length > buffer.writerIndex()) {            throw REPLAY;        }    }
复制代码


这就是 Signal 异常的由来。

总结

以上就是对 ReplayingDecoder 的介绍,虽然 ReplayingDecoder 好用,但是从它的实现可以看出,ReplayingDecoder 是通过抛出异常来不断的重试,所以在某些特殊的情况下会造成性能的下降。


也就是说在减少我们代码量的同时,降低了程序的执行效率。看来要想马儿跑又想马儿不吃草,这样的好事是不可能的了。


本文已收录于 http://www.flydean.com/14-4-netty-replayingdecoder/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 21 小时前阅读数: 14
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:netty中的自动解码器ReplayingDecoder_Java_程序那些事_InfoQ写作平台