写点什么

【Netty】「萌新入门」(五)Pipeline 与 ChannelHandler

作者:sidiot
  • 2023-06-24
    浙江
  • 本文字数:4696 字

    阅读完需:约 15 分钟

前言


本篇博文是《从 0 到 1 学习 Netty》中入门系列的第五篇博文,主要内容是介绍 Netty 中 Pipeline 与 ChannelHandler 的概念和作用,通过源码分析和应用案例进行详细讲解,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;


Pipeline


在 Netty 中,pipeline 是一种机制,它由一系列的 ChannelHandler 组成。pipeline 负责处理进入或离开 Channel 的数据,并且将事件(比如连接建立、数据读取等)转发给正确的 handler 进行处理。


handlerpipeline 的节点,每个 handler 会接收来自前一个 handler 的处理结果,并进行自己的处理。然后,它将处理结果传递给下一个 handler,直到最终达到 pipeline 的尾部。pipeline 的头部和尾部都是特殊的 handler,头部负责处理 Inbound 操作,尾部则负责处理 Outbound 操作。





接下来进行具体操作:


1、通过 channel 获取 pipeline


ChannelPipeline pipeline = ch.pipeline();
复制代码


2、添加处理器:


pipeline.addLast(){    ...};
复制代码


3、服务端代码如下,完整代码见 TestPipeline.java


.childHandler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();        pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {            @Override            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                log.debug("Inbound1");                super.channelRead(ctx, msg);            }        });        // h2        // h3        ...                pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {            @Override            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {                log.debug("Outbound4");                super.write(ctx, msg, promise);            }        });        // h5        // h6        ...    }})
复制代码


4、客户端在博文 ChannelFuture 与 CloseFuture 中有详细讲解,完整代码见 CloseFutureClient.java


.handler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception {        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));        ch.pipeline().addLast(new StringEncoder());    }})
复制代码


5、服务端运行结果:



6、客户端运行结果:





看到服务端的运行结果,可能有小伙伴会感到疑惑,按照 handler 的添加顺序,运行结果不应该是 In1 -> In2 -> In3 -> Out4 -> Out5 -> Out6 吗?


这是因为,当 Inbound 操作发生时,Pipeline 会从头部开始向后调用 Handler,直到找到能够处理该操作的 Handler。一旦找到,该 Handler 将处理数据并将其传递给下一个 Handler,直到达到尾部为止。



同样地,当 Outbound 操作发生时,Pipeline 会从尾部开始向前调用 Handler,直到找到能够处理该操作的 Handler。一旦找到,该 Handler 将处理数据并将其传递给上一个 Handler,直到达到头部为止。



ChannelHandler


在 Netty 中,ChannelHandler 是处理 IO 事件的最基本组件之一。ChannelHandler 位于 Netty 的核心位置,并负责接收入站事件 Inbound 和转发出站事件 Outbound


具体而言,ChannelHandler 主要有两个作用:


  1. 处理各种类型的 IO 事件,包括连接建立、连接关闭、数据读写等。

  2. 实现业务逻辑,对网络数据进行处理,例如编解码、协议解析、消息过滤、消息转发等。

Inbound


Inbound 是一种 ChannelHandler 的类型,它主要用于处理从网络接收到的数据。具体来说,当数据到达 Netty 应用程序的网络层时,Inbound 处理程序会被触发并开始处理这些数据。


Inbound 处理程序通常会执行以下操作:


  1. 解码:将二进制数据转换为 Java 对象。

  2. 验证:确保数据格式正确以及发送方有权进行操作。

  3. 处理:执行实际的业务逻辑,可能包括修改状态、创建响应等。

  4. 转发:将处理后的数据传递给下一个处理程序或写回到网络中。


在处理完所有 Inbound 处理程序之后,Netty 应用程序通常会将处理结果传递给 Outbound 处理程序,让其对数据进行编码、加密等操作,并发送回网络。


举个例子,接下来将使用三个 Inbound,第一个 handler 用于接收 name 属性,第二个 handler 用于生成 Person 类,第三个 handler 返回该结果,代码如下:


pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        log.debug("Inbound1, value: {}, class: {}", msg, msg.getClass());        ByteBuf buf = (ByteBuf) msg;        String name = buf.toString(Charset.defaultCharset());        super.channelRead(ctx, name);    }});pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {    @Override    public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {        log.debug("Inbound2, value: {}, class: {}", name, name.getClass());        Person person = new Person(name.toString());        super.channelRead(ctx, person);    }});pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        log.debug("Inbound3, value: {}, class: {}", msg, msg.getClass());        ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));    }});
复制代码


运行结果:



在上述代码中,super.channelRead(ctx, msg) 方法用于将接收到的消息传递给下一个 ChannelInboundHandler 处理器进行处理,实现了消息在处理链条中的流转。


super.channelRead(ctx, msg) 源码如下:


