写点什么

netty 系列之:netty 对 marshalling 的支持

作者:程序那些事
  • 2022 年 4 月 26 日
  • 本文字数:5263 字

    阅读完需:约 17 分钟

netty系列之:netty对marshalling的支持

简介

在之前的文章中我们讲过了,jboss marshalling 是一种非常优秀的 java 对象序列化的方式,它可以兼容 JDK 自带的序列化,同时也提供了性能和使用上的优化。


那么这么优秀的序列化工具可不可以用在 netty 中作为消息传递的方式呢?


答案当然是肯定的,在 netty 中一切皆有可能。

netty 中的 marshalling provider

回顾一下 jboss marshalling 的常用用法,我们需要从 MarshallerFactory 中创建出 Marshaller,因为 mashaller 有不同的实现,所以需要指定具体的实现来创建 MarshallerFactory,如下所示:


MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("river");
复制代码


这个 MarshallerFactory 实际上就是一个 MarshallerProvider。


netty 中定义了这样的一个接口:


public interface MarshallerProvider {
Marshaller getMarshaller(ChannelHandlerContext ctx) throws Exception;}
复制代码


MarshallerProvider 实际上就做了和 MarshallerFactory 等同的工作。


既然 MarshallerProvider 是一个接口,那么它有哪些实现呢?


在 netty 中它有两个实现类,分别是 DefaultMarshallerProvider 和 ThreadLocalMarshallerProvider。


两者有什么区别呢?


先来看一下 DefaultMarshallerProvider:


public class DefaultMarshallerProvider implements MarshallerProvider {
private final MarshallerFactory factory; private final MarshallingConfiguration config;
public DefaultMarshallerProvider(MarshallerFactory factory, MarshallingConfiguration config) { this.factory = factory; this.config = config; }
public Marshaller getMarshaller(ChannelHandlerContext ctx) throws Exception { return factory.createMarshaller(config); }
}
复制代码


顾名思义,DefaultMarshallerProvider 就是 marshallerProvider 的默认实现,从具体的实现代码中,我们可以看出,DefaultMarshallerProvider 实际上需要传入 MarshallerFactory 和 MarshallingConfiguration 作为参数,然后使用传入的 MarshallerFactory 来创建具体的 marshaller Provider,和我们手动创建 marshaller 的方式是一致的。


但是上面的实现中每次 getMarshaller 都需要重新从 factory 中创建一个新的,性能上可能会有问题。所以 netty 又实现了一个新的 ThreadLocalMarshallerProvider:


public class ThreadLocalMarshallerProvider implements MarshallerProvider {    private final FastThreadLocal<Marshaller> marshallers = new FastThreadLocal<Marshaller>();
private final MarshallerFactory factory; private final MarshallingConfiguration config;
public ThreadLocalMarshallerProvider(MarshallerFactory factory, MarshallingConfiguration config) { this.factory = factory; this.config = config; }
@Override public Marshaller getMarshaller(ChannelHandlerContext ctx) throws Exception { Marshaller marshaller = marshallers.get(); if (marshaller == null) { marshaller = factory.createMarshaller(config); marshallers.set(marshaller); } return marshaller; }}
复制代码


ThreadLocalMarshallerProvider 和 DefaultMarshallerProvider 的不同之处在于,ThreadLocalMarshallerProvider 中保存了一个 FastThreadLocal 的对象,FastThreadLocal 是 JDK 中 ThreadLocal 的优化版本,比 ThreadLocal 更快。


在 getMarshaller 方法中,先从 FastThreadLocal 中 get 出 Marshaller 对象,如果 Marshaller 对象不存在,才从 factory 中创建出一个 Marshaller 对象,最后将 Marshaller 对象放到 ThreadLocal 中。


有 MarshallerProvider 就有和他对应的 UnMarshallerProvider:


public interface UnmarshallerProvider {
Unmarshaller getUnmarshaller(ChannelHandlerContext ctx) throws Exception;}
复制代码


