写点什么

netty 系列之:channelPipeline 详解

作者:程序那些事
  • 2022 年 2 月 17 日
  • 本文字数:4346 字

    阅读完需:约 14 分钟

netty系列之:channelPipeline详解

简介

我们在介绍 channel 的时候提到过,几乎 channel 中所有的实现都是通过 channelPipeline 进行的,作为一个 pipline,它到底是如何工作的呢?


一起来看看吧。

ChannelPipeline

ChannelPipeline 是一个 interface,它继承了三个接口,分别是 ChannelInboundInvoker,ChannelOutboundInvoker 和 Iterable:


public interface ChannelPipeline        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> 
复制代码


继承自 ChannelInboundInvoker,表示 ChannelPipeline 可以触发 channel inboud 的一些事件,比如:


ChannelInboundInvoker fireChannelRegistered();ChannelInboundInvoker fireChannelUnregistered();ChannelInboundInvoker fireChannelActive();ChannelInboundInvoker fireChannelInactive();ChannelInboundInvoker fireExceptionCaught(Throwable cause);ChannelInboundInvoker fireUserEventTriggered(Object event);ChannelInboundInvoker fireChannelRead(Object msg);ChannelInboundInvoker fireChannelReadComplete();ChannelInboundInvoker fireChannelWritabilityChanged();
复制代码


继承自 ChannelOutboundInvoker,表示 ChannelPipeline 可以进行一些 channel 的主动操作,如:bind,connect,disconnect,close,deregister,read,write,flush 等操作。


继承自 Iterable,表示 ChannelPipeline 是可遍历的,为什么 ChannelPipeline 是可遍历的呢?


因为 ChannelPipeline 中可以添加一个或者多个 ChannelHandler,ChannelPipeline 可以看做是一个 ChannelHandler 的集合。


比如 ChannelPipeline 提供了一系列的添加 ChannelHandler 的方法:


ChannelPipeline addFirst(String name, ChannelHandler handler);ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addLast(String name, ChannelHandler handler);ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);ChannelPipeline addLast(ChannelHandler... handlers);ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
复制代码


可以从前面添加,也可以从后面添加,或者从特定的位置添加 handler。


另外还可以从 pipeline 中删除特定的 channelHandler,或者移出和替换特定位置的 handler:


ChannelPipeline remove(ChannelHandler handler);ChannelHandler remove(String name);ChannelHandler removeFirst();ChannelHandler removeLast();ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
复制代码


当然,更少不了对应的查询操作:


ChannelHandler first();ChannelHandler last();ChannelHandler get(String name);List<String> names();
复制代码


还可以根据传入的 ChannelHandler 获得 handler 对应的 ChannelHandlerContext。


ChannelHandlerContext context(ChannelHandler handler);
复制代码


ChannelPipeline 中还有一些触发 channel 相关的事件,如:


    ChannelPipeline fireChannelRegistered();    ChannelPipeline fireChannelUnregistered();    ChannelPipeline fireChannelActive();    ChannelPipeline fireChannelInactive();    ChannelPipeline fireExceptionCaught(Throwable cause);    ChannelPipeline fireUserEventTriggered(Object event);    ChannelPipeline fireChannelRead(Object msg);    ChannelPipeline fireChannelReadComplete();    ChannelPipeline fireChannelWritabilityChanged();
复制代码

事件传递

那么有些朋友可能会问了,既然 ChannelPipeline 中包含了很多个 handler,那么 handler 中的事件是怎么传递的呢?


其实这些事件是通过调用 ChannelHandlerContext 中的相应方法来触发的。


对于 Inbound 事件来说,可以调用下面的方法,进行事件的传递:


ChannelHandlerContext.fireChannelRegistered()ChannelHandlerContext.fireChannelActive()ChannelHandlerContext.fireChannelRead(Object)ChannelHandlerContext.fireChannelReadComplete()ChannelHandlerContext.fireExceptionCaught(Throwable)ChannelHandlerContext.fireUserEventTriggered(Object)ChannelHandlerContext.fireChannelWritabilityChanged()ChannelHandlerContext.fireChannelInactive()ChannelHandlerContext.fireChannelUnregistered()
复制代码


