写点什么

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

作者:sidiot
  • 2023-06-20
    浙江
  • 本文字数:7495 字

    阅读完需:约 25 分钟

前言


本篇博文是《从 0 到 1 学习 Netty》中进阶系列的第三篇博文,主要内容是从 Redis、HTTP 和自定义协议三个方面来探讨了 Netty 通信协议的设计,结合应用案例加深理解,根据实际情况优化协议,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;


介绍


当今互联网软件系统中,常常需要使用多种协议进行通信。例如,在一个分布式存储系统中,可能需要同时支持 Redis 协议、HTTP 协议以及自定义协议等。而对于这些不同的协议,如何实现跨协议通信也成为了亟待解决的问题。


Netty 作为一种高性能的网络通信框架,在处理不同协议的通信方面具有很大优势。在使用 Netty 实现跨协议通信之前,我们首先需要进行协议特征的分析和比较。我们以 Redis 协议、HTTP 协议和自定义协议为例:


  1. Redis 协议:Redis 协议是一种基于 TCP 连接的二进制协议,用于与 Redis 数据库进行交互。它采用简单的请求/响应模型,并且支持异步执行命令(通过 MULTI / EXEC 命令)。

  2. HTTP 协议:HTTP 协议是一种基于 TCP 连接的文本协议,用于 Web 服务中的客户端-服务器通信。它采用请求/响应模型,并且支持状态码、HeaderCookie 等功能。

  3. 自定义协议:自定义协议是指根据业务需求自定义的协议。它可以是基于二进制格式或者文本格式,通常需要定义消息头、消息体以及校验码等字段。


通过以上分析,我们可以看出不同协议的特点和差异。在使用 Netty 实现跨协议通信时,需要根据每种协议的特点进行针对性开发,下面将介绍如何使用 Netty 实现跨协议通信。


Redis 协议实现


Redis 使用一种基于文本的协议来进行与客户端的通信,该协议被称为 RESP(REdis Serialization Protocol)。


RESP 协议定义了一组规则和格式,用于描述在 Redis 服务器和 Redis 客户端之间交换数据的方式。它支持多种数据类型,包括字符串、数字、数组和错误消息。RESP 协议采用简单而直观的格式,以提高通信效率和可读性。例如,对于字符串类型,RESP 协议使用以下格式:


+OK\r\n
复制代码


其中,"+" 表示状态回复,"OK" 表示字符串内容,"\r\n" 表示行结束符。这个例子中的字符串只包含一个单词,但 RESP 协议同样适用于包含多个单词或者更复杂结构的字符串。如果在此之前对 Redis 没有了解,欢迎移步博主的 Redis 专栏


例如,我们给 Redis 发送一条指令,新增一个键值对 set name sidiot,那么需要遵守以下协议:


# 由于 set name sidiot 指令一共有 3 部分,因此用 *3 表示*3\r\n# 第一个指令的长度是 3$3\r\n# 第一个指令是 set 指令set\r\n# 以此类推$4\r\n name\r\n$6\r\n sidiot\r\n
复制代码


注意,每条指令之后都要添加回车与换行符 \r\n


测试代码:


@Override  public void channelActive(ChannelHandlerContext ctx) throws Exception {      ByteBuf buffer = ctx.alloc().buffer();      String command = "*3\r\n" +              "$3\r\nSET\r\n" +              "$4\r\nname\r\n" +              "$6\r\nsidiot\r\n";      buffer.writeBytes(command.getBytes());      ctx.writeAndFlush(buffer);  }
复制代码


运行结果:



再例如使用 GET 指令,获取刚刚新增的键值对:



或者组合使用指令:



需要完整代码的读者请访问博主的 Github:TestRedis.java


HTTP 协议实现


HTTP 是一种常用的网络协议,它定义了客户端和服务器之间如何进行通信。在 HTTP 通信中,请求行和请求头包含了大量的信息,这些信息对于处理 HTTP 请求来说非常重要。然而,手动实现 HTTP 请求的解析和编码是一项相当复杂的任务。


为了简化这个过程,可以使用 HttpServerCodec 作为服务器端的解码器与编码器,来处理 HTTP 请求,它能够将 HTTP 请求解析为可读的数据,并将响应数据编码为 HTTP 格式。


