简介
在 netty 中我们需要传递各种类型的消息,这些 message 可以是字符串,可以是数组,也可以是自定义的对象。不同的对象之间可能需要互相转换,这样就需要一个可以自由进行转换的转换器,为了统一编码规则和方便用户的扩展,netty 提供了一套消息之间进行转换的框架。本文将会讲解这个框架的具体实现。
框架简介
netty 为消息和消息之间的转换提供了三个类,这三个类都是抽象类,分别是 MessageToMessageDecoder,MessageToMessageEncoder 和 MessageToMessageCodec。
先来看下他们的定义:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter
复制代码
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
复制代码
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler
复制代码
MessageToMessageEncoder 继承自 ChannelOutboundHandlerAdapter,负责向 channel 中写消息。
MessageToMessageDecoder 继承自 ChannelInboundHandlerAdapter,负责从 channel 中读取消息。
MessageToMessageCodec 继承自 ChannelDuplexHandler,它是一个双向的 handler,可以从 channel 中读取消息,也可以向 channel 中写入消息。
有了这三个抽象类,我们再看下这三个类的具体实现。
MessageToMessageEncoder
先看一下消息的编码器 MessageToMessageEncoder,编码器中最重要的方法就是 write,看下 write 的实现:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
if (out != null) {
try {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
}
} finally {
out.recycle();
}
}
}
}
复制代码
write 方法接受一个需要转换的原始对象 msg,和一个表示 channel 读写进度的 ChannelPromise。
首先会对 msg 进行一个类型判断,这个判断方法是在 acceptOutboundMessage 中实现的。
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
复制代码
这里的 matcher 是一个 TypeParameterMatcher 对象,它是一个在 MessageToMessageEncoder 构造函数中初始化的属性:
protected MessageToMessageEncoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
}
复制代码
这里的 I 就是要匹配的 msg 类型。
如果不匹配,则继续调用ctx.write(msg, promise);
将消息不做任何转换的写入到 channel 中,供下一个 handler 调用。
如果匹配成功,则会调用核心的 encode 方法:encode(ctx, cast, out);
注意,encode 方法在 MessageToMessageEncoder 中是一个抽象方法,需要用户在继承类中自行扩展。
encode 方法实际上是将 msg 对象转换成为要转换的对象,然后添加到 out 中。这个 out 是一个 list 对象,具体而言是一个 CodecOutputList 对象,作为一个 list,out 是一个可以存储多个对象的列表。
那么 out 是什么时候写入到 channel 中去的呢?
别急,在 write 方法中最后有一个 finally 代码块,在这个代码块中,会将 out 写入到 channel 里面。
因为 out 是一个 List,可能会出现 out 中的对象部分写成功的情况,所以这里需要特别处理。
首先判断 out 中是否只有一个对象,如果是一个对象,那么直接写到 channel 中即可。如果 out 中多于一个对象,那么又分成两种情况,第一种情况是传入的 promise 是一个 voidPromise,那么调用 writeVoidPromise 方法。
什么是 voidPromise 呢?
我们知道 Promise 有多种状态,可以通过 promise 的状态变化了解到数据写入的情况。对于 voidPromise 来说,它只关心一种失败的状态,其他的状态都不关心。
如果用户关心 promise 的其他状态,则会调用 writePromiseCombiner 方法,将多个对象的状态合并为一个 promise 返回。
事实上,在 writeVoidPromise 和 writePromiseCombiner 中,out 中的对象都是一个一个的取出来,写入到 channel 中的,所以才会生成多个 promise 和需要将 promise 进行合并的情况:
private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
final ChannelPromise voidPromise = ctx.voidPromise();
for (int i = 0; i < out.size(); i++) {
ctx.write(out.getUnsafe(i), voidPromise);
}
}
private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
for (int i = 0; i < out.size(); i++) {
combiner.add(ctx.write(out.getUnsafe(i)));
}
combiner.finish(promise);
}
复制代码
MessageToMessageDecoder
和 encoder 对应的就是 decoder 了,MessageToMessageDecoder 的逻辑和 MessageToMessageEncoder 差不多。
首先也是需要判断读取的消息类型,这里也定义了一个 TypeParameterMatcher 对象,用来检测传入的消息类型:
protected MessageToMessageDecoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}
复制代码
decoder 中重要的方法是 channelRead 方法,我们看下它的实现:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
} finally {
out.recycle();
}
}
}
复制代码
首先检测 msg 的类型,只有接受的类型才进行 decode 处理,否则将 msg 加入到 CodecOutputList 中。
最后在 finally 代码块中将 out 中的对象一个个取出来,调用 ctx.fireChannelRead 进行读取。
消息转换的关键方法是 decode,这个方法也是一个抽象方法,需要在继承类中实现具体的功能。
MessageToMessageCodec
前面讲解了一个编码器和一个解码器,他们都是单向的。最后要讲解的 codec 叫做 MessageToMessageCodec,这个 codec 是一个双向的,即可以接收消息,也可以发送消息。
先看下它的定义:
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler
复制代码
MessageToMessageCodec 继承自 ChannelDuplexHandler,接收两个泛型参数分别是 INBOUND_IN 和 OUTBOUND_IN。
它定义了两个 TypeParameterMatcher,分别用来过滤 inboundMsg 和 outboundMsg:
protected MessageToMessageCodec() {
inboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "INBOUND_IN");
outboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "OUTBOUND_IN");
}
复制代码
分别实现了 channelRead 和 write 方法,用来读写消息:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
decoder.channelRead(ctx, msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
encoder.write(ctx, msg, promise);
}
复制代码
这里的 decoder 和 encoder 实际上就是前面我们讲到的 MessageToMessageDecoder 和 MessageToMessageEncoder:
private final MessageToMessageEncoder<Object> encoder = new MessageToMessageEncoder<Object>() {
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return MessageToMessageCodec.this.acceptOutboundMessage(msg);
}
@Override
@SuppressWarnings("unchecked")
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out);
}
};
private final MessageToMessageDecoder<Object> decoder = new MessageToMessageDecoder<Object>() {
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
return MessageToMessageCodec.this.acceptInboundMessage(msg);
}
@Override
@SuppressWarnings("unchecked")
protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out);
}
};
复制代码
可以看到 MessageToMessageCodec 实际上就是对 MessageToMessageDecoder 和 MessageToMessageEncoder 的封装,如果需要对 MessageToMessageCodec 进行扩展的话,需要实现下面两个方法:
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
throws Exception;
复制代码
总结
netty 中提供的 MessageToMessage 的编码框架是后面对编码解码器进行扩展的基础。只有深入了解其中的原理,我们对于新的编码解码器运用起来才能得心应手。
本文已收录于 http://www.flydean.com/14-0-1-netty-codec-msg-to-msg/
最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!
欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!
评论