对于 Outbound 事件来说,可以调用下面的方法,进行事件的传递:


ChannelHandlerContext.bind(SocketAddress, ChannelPromise)ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)ChannelHandlerContext.write(Object, ChannelPromise)ChannelHandlerContext.flush()ChannelHandlerContext.read()ChannelHandlerContext.disconnect(ChannelPromise)ChannelHandlerContext.close(ChannelPromise)ChannelHandlerContext.deregister(ChannelPromise)
复制代码


具体而言,就是在 handler 中调用 ChannelHandlerContext 中对应的方法:


   public class MyInboundHandler extends ChannelInboundHandlerAdapter {        @Override       public void channelActive(ChannelHandlerContext ctx) {           System.out.println("Connected!");           ctx.fireChannelActive();       }   }     public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {        @Override       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {           System.out.println("Closing ..");           ctx.close(promise);       }   }
复制代码

DefaultChannelPipeline

ChannelPipeline 有一个官方的实现叫做 DefaultChannelPipeline,因为对于 pipeline 来说,主要的功能就是进行 handler 的管理和事件传递,相对于而言功能比较简单,但是他也有一些特别的实现地方,比如它有两个 AbstractChannelHandlerContext 类型的 head 和 tail。


我们知道 ChannelPipeline 实际上是很多 handler 的集合,那么这些集合是怎么进行存储的呢?这种存储的数据结构就是 AbstractChannelHandlerContext。每个 AbstractChannelHandlerContext 中都有一个 next 节点和一个 prev 节点,用来组成一个双向链表。


同样的在 DefaultChannelPipeline 中使用 head 和 tail 来将封装好的 handler 存储起来。


注意,这里的 head 和 tail 虽然都是 AbstractChannelHandlerContext,但是两者有稍许不同。先看下 head 和 tail 的定义:


    protected DefaultChannelPipeline(Channel channel) {        this.channel = ObjectUtil.checkNotNull(channel, "channel");        succeededFuture = new SucceededChannelFuture(channel, null);        voidPromise =  new VoidChannelPromise(channel, true);
tail = new TailContext(this); head = new HeadContext(this);
head.next = tail; tail.prev = head; }
复制代码


在 DefaultChannelPipeline 的构造函数中,对 tail 和 head 进行初始化,其中 tail 是 TailContext,而 head 是 HeadContext。


其中 TailContext 实现了 ChannelInboundHandler 接口:


final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler
复制代码


而 HeadContext 实现了 ChannelOutboundHandler 和 ChannelInboundHandler 接口:


final class HeadContext extends AbstractChannelHandlerContext            implements ChannelOutboundHandler, ChannelInboundHandler 
复制代码


下面我们以 addFirst 方法为例,来看一下 handler 是怎么被加入 pipline 的:


    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {        final AbstractChannelHandlerContext newCtx;        synchronized (this) {            checkMultiplicity(handler);            name = filterName(name, handler);
newCtx = newContext(group, name, handler);
addFirst0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; }
EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this; }
复制代码


它的工作逻辑是首先根据传入的 handler 构建一个新的 context,然后调用 addFirst0 方法,将 context 加入 AbstractChannelHandlerContext 组成的双向链表中:


    private void addFirst0(AbstractChannelHandlerContext newCtx) {        AbstractChannelHandlerContext nextCtx = head.next;        newCtx.prev = head;        newCtx.next = nextCtx;        head.next = newCtx;        nextCtx.prev = newCtx;    }
复制代码


然后调用 callHandlerAdded0 方法来触发 context 的 handlerAdded 方法。

总结

channelPipeline 负责管理 channel 的各种 handler,在 DefaultChannelPipeline 中使用了 AbstractChannelHandlerContext 的 head 和 tail 来对多个 handler 进行存储,同时借用这个链式结构对 handler 进行各种管理,非常方便。


本文已收录于 http://www.flydean.com/04-3-netty-channelpipeline/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:channelPipeline详解