需要注意的是,如果类名上出现的是 Codec,那表示这个类既有解码又有编码。例如 HttpServerCodec 类,它就包括了 HttpRequestDecoderHttpResponseEncoder,源码如下所示:


public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>          implements HttpServerUpgradeHandler.SourceCodec { ... }
复制代码


测试代码:


@Override  protected void initChannel(SocketChannel ch) {      ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));      ch.pipeline().addLast(new HttpServerCodec());      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {          @Override          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {              log.debug("{}", msg.getClass());          }      });  }
复制代码


运行结果:



同时,日志输出了两个类:DefaultHttpRequestLastHttpContent,这是因为由于 HttpServerCodec 将请求解析成了两个部分。其中,DefaultHttpRequest 包含了请求行和请求头,LastHttpContent 表示的是请求体:


17:24:49 [DEBUG] [nioEventLoopGroup-3-1] c.s.n.c4.TestHTTP - class io.netty.handler.codec.http.DefaultHttpRequest17:24:49 [DEBUG] [nioEventLoopGroup-3-1] c.s.n.c4.TestHTTP - class io.netty.handler.codec.http.LastHttpContent$1
复制代码


我们可以使用 SimpleChannelInboundHandler 来专注于某一种类型的消息,比如 HttpRequestSimpleChannelInboundHandler 是 Netty 中的一个入站处理程序,用于处理接收到的数据,它是一种特殊类型的 ChannelInboundHandler,可以自动释放消息资源,防止内存泄漏。