netty 中的 UnmarshallerProvider 有三个实现类,分别是 DefaultUnmarshallerProvider,ThreadLocalUnmarshallerProvider 和 ContextBoundUnmarshallerProvider.


前面的两个 DefaultUnmarshallerProvider,ThreadLocalUnmarshallerProvider 跟 marshaller 的是实现是一样的,这里就不重复讲解了。


我们主要来看一下 ContextBoundUnmarshallerProvider 的实现。


从名字上我们可以看出,这个 unmarshaller 是和 ChannelHandlerContext 相关的。


ChannelHandlerContext 表示的是 channel 的上下文环境,它里面有一个方法叫做 attr,可以保存和 channel 相关的属性:


    <T> Attribute<T> attr(AttributeKey<T> key);
复制代码


ContextBoundUnmarshallerProvider 的做法就是将 Unmarshaller 存放到 context 中,每次使用的时候先从 context 中获取,如果没有取到再从 factroy 中获取。


我们来看下 ContextBoundUnmarshallerProvider 的实现:


public class ContextBoundUnmarshallerProvider extends DefaultUnmarshallerProvider {
private static final AttributeKey<Unmarshaller> UNMARSHALLER = AttributeKey.valueOf( ContextBoundUnmarshallerProvider.class, "UNMARSHALLER");
public ContextBoundUnmarshallerProvider(MarshallerFactory factory, MarshallingConfiguration config) { super(factory, config); }
@Override public Unmarshaller getUnmarshaller(ChannelHandlerContext ctx) throws Exception { Attribute<Unmarshaller> attr = ctx.channel().attr(UNMARSHALLER); Unmarshaller unmarshaller = attr.get(); if (unmarshaller == null) { unmarshaller = super.getUnmarshaller(ctx); attr.set(unmarshaller); } return unmarshaller; }}
复制代码


ContextBoundUnmarshallerProvider 继承自 DefaultUnmarshallerProvider,在 getUnmarshaller 方法首先从 ctx 取出 unmarshaller,如果没有的话,则调用 DefaultUnmarshallerProvider 中的 getUnmarshaller 方法取出 unmarshaller。

Marshalling 编码器

上面的章节中我们获取到了 marshaller,接下来看一下如何使用 marshaller 来进行编码和解码操作。


首先来看一下编码器 MarshallingEncoder,MarshallingEncoder 继承自 MessageToByteEncoder,接收的泛型是 Object:


public class MarshallingEncoder extends MessageToByteEncoder<Object>
复制代码


是将 Object 对象编码成为 ByteBuf。回顾一下之前我们讲到的通常对象的编码都需要用到一个对象长度的字段,用来分割对象的数据,同样的 MarshallingEncoder 也提供了一个 4 个字节的 LENGTH_PLACEHOLDER,用来存储对象的长度。


具体的看一下它的 encode 方法:


    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {        Marshaller marshaller = provider.getMarshaller(ctx);        int lengthPos = out.writerIndex();        out.writeBytes(LENGTH_PLACEHOLDER);        ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);        marshaller.start(output);        marshaller.writeObject(msg);        marshaller.finish();        marshaller.close();
out.setInt(lengthPos, out.writerIndex() - lengthPos - 4); }
复制代码


encode 的逻辑很简单,首先从 provider 中拿到 marshaller 对象,然后先向 out 中写入 4 个字节的 LENGTH_PLACEHOLDER,接着使用 marshaller 向 out 中写入编码的对象,最后根据写入对象长度填充 out,得到最后的输出。


因为 encode 的数据保存的有长度数据,所以 decode 的时候就需要用到一个 frame decoder 叫做 LengthFieldBasedFrameDecoder。


通常有两种方式来使用 LengthFieldBasedFrameDecoder,一种是将 LengthFieldBasedFrameDecoder 加入到 pipline handler 中,decoder 只需要处理经过 frame decoder 处理过后的对象即可。


还有一种方法就是这个 decoder 本身就是一个 LengthFieldBasedFrameDecoder。


这里 netty 选择的是第二种方法,我们看下 MarshallingDecoder 的定义:


