写点什么

Netty 中序列化框架 MessagePack 的简单实现,java 技术面试完了复试

作者:Java高工P7
  • 2021 年 11 月 10 日
  • 本文字数:2498 字

    阅读完需:约 8 分钟

  • @param ctx 上下文

  • @param msg 需要编码的对象

  • @param out 编码后的数据


*/


@Override


protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {


MessagePack msgpack = new MessagePack();


// 对对象进行序列化


byte[] raw = msgpack.write(msg);


// 返回序列化的数据


out.writeBytes(raw);


}


解码器


/**


  • @param ctx 上下文

  • @param msg 需要解码的数据

  • @param out 解码列表


*/


@Override


protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {


final byte[] array;


final int length = msg.readableBytes();


array = new byte[length];


// 获取需要解码的字节数组


msg.getBytes(msg.readerIndex(), array,0,length);


MessagePack msgpack = new MessagePack();


// 反序列化并将结果保存到了解码列表中


out.add(msgpack.read(array));


}


3.客户端




EchoClient


/**


  • MsgPack 编解码器

  • @author 波波烤鸭

  • @email dengpbs@163.com


*/


public class EchoClient {


public static void main(String[] args) throws Exception {


int port = 8080;


if (args != null && args.length > 0) {


try {


port = Integer.valueOf(args[0]);


} catch (NumberFormatException e) {


// 采用默认值


}


}


new EchoClient().connector(port, "127.0.0.1",10);


}


public void connector(int port, String host,final int sendNumber) throws Exception {


// 配置客户端 NIO 线程组


EventLoopGroup group = new NioEventLoopGroup();


try {


Bootstrap b = new Bootstrap();


b.group(group).channel(NioSocketChannel.class)


.option(ChannelOption.TCP_NODELAY, true)


.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)


.handler(new ChannelInitializer<SocketChannel>() {


@Override


protected void initChannel(SocketChannel ch) throws Exception {


//这里设置通过增加包头表示报文长度来避免粘包


ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1024, 0, 2,0,2));


//增加解码器


ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());


//这里设置读取报文的包头长度来避免粘包


ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));


//增加编码器


ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());


// 4.添加自定义的处理器


ch.pipeline().addLast(new EchoClientHandler(sendNumber));


}


});


// 发起异步连接操作


ChannelFuture f = b.connect(host, port).sync();


// 等待客户端链路关闭


f.channel().closeFuture().sync();


}catch(Exception e){


e.printStackTrace();


} finally {


// 优雅退出,释放 NIO 线程组


group.shutdownGracefully();


}


}


}


EchoClientHandler


/**


  • DelimiterBasedFrameDecoder 案例

  • 自定义处理器

  • @author 波波烤鸭

  • @email dengpbs@163.com


*/


public class EchoServerHandler extends ChannelHandlerAdapter{


@Override


public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


//UserInfo user = (UserInfo) msg;


System.out.println("server receive the msgpack message :"+msg);


//ctx.writeAndFlush(user);


ctx.writeAndFlush(msg);


}


@Override


public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {


cause.printStackTrace();


ctx.close(); // 发生异常关闭链路


}


}


4.服务端




EchoServer


/**


  • MsgPack 编解码器

  • @author 波波烤鸭

  • @email dengpbs@163.com


*/


public class EchoServer {


public void bind(int port) throws Exception {


// 配置服务端的 NIO 线程组


// 服务端接受客户端的连接


NioEventLoopGroup bossGroup = new NioEventLoopGroup();


// 进行 SocketChannel 的网络读写


NioEventLoopGroup workerGroup = new NioEventLoopGroup();


try {


ServerBootstrap b = new ServerBootstrap();


b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)


.option(ChannelOption.SO_BACKLOG, 100)


.handler(new LoggingHandler(LogLevel.INFO))


.childHandler(new ChannelInitializer<SocketChannel>() {


@Override


protected void initChannel(SocketChannel ch) throws Exception {


ch.pipeline().addLast("frameDecoder",new LengthFi


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


eldBasedFrameDecoder(65535, 0, 2,0,2));


// 添加 msgpack 的编码和解码器


ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());


ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));


ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());


// 添加自定义的处理器


ch.pipeline().addLast(new EchoServerHandler());


}


});


// 绑定端口,同步等待成功


ChannelFuture f = b.bind(port).sync();


// 等待服务端监听端口关闭


f.channel().closeFuture().sync();


}catch(Exception e){


e.printStackTrace();


} finally {


// 优雅退出,释放线程池资源


bossGroup.shutdownGracefully();


workerGroup.shutdownGracefully();


}


}


public static void main(String[] args) throws Exception {


int port = 8080;


if(args!=null && args.length > 0){


try{


port = Integer.valueOf(args[0]);


}catch(NumberFormatException e){


// 采用默认值


}


}


new EchoServer().bind(port);


}


}


EchoServerHandler


/**


  • DelimiterBasedFrameDecoder 案例

  • 自定义处理器

  • @author 波波烤鸭

  • @email dengpbs@163.com


*/


public class EchoServerHandler extends ChannelHandlerAdapter{


@Override


public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


//UserInfo user = (UserInfo) msg;


System.out.println("server receive the msgpack message :"+msg);


//ctx.writeAndFlush(user);


ctx.writeAndFlush(msg);


}


@Override


public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {


cause.printStackTrace();


ctx.close(); // 发生异常关闭链路


}


}


5.注意点(POJO)




用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Netty中序列化框架MessagePack的简单实现,java技术面试完了复试