测试代码:


ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {      @Override      protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {          // 获得请求uri          log.debug(msg.uri());          // 获得完整响应,设置版本号与状态码          DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);          // 设置响应内容          byte[] bytes = "<h1>Hello, sidiot!</h1>".getBytes(StandardCharsets.UTF_8);          // 设置响应体          response.content().writeBytes(bytes);          // 写回响应          ctx.writeAndFlush(response);      }  });
复制代码


运行结果:



这里浏览器一直在转动的原因是,我们并没有告诉浏览器响应数据的长度是多少,因此浏览器认为后续还会有数据到来,所以一直处于等待状态,我们仅需设置响应体长度即可,代码如下:


import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
response.headers().setInt(CONTENT_LENGTH, bytes.length);
复制代码


运行结果:



总的来说,在服务器收到来自浏览器的 HTTP 请求后,它需要向浏览器返回一个响应。因此,使用 DefaultFullHttpResponse 类创建一个响应对象,在创建响应对象时,需要设置 HTTP 协议版本号和状态码来表示服务器处理该请求的结果。


此外,为了避免浏览器在接收到响应后一直处于等待状态,我们需要通过添加 CONTENT_LENGTH 字段来指定响应正文的长度,以便浏览器知道何时可以停止等待并开始处理响应数据。


因此,在构建 HTTP 响应时,必须确保包括正确的 HTTP 版本号、状态码和内容长度信息,以确保服务器和客户端之间的通信正确无误。


需要完整代码的读者请访问博主的 Github:TestHTTP.java


自定义协议实现


除了 Redis 和 HTTP 协议,我们还可以使用自定义协议来实现 Netty 应用程序之间的通信。自定义协议可以根据应用程序的特定需求进行设计,从而使得 Netty 应用程序的通信更加高效和安全。


自定义协议一般由以下要素组成:


  1. 魔数:魔数是一个特定的数字或字符串,在数据包的开头位置出现,作为识别标志。接收方可以根据魔数判断数据包是否有效,如果无效则直接丢弃,从而提高通信效率和安全性。

  2. 版本号:版本号表示自定义协议的版本信息,当协议发生变化时,可以通过版本号来区分不同的协议版本,这样旧版本的客户端和服务器也能够兼容,即使协议发生变化也能正确处理数据。

  3. 序列化算法:序列化算法指的是将消息正文转换为二进制数据的方式。因为网络传输只能传输二进制数据,所以需要将消息正文序列化为二进制数据,发送方在发送数据时需要对消息正文进行序列化,接收方在接收数据时需要对消息正文进行反序列化,才能正确地还原消息。序列化方法有 json、protobuf、hessian、jdk 等。

  4. 指令类型:指令类型表示发送方要执行的具体业务操作,例如登录、注册、单聊、群聊等,接收方可以根据指令类型来分发消息,将不同的消息转发给相应的业务处理模块。

  5. 请求序号:请求序号是发送方用来标识一个请求的唯一标识符,接收方在返回响应时会携带相同的请求序号,以便发送方能够正确地将响应和请求匹配起来。请求序号还可以用于实现异步通信,发送方可以通过请求序号来判断是否收到了对应的响应,从而实现异步能力。

  6. 正文长度:消息正文的长度,用于接收方正确地读取数据。由于网络传输中数据包大小是有限制的,因此发送方需要对消息正文的长度进行限制,同时也需要将消息正文的长度信息发送给接收方,以便接收方能够正确地读取数据。

  7. 消息正文:包含具体的业务信息。消息正文是自定义协议中最重要的部分,它包含具体的业务信息,例如用户的登录信息、聊天内容等。




接下来以聊天室为业务场景,获取相关业务消息请访问博主的 Github:Message


创建 MessageCodec 类,继承 ByteToMessageCodec 类,它实现了将字节流转换为消息对象并进行解码的功能,该类是一个抽象类,需要通过继承并实现其中的抽象方法来完成具体的解码逻辑。


在实现过程中,需要重写以下两个方法:


  • decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out):该方法表示从入站字节流 ByteBuf 中解码出消息对象,并将结果存储到出站列表 List<Object> 中。在该方法中需要完成字节流的读取和消息对象的构建工作。代码实现如下:


    @Override      public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {         // 设置魔数          out.writeBytes("IDIOT".getBytes());          // 设置版本号          out.writeByte(1);          // 设置序列化方式          out.writeByte(1);          // 设置指令类型          out.writeByte(msg.getMessageType());          // 设置请求序号          out.writeInt(msg.getSequenceId());  
// 获得序列化后的 msg ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(msg); byte[] bytes = bos.toByteArray();
// 获得并设置正文长度 out.writeInt(bytes.length); // 设置消息正文 out.writeBytes(bytes); }
复制代码


  • encode(ChannelHandlerContext ctx, Message msg, ByteBuf out):该方法表示将消息对象编码成字节流并写入出站 ByteBuf 中。在该方法中需要根据不同的协议规范将消息对象转换为字节流,并写入 ByteBuf 中。代码实现如下:


    @Override      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {          // 获取魔数          byte[] magic = new byte[5];          in.readBytes(magic, 0, 5);          // 获取版本号          byte version = in.readByte();          // 获得序列化方式          byte seqType = in.readByte();          // 获得指令类型          byte messageType = in.readByte();          // 获得请求序号          int sequenceId = in.readInt();          // 获得正文长度          int length = in.readInt();          // 获得正文          byte[] bytes = new byte[length];          in.readBytes(bytes, 0, length);          ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));          Message message = (Message) ois.readObject();          // 将信息放入 List 中,传递给下一个 handler          out.add(message);    }
复制代码


测试代码:


public class TestMessageCodec {      public static void main(String[] args) throws Exception {          EmbeddedChannel channel = new EmbeddedChannel(                  new LoggingHandler(LogLevel.DEBUG),                  new MessageCodec()          );  
// encode LoginRequestMessage msg = new LoginRequestMessage("sidiot", "123456"); channel.writeOutbound(msg);
// decode ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec().encode(null, msg, buf);
// 入站 channel.writeInbound(buf); } }
复制代码


运行结果:



需要完整代码的读者请访问博主的 Github:MessageCodec.java

协议设计优化

避免半包现象


如果消息发送时,出现了半包现象,系统是否能解析呢?这里使用 ByteBuf 的逻辑切片 slice 来伪造半包现象,忘记的同学可以回看博文 ByteBuf 的性能优化


修改代码如下所示:


channel.writeInbound(buf.slice(0, 121));
复制代码


运行结果:



为了避免上述情况,可以使用博主在上篇博文讲解的帧解码器 LengthFieldBasedFrameDecoder


修改代码如下所示:


EmbeddedChannel channel = new EmbeddedChannel(          new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),          new LoggingHandler(),          new MessageCodec()  );
复制代码


运行结果:



当然,如果后续的包能够在连接关闭前到来,那么 Netty 将会合并这些包,整合成一个完整的包,使得系统能够解析,运行结果如下:



需要完整代码的读者请访问博主的 Github:TestMessageCodec.java


提高 handler 的复用率


在使用 Netty 进行网络编程时,我们经常需要使用 handler 来处理数据流,并将其传递给下一个 handler 或者业务逻辑。为提高 handler 的复用率,可以将 handler 创建为 handler 对象,并在不同的 channel 中使用该 handler 对象进行数据处理操作。


例如,我们可以使用 LoggingHandler 来记录不同 channel 中的日志信息。为了提高复用率,我们可以创建一个 LoggingHandler 对象,并将其添加到多个 channel 的 pipeline 中。这样一来,不同 channel 中产生的日志信息都会传递给同一个 LoggingHandler 对象进行处理,从而实现了代码复用和资源节约。


LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);channel1.pipeline().addLast(loggingHandler);channel2.pipeline().addLast(loggingHandler);
复制代码


然而,并非所有的 handler 都能通过这种方式来提高复用率。例如,在多个 channel 中共享同一个 LengthFieldBasedFrameDecoder 对象时,可能会发生以下问题:


假设 channel1 中收到了一个半包,LengthFieldBasedFrameDecoder 发现它不是一条完整的数据,因此不会向下传播该数据。但此时,如果 channel2 中也收到了一个半包,由于两个 channel 使用了同一个 LengthFieldBasedFrameDecoder 对象,存储在其中的数据刚好组成了一条完整的数据包,LengthFieldBasedFrameDecoder 就会让该数据包继续向下传播,最终导致错误。


为了避免这种问题,Netty 中提供了 @Sharable 注解来标识一个 handler 是否可被多个 channel 共享。只有被标记为 @Sharable 的 handler 才能够安全地被多个 channel 共享,并使得复用率得以提升,同时保证了内部状态的正确性和线程安全性。因此,在使用 Netty 中的 handler 时,需要注意其是否标记了 @Sharable 注解,以确保安全高效地进行代码复用。



那么我们自定义的协议 MessageCodec 可以直接加上 @Sharable 注解来实现共享吗?



答案是不能的。这是因为 ByteToMessageCodec 是一种处理网络数据的 handler,它将 ByteBuf 转化为特定的 Message 对象,使得数据更加易于处理和解析,但是在使用 ByteToMessageCodec 时,需要注意到传入该 handler 的 ByteBuf 可能并不是完整的数据包,而只是数据包的一部分或者多个数据包拼接而成的。


因此,如果多个 channel 共享同一个 ByteToMessageCodec 对象,则可能会引发一些并发问题。比如,一个 handler 尝试读取未完成的数据,并且在读取过程中修改了 ByteBuf 中的内容,那么其他 handler 也会受到这个修改的影响,从而导致程序出现异常或错误的行为。


不过这里可以使用 MessageToMessageDecoder 来实现共享,MessageToMessageDecoder 主要用于将已经被处理过的数据再次进行处理。因为 MessageToMessageDecoder 接收到的是已经被处理过的完整数据,所以即使被多个 channel 共享,也不会造成数据处理上的错误。代码如下所示:


@Slf4j  @ChannelHandler.Sharable  public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {    @Override      protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ... }        @Override      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ... }}
复制代码


要注意的是,MessageToMessageCodec 必须与 LengthFieldBasedFrameDecoder 一起使用,确保接收到的 ByteBuf 的完整性。


需要完整代码的读者请访问博主的 Github:MessageCodecSharable.javaChatServer.java

后记


在本文中,我们从 Redis、HTTP 和自定义协议三个方面分析了 Netty 通信协议的设计。对于 Redis 协议,我们了解了其基于字符串的设计和多条命令组合的方式,以及如何基于 Netty 构建自己的 Redis 协议解析器。对于 HTTP 协议,我们讲解了 HTTP 协议的基本结构、状态码、请求方法和报文格式,并演示了如何使用 Netty 发送和接收 HTTP 请求和响应。最后,我们介绍了自定义协议的设计,包括协议头和协议体的格式、编解码方式等关键要素,并给出了具体的实现代码。


当然,在实际的应用场景中,通信协议的设计也需要根据具体的业务需求进行优化和调整。但是,无论采用哪种协议,都需要遵守一定的规范和标准,以确保通信的正确性和稳定性。


以上就是 Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起 的所有内容了,希望本篇博文对大家有所帮助!


参考:


发布于: 刚刚阅读数: 4
用户头像

sidiot

关注

还未添加个人签名 2023-06-04 加入

还未添加个人简介

评论

发布
暂无评论
【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起_Java_sidiot_InfoQ写作社区