写点什么

netty 系列之: 使用 POJO 替代 buf

发布于: 4 小时前
netty系列之:使用POJO替代buf

简介在之前的文章中我们提到了,对于 NioSocketChannel 来说,它不接收最基本的 string 消息,只接收 ByteBuf 和 FileRegion。但是 ByteBuf 是以二进制的形式进行处理的,对于程序员来说太不直观了,处理起来也比较麻烦,有没有可能直接处理 java 简单对象呢?本文将会探讨一下这个问题。


decode 和 encode 比如我们需要直接向 channel 中写入一个字符串,在之前的文章中,我们知道这是不可以的,会报下面的错误:


DefaultChannelPromise@57f5c075(failure: java.lang.UnsupportedOperationException: unsupported message type: String (expected: ByteBuf, FileRegion))也就说 ChannelPromise 只接受 ByteBuf 和 FileRegion,那么怎么做呢?


既然 ChannelPromise 只接受 ByteBuf 和 FileRegion,那么我们就需要把 String 对象转换成 ByteBuf 即可。


也就是说在写入 String 之前把 String 转换成 ByteBuf,当要读取数据的时候,再把 ByteBuf 转换成 String。


我们知道 ChannelPipeline 中可以添加多个 handler,并且控制这些 handler 的顺序。


那么我们的思路就出来了,在 ChannelPipeline 中添加一个 encode,用于数据写入的是对数据进行编码成 ByteBuf,然后再添加一个 decode,用于在数据写出的时候对数据进行解码成对应的对象。


encode,decode 是不是很熟悉?对了,这就是对象的序列化。


对象序列化 netty 中对象序列化是要把传输的对象和 ByteBuf 直接互相转换,当然我们可以自己实现这个转换对象。但是 netty 已经为我们提供了方便的两个转换类:ObjectEncoder 和 ObjectDecoder。


先看 ObjectEncoder,他的作用就是将对象转换成为 ByteBuf。


这个类很简单,我们对其分析一下:


public class ObjectEncoder extends MessageToByteEncoder<Serializable> {private static final byte[] LENGTH_PLACEHOLDER = new byte[4];


@Overrideprotected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {    int startIdx = out.writerIndex();
ByteBufOutputStream bout = new ByteBufOutputStream(out); ObjectOutputStream oout = null; try { bout.write(LENGTH_PLACEHOLDER); oout = new CompactObjectOutputStream(bout); oout.writeObject(msg); oout.flush(); } finally { if (oout != null) { oout.close(); } else { bout.close(); } }
int endIdx = out.writerIndex();
out.setInt(startIdx, endIdx - startIdx - 4);}
复制代码


}ObjectEncoder 继承了 MessageToByteEncoder,而 MessageToByteEncoder 又继承了 ChannelOutboundHandlerAdapter。为什么是 OutBound 呢?这是因为我们是要对写入的对象进行转换,所以是 outbound。


首先使用 ByteBufOutputStream 对 out ByteBuf 进行封装,在 bout 中,首先写入了一个 LENGTH_PLACEHOLDER 字段,用来表示 stream 中中 Byte 的长度。然后用一个 CompactObjectOutputStream 对 bout 进行封装,最后就可以用 CompactObjectOutputStream 写入对象了。


对应的,netty 还有一个 ObjectDecoder 对象,用于将 ByteBuf 转换成对应的对象,ObjectDecoder 继承自 LengthFieldBasedFrameDecoder,实际上他是一个 ByteToMessageDecoder,也是一个 ChannelInboundHandlerAdapter,用来对数据读取进行处理。


我们看下 ObjectDecoder 中最重要的 decode 方法:


protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {    ByteBuf frame = (ByteBuf) super.decode(ctx, in);    if (frame == null) {        return null;    }
ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver); try { return ois.readObject(); } finally { ois.close(); }}
复制代码


上面的代码可以看到,将输入的 ByteBuf 转换为 ByteBufInputStream,最后转换成为 CompactObjectInputStream,就可以直接读取对象了。


使用编码和解码器有了上面两个编码解码器,直接需要将其添加到 client 和 server 端的 ChannelPipeline 中就可以了。


对于 server 端,其核心代码如下:


//定义 bossGroup 和 workerGroupEventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(// 添加 encoder 和 decodernew ObjectEncoder(),new ObjectDecoder(ClassResolvers.cacheDisabled(null)),new PojoServerHandler());}});


        // 绑定端口,并准备接受连接        b.bind(PORT).sync().channel().closeFuture().sync();
复制代码


同样的,对于 client 端,我们其核心代码如下:


EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(// 添加 encoder 和 decodernew ObjectEncoder(),new ObjectDecoder(ClassResolvers.cacheDisabled(null)),new PojoClientHandler());}});


        // 建立连接        b.connect(HOST, PORT).sync().channel().closeFuture().sync();
复制代码


可以看到上面的逻辑就是将 ObjectEncoder 和 ObjectDecoder 添加到 ChannelPipeline 中即可。


最后,就可以在客户端和浏览器端通过调用:


ctx.write("加油!");直接写入字符串对象了。


总结有了 ObjectEncoder 和 ObjectDecoder,我们就可以不用受限于 ByteBuf 了,程序的灵活程度得到了大幅提升。


本文的例子可以参考:learn-netty4


本文已收录于 http://www.flydean.com/08-netty-pojo-buf/


最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!


欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 4 小时前阅读数: 3
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:使用POJO替代buf