@Skip  @Override  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      ctx.fireChannelRead(msg);  }
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this; }
复制代码


通过 super.channelRead(ctx, msg) 的源码可以获知,在该方法中,通过调用 ctx.fireChannelRead(msg) 将数据传递给下一个 ChannelInboundHandler,从而实现事件的传播。


fireChannelRead 是在 ChannelHandlerContext 接口中定义的,默认实现是在当前 ChannelHandlerContext 中查找与 MASK_CHANNEL_READ 相应类型的 ChannelInboundHandler,并将数据传递给它的 channelRead 方法。这个方法返回当前的 ChannelHandlerContext 对象,可以链式调用其他方法。其中,invokeChannelRead() 方法在博文 从0到1(七):入门-EventLoop 进行过详细讲解。

Outbound


Outbound 是一种 ChannelHandler 的类型,它主要用于处理将数据发送到网络的操作。具体来说,当应用程序需要向网络发送数据时,会触发 Outbound 处理程序,并让其对数据进行编码、加密等处理后再发送出去。


Outbound 处理程序通常会执行以下操作:


  1. 编码:将 Java 对象转换为二进制数据。

  2. 加密:对数据进行加密以保证安全性。

  3. 写入:将处理后的数据写入网络中发送出去。


在处理完所有 Outbound 处理程序之后,Netty 应用程序通常会将数据传递给底层的传输层(如 TCP)并发送到远程端点。


这里需要注意的是,socketChannel.writeAndFlush()ctx.writeAndFlush() 这两个方法,两个方法的作用都是写入数据并刷新缓冲区,但还是有所不同的,接下来通过实例进行讲解,为了效果更加明显,将原先代码中的 h3 与 h4 互换:


pipeline.addLast("h3", new ChannelOutboundHandlerAdapter() {    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        log.debug("Outbound4");        super.write(ctx, msg, promise);    }});pipeline.addLast("h4", new ChannelInboundHandlerAdapter() {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        log.debug("Inbound3, value: {}, class: {}", msg, msg.getClass());        ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));    }});
复制代码


使用 socketChannel.writeAndFlush() 的运行结果:



使用 ctx.writeAndFlush() 的运行结果:



可以发现,socketChannel.writeAndFlush() 的运行结果包含了三个 Outbound,但是 ctx.writeAndFlush() 的运行结果只有一个 Outbound4,这是为什么呢?接下来我们通过源码进一步分析。


socketChannel.writeAndFlush() 的源码如下:


@Override  public ChannelFuture writeAndFlush(Object msg) {      return pipeline.writeAndFlush(msg);  }
@Override public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }
复制代码


ctx.writeAndFlush() 的源码如下:


@Override  public ChannelFuture writeAndFlush(Object msg) {      return writeAndFlush(msg, newPromise());  }
复制代码


从源码中可以看出,socketChannel.writeAndFlush() 是从尾部开始向前寻找 Outbound,而 ctx.writeAndFlush() 则是从当前位置开始向前寻找 Outbound。



因此,socketChannel.writeAndFlush() 的运行结果包含了三个 Outbound,而 ctx.writeAndFlush() 的运行结果只有一个 Outbound4。


EmbeddedChannel


EmbeddedChannel 是 Netty 提供的工具类,用于在单元测试中模拟 Netty Channel 的行为。它可以被用于测试 ChannelHandler、ChannelPipeline 等模块。


通常来说,在使用 Netty 进行网络编程的时候,我们需要连接远程服务器或者监听本地端口以接收请求。这个过程需要真实的网络环境,即需要实际建立连接和发送数据,这样会增加测试的复杂性和不稳定性。而使用 EmbeddedChannel,我们可以通过将 ChannelHandler 添加到 EmbeddedChannel 对象上,来模拟整个请求/响应的流程,从而达到测试的目的,避免了对网络环境的依赖。


另外,使用 EmbeddedChannel 还可以轻松测试多个 ChannelHandler 之间的协作情况。例如,在进行 WebSocket 消息处理的时候,我们可能需要多个 ChannelHandler 协同工作才能完成消息的解析和转发,此时使用 EmbeddedChannel 就非常方便。


测试代码如下,完整代码见 TestEmbeddedChannel.java


public class TestEmbeddedChannel {    public static void main(String[] args) {        ...
// 用于测试Handler的Channel EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 执行Inbound操作 channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// 执行Outbound操作 channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8))); }}
复制代码


运行结果:


1243
复制代码


后记


以上就是 Pipeline 与 ChannelHandler 的所有内容了,希望本篇博文对大家有所帮助!


参考:


发布于: 刚刚阅读数: 2
用户头像

sidiot

关注

还未添加个人签名 2023-06-04 加入

还未添加个人简介

评论

发布
暂无评论
【Netty】「萌新入门」(五)Pipeline 与 ChannelHandler_Java_sidiot_InfoQ写作社区