简介
netty 提供了一个从 ByteBuf 到用户自定义的 message 的解码器叫做 ByteToMessageDecoder,要使用这个 decoder,我们需要继承这个 decoder,并实现 decode 方法,从而在这个方法中实现 ByteBuf 中的内容到用户自定义 message 对象的转换。
那么在使用 ByteToMessageDecoder 的过程中会遇到什么问题呢?为什么又会有一个 ReplayingDecoder 呢?带着这个问题我们一起来看看吧。
ByteToMessageDecoder 可能遇到的问题
要想实现自己的解码器将 ByteBuf 转换成为自己的消息对象,可以继承 ByteToMessageDecoder,然后实现其中的 decode 方法即可,先来看下 decode 方法的定义:
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception
复制代码
输入的参数中 buf 是要解码的 ByteBuf,out 是解码过后的对象列表,我们需要把 ByteBuf 中的数据转换成为我们自己的对象加入 out 的 list 中。
那么这里可能会遇到一个问题,因为我们在调用 decode 方法的时候 buf 中的数据可能还没有准备好,比如我们需要一个 Integer,但是 buf 中的数据不够一个整数,那么就需要一些 buf 中数据逻辑的判断,我们以一个带有消息长度的 Buf 对象来描述一下这个过程。
所谓带有消息长度的 Buf 对象,就是说 Buf 消息中的前 4 位,构成了一个整数,这个整数表示的是 buf 中后续消息的长度。
所以我们读取消息进行转换的流程是,先读取前面 4 个字节,得到消息的长度,然后再读取该长度的字节,这就是我们真正要获取的消息内容。
来看一下如果是继承自 ByteToMessageDecoder 应该怎么实现这个逻辑呢?
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (buf.readableBytes() < 4) { return; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; } out.add(buf.readBytes(length)); } }
复制代码
在 decode 中,我们首先需要判断 buf 中可读的字节有没有 4 个,没有的话直接返回。如果有,则先读取这 4 个字节的长度,然后再判断 buf 中的可读字节是否小于应该读取的长度,如果小于,则说明数据还没有准备好,需要调用 resetReaderIndex 进行重置。
最后,如果所有的条件都满足,才真正进行读取工作。
有没有一个办法可以不提前进行判断,可以直接按照自己想要的内容来读取 buf 的方式呢?答案就是 ReplayingDecoder。
我们先来看一下上面的例子用 ReplayingDecoder 重写是什么情况:
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
复制代码
使用 ReplayingDecoder,我们可以忽略 buf 是否已经接收到了足够的可读数据,直接读取即可。
相比之下 ReplayingDecoder 非常的简单。接下来,我们来探究一下 ReplayingDecoder 的实现原理。
ReplayingDecoder 的实现原理
ReplayingDecoder 实际上是 ByteToMessageDecoder 的一个子类,它的定义如下:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
复制代码
在 ByteToMessageDecoder 中,最重要的方法是 channelRead,在这个方法中实际调用了callDecode(ctx, cumulation, out);来实现 cumulation 到 out 的解码过程。
ReplayingDecoder 的秘密就在于对这个方法的重写,我们来看下这个方法的具体实现:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { replayable.setCumulation(in); try { while (in.isReadable()) { int oldReaderIndex = checkpoint = in.readerIndex(); int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); if (ctx.isRemoved()) { break; } outSize = 0; } S oldState = state; int oldInputLength = in.readableBytes(); try { decodeRemovalReentryProtection(ctx, replayable, out); if (ctx.isRemoved()) { break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " + "data or change its state if it did not decode anything."); } else { continue; } } } catch (Signal replay) { replay.expect(REPLAY); if (ctx.isRemoved()) { break; }
// Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { } break; } if (oldReaderIndex == in.readerIndex() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
复制代码
这里的实现和 ByteToMessageDecoder 不同的是 ReplayingDecoder 中定义了一个 checkpoint,这个 checkpint 是在尝试进行数据解码之初设置的:
int oldReaderIndex = checkpoint = in.readerIndex();
复制代码
如果是在解码的过程中出现了异常,则使用 checkpoint 重置 index:
int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { }
复制代码
这里捕获的异常是 Signal,Signal 是什么呢?
Signal 是一个 Error 对象:
public final class Signal extends Error implements Constant<Signal>
复制代码
这个异常是从 replayable 中抛出来的。
replayable 是一个特有的 ByteBuf 对象,叫做 ReplayingDecoderByteBuf:
final class ReplayingDecoderByteBuf extends ByteBuf
复制代码
在 ReplayingDecoderByteBuf 中定义了 Signal 属性:
private static final Signal REPLAY = ReplayingDecoder.REPLAY;
复制代码
这个 Signal 异常是从 ReplayingDecoderByteBuf 中的 get 方法中抛出的,这里以 getInt 为例,看一下异常是如何抛出的:
public int getInt(int index) { checkIndex(index, 4); return buffer.getInt(index); }
复制代码
getInt 方法首先会去调用 checkIndex 方法进行 buff 中的长度检测,如果小于要读取的长度,则会抛出异常 REPLAY:
private void checkIndex(int index, int length) { if (index + length > buffer.writerIndex()) { throw REPLAY; } }
复制代码
这就是 Signal 异常的由来。
总结
以上就是对 ReplayingDecoder 的介绍,虽然 ReplayingDecoder 好用,但是从它的实现可以看出,ReplayingDecoder 是通过抛出异常来不断的重试,所以在某些特殊的情况下会造成性能的下降。
也就是说在减少我们代码量的同时,降低了程序的执行效率。看来要想马儿跑又想马儿不吃草,这样的好事是不可能的了。
本文已收录于 http://www.flydean.com/14-4-netty-replayingdecoder/
最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!
欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!
评论