public class MarshallingDecoder extends LengthFieldBasedFrameDecoder
复制代码


首先需要在构造函数中指定 LengthFieldBasedFrameDecoder 的字段长度,这里调用了 super 方法来实现:


    public MarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {        super(maxObjectSize, 0, 4, 0, 4);        this.provider = provider;    }
复制代码


并且重写了 extractFrame 方法:


    protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {        return buffer.slice(index, length);    }
复制代码


最后再看下 decode 方法:


    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {        ByteBuf frame = (ByteBuf) super.decode(ctx, in);        if (frame == null) {            return null;        }        Unmarshaller unmarshaller = provider.getUnmarshaller(ctx);        ByteInput input = new ChannelBufferByteInput(frame);        try {            unmarshaller.start(input);            Object obj = unmarshaller.readObject();            unmarshaller.finish();            return obj;        } finally {            unmarshaller.close();        }    }
复制代码


decode 的逻辑也很简单,首先调用 super 方法 decode 出 frame ByteBuf。然后再调用 unmarshaller 实现对象的读取,最后将改对象返回。

Marshalling 编码的另外一种实现

上面我们讲到对对象的编码使用的是 LengthFieldBasedFrameDecoder,根据对象实际数据之前的一个 length 字段来确定字段的长度,从而读取真实的数据。


那么可不可以不指定对象长度也能够准确的读取对象呢?


其实也是可以的,我们可以不断的尝试读取数据,直到找到合适的对象数据为止。


看过我之前文章的朋友可能就想到了,ReplayingDecoder 不就是做这个事情的吗?在 ReplayingDecoder 中会不断的重试,直到找到符合条件的消息为止。


于是 netty 基于 ReplayingDecoder 也有一个 marshalling 编码解码的实现,叫做 CompatibleMarshallingEncoder 和 CompatibleMarshallingDecoder。


CompatibleMarshallingEncoder 很简单,因为不需要对象的实际长度,所以直接使用 marshalling 编码即可。


public class CompatibleMarshallingEncoder extends MessageToByteEncoder<Object> {
private final MarshallerProvider provider;
public CompatibleMarshallingEncoder(MarshallerProvider provider) { this.provider = provider; }
@Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { Marshaller marshaller = provider.getMarshaller(ctx); marshaller.start(new ChannelBufferByteOutput(out)); marshaller.writeObject(msg); marshaller.finish(); marshaller.close(); }}
复制代码


CompatibleMarshallingDecoder 继承了 ReplayingDecoder:


public class CompatibleMarshallingDecoder extends ReplayingDecoder<Void> 
复制代码


它的 decode 方法的核心就是调用 unmarshaller 的方法:


Unmarshaller unmarshaller = provider.getUnmarshaller(ctx);        ByteInput input = new ChannelBufferByteInput(buffer);        if (maxObjectSize != Integer.MAX_VALUE) {            input = new LimitingByteInput(input, maxObjectSize);        }        try {            unmarshaller.start(input);            Object obj = unmarshaller.readObject();            unmarshaller.finish();            out.add(obj);        } catch (LimitingByteInput.TooBigObjectException ignored) {            discardingTooLongFrame = true;            throw new TooLongFrameException();        } finally {            unmarshaller.close();        }
复制代码


注意,这里解码的时候会有两种异常,第一种异常就是 unmarshaller.readObject 时候的异常,这种异常会被 ReplayingDecoder 捕获从而重试。


还有一种就是字段太长的异常,这种异常无法处理只能放弃:


    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        if (cause instanceof TooLongFrameException) {            ctx.close();        } else {            super.exceptionCaught(ctx, cause);        }    }
复制代码

总结

以上就是在 netty 中使用 marshalling 进行编码解码的实现。原理和对象编码解码是很类似的,大家可以对比分析一下。


本文已收录于 http://www.flydean.com/17-1-netty-marshalling/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 2022 年 04 月 26 日阅读数: 15
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:netty对marshalling的支持_Java_程序那些事_InfoQ写作社区