写点什么

Netty 源码—Pipeline 和 Handler

  • 2025-03-25
    福建
  • 本文字数:12984 字

    阅读完需:约 43 分钟

1.Pipeline 和 Handler 的作用和构成


(1)Pipeline 和 Handler 的作用


可以在处理复杂的业务逻辑时避免 if else 的泛滥,可以实现对业务逻辑的模块化处理,不同的逻辑放置到单独的类中进行处理。最后将这些逻辑串联起来,形成一个完整的逻辑处理链。

 

Netty 通过责任链模式来组织代码逻辑,能够支持逻辑的动态添加和删除,能够支持各类协议的扩展。

 

(2)Pipeline 和 Handler 的构成


在 Netty 里,一个连接对应着一个 Channel。这个 Channel 的所有处理逻辑都在一个叫 ChannelPipeline 的对象里,ChannelPipeline 是双向链表结构,它和 Channel 之间是一对一的关系。

 

ChannelPipeline 里的每个结点都是一个 ChannelHandlerContext 对象,这个 ChannelHandlerContext 对象能够获得和 Channel 相关的所有上下文信息。每个 ChannelHandlerContext 对象都包含一个逻辑处理器 ChannelHandler,每个逻辑处理器 ChannelHandler 都处理一块独立的逻辑。

 

2.ChannelHandler 的分类


ChannelHandler 有两大子接口,分别为 Inbound 和 Outbound 类型:第一个子接口是 ChannelInboundHandler,用于处理读数据逻辑,最重要的方法是 channelRead()。第二个子接口是 ChannelOutboundHandler,用于处理写数据逻辑,最重要的方法是 write()。

 

这两个子接口默认的实现分别是:ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。它们分别实现了两个子接口的所有功能,在默认情况下会把读写事件传播到下一个 Handler。

 

InboundHandler 的事件通常只会传播到下一个 InboundHandler,OutboundHandler 的事件通常只会传播到下一个 OuboundHandler,InboundHandler 的执行顺序与实际 addLast 的添加顺序相同,OutboundHandler 的执行顺序与实际 addLast 的添加顺序相反。

 

Inbound 事件通常由 IO 线程触发,如 TCP 链路的建立事件、关闭事件、读事件、异常通知事件等。其触发方法一般带有 fire 字眼,如下所示:

ctx.fireChannelRegister()、

ctx.fireChannelActive()、

ctx.fireChannelRead()、

ctx.fireChannelReadComplete()、

ctx.fireChannelInactive()。

 

Outbound 事件通常由用户主动发起的网络 IO 操作触发,如用户发起的连接操作、绑定操作、消息发送等操作。其触发方法一般如:ctx.bind()、ctx.connect()、ctx.write()、ctx.flush()、ctx.read()、ctx.disconnect()、ctx.close()。

 

3.几个特殊的 ChannelHandler


(1)ChannelInboundHandlerAdapter


ChannelInboundHandlerAdapter 主要用于实现 ChannelInboundHandler 接口的所有方法,这样我们在继承它编写自己的 ChannelHandler 时就不需要实现 ChannelHandler 里的每种方法了,从而避免了直接实现 ChannelHandler 时需要实现其所有方法而导致代码显得冗余和臃肿。


//Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline.public interface ChannelHandler {    //Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
//Gets called after the ChannelHandler was removed from the actual context and it doesn't handle events anymore. void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
//Gets called if a Throwable was thrown. @Deprecated void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
//Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition. @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value }}
//Skeleton implementation of a ChannelHandler.public abstract class ChannelHandlerAdapter implements ChannelHandler { // Not using volatile because it's used only for a sanity check. boolean added;
//Return true if the implementation is Sharable and so can be added to different ChannelPipelines. public boolean isSharable() { Class<?> clazz = getClass(); Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; }
//Do nothing by default, sub-classes may override this method. @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP }
//Do nothing by default, sub-classes may override this method. @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP }
//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline. @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }}
//ChannelHandler which adds callbacks for state changes. //This allows the user to hook in to state changes easily.public interface ChannelInboundHandler extends ChannelHandler { //The Channel of the ChannelHandlerContext was registered with its EventLoop void channelRegistered(ChannelHandlerContext ctx) throws Exception;
//The Channel of the ChannelHandlerContext was unregistered from its EventLoop void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
//The Channel of the ChannelHandlerContext is now active void channelActive(ChannelHandlerContext ctx) throws Exception;
//The Channel of the ChannelHandlerContext was registered is now inactive and reached its end of lifetime. void channelInactive(ChannelHandlerContext ctx) throws Exception;
//Invoked when the current Channel has read a message from the peer. void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
//Invoked when the last message read by the current read operation has been consumed by #channelRead(ChannelHandlerContext, Object). //If ChannelOption#AUTO_READ is off, no further attempt to read an inbound data from the current Channel will be made until ChannelHandlerContext#read() is called. void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
//Gets called if an user event was triggered. void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
//Gets called once the writable state of a Channel changed. //You can check the state with Channel#isWritable(). void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
//Gets called if a Throwable was thrown. @Override @SuppressWarnings("deprecation") void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;}
//Abstract base class for ChannelInboundHandler implementations which provide implementations of all of their methods.public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { //Calls ChannelHandlerContext#fireChannelRegistered() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); }
//Calls ChannelHandlerContext#fireChannelUnregistered() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); }
//Calls ChannelHandlerContext#fireChannelActive() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); }
//Calls ChannelHandlerContext#fireChannelInactive() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); }
//Calls ChannelHandlerContext#fireChannelRead(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }
//Calls ChannelHandlerContext#fireChannelReadComplete() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); }
//Calls ChannelHandlerContext#fireUserEventTriggered(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); }
//Calls ChannelHandlerContext#fireChannelWritabilityChanged() to forward to the next ChannelInboundHandler in the ChannelPipeline. @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); }
//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline. @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }}
复制代码


(2)ChannelOutboundHandlerAdapter


ChannelOutboundHandlerAdapter 主要用于实现 ChannelOutboundHandler 接口的所有方法,这样我们在继承它编写自己的 ChannelHandler 时就不需要实现 ChannelHandler 里的每种方法了,从而避免了直接实现 ChannelHandler 时需要实现其所有方法而导致代码显得冗余和臃肿。


//ChannelHandler which will get notified for IO-outbound-operations.public interface ChannelOutboundHandler extends ChannelHandler {    //Called once a bind operation is made.    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
//Called once a connect operation is made. void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
//Called once a disconnect operation is made. void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
//Called once a close operation is made. void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
//Called once a deregister operation is made from the current registered EventLoop. void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
//Intercepts ChannelHandlerContext#read(). void read(ChannelHandlerContext ctx) throws Exception;
//Called once a write operation is made. The write operation will write the messages through the ChannelPipeline. //Those are then ready to be flushed to the actual Channel once Channel#flush() is called. void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
//Called once a flush operation is made. The flush operation will try to flush out all previous written messages that are pending. void flush(ChannelHandlerContext ctx) throws Exception;}
//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { //Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); }
//Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); }
//Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); }
//Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); }
//Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); }
//Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); }
//Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
//Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }}
复制代码


(3)ByteToMessageDecoder


基于这个 ChannelHandler 可以实现自定义解码,而不用关心 ByteBuf 的强转和解码结果的传递。Netty 里的 ByteBuf 默认下使用的是堆外内存,ByteToMessageDecoder 会自动进行内存的释放,不用操心内存管理。我们自定义的 ChannelHandler 继承了 ByteToMessageDecoder 后,需要实现 decode()方法。

 

(4)SimpleChannelInboundHandler


基于这个 ChannelHandler 可以实现每一种指令的处理,不再需要强转、不再有冗长的 if else 逻辑、不再需要手动传递对象。

 

同时还可以自动释放没有往下传播的 ByteBuf,因为我们编写指令处理 ChannelHandler 时,可能会编写不用关心的 if else 判断,然后手动传递无法处理的对象至下一个指令处理器。


//xxxHandler.javaif (packet instanceof xxxPacket) {    //进行处理} else {    ctx.fireChannelRead(packet);}
复制代码


(5)MessageToByteEncoder


基于这个 ChannelHandler 可以实现自定义编码,而不用关心 ByteBuf 的创建,不用把创建完的 ByteBuf 进行返回。

 

4.ChannelHandler 的生命周期


(1)ChannelHandler 回调方法的执行顺序


ChannelHandler 回调方法的执行顺序可以称为 ChannelHandler 的生命周期。

 

新建连接时 ChannelHandler 回调方法的执行顺序是:handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()。

 

关闭连接时 ChannelHandler 回调方法的执行顺序是:channelInactive() -> channelUnregistered() -> handlerRemoved()。

 

接下来是 ChannelHandler 具体的回调方法说明,其中一二三的顺序可以参考 AbstractChannel 的内部类 AbstractUnsafe 的 register0()方法。

 

一.handlerAdded()


检测到新连接后调用"ch.pipeline().addLast(...)"之后的回调,表示当前 Channel 已成功添加一个 ChannelHandler。

 

二.channelRegistered()


表示当前 Channel 已和某个 NioEventLoop 线程建立了绑定关系,已经创建了一个 Reactor 线程来处理当前这个 Channel 的读写。

 

三.channelActive()


当 Channel 的 Pipeline 已经添加完所有的 ChannelHandler 以及绑定好一个 NioEventLoop 线程,这个 Channel 对应的连接才算真正被激活,接下来就会回调该方法。

 

四.channelRead()


服务端每次收到客户端发送的数据时都会回调该方法,表示有数据可读。

 

五.channelReadComplete()


服务端每次读完一条完整的数据都会回调该方法,表示数据读取完毕。

 

六.channelInactive()


表示这个连接已经被关闭,该连接在 TCP 层已经不再是 ESTABLISH 状态。

 

七.channelUnregister()


表示与这个连接对应的 NioEventLoop 线程移除了对这个连接的处理。

 

八.handlerRemoved()


表示给这个连接添加的所有的 ChannelHandler 都被移除了。

 

(2)ChannelHandler 回调方法的应用场景


一.handlerAdded()方法与 handlerRemoved()方法通常可用于一些资源的申请和释放。

 

二.channelActive()方法与 channelInactive()方法表示的是 TCP 连接的建立与释放,可用于统计单机连接数或 IP 过滤。

 

三.channelRead()方法可用于根据自定义协议进行拆包。每次读到一定数据就累加到一个容器里,然后看看能否拆出完整的包。

 

四.channelReadComplete()方法可用于实现批量刷新。如果每次向客户端写数据都通过 writeAndFlush()方法写数据并刷新到底层,其实并不高效。所以可以把调用 writeAndFlush()方法的地方换成调用 write()方法,然后再在 channelReadComplete()方法里调用 ctx.channel().flush()。

 

5.ChannelPipeline 的事件处理


(1)消息读取和发送被 Pipeline 处理的过程


消息的读取和发送被 ChannelPipeline 的 ChannelHandler 链拦截和处理的全过程:

一.首先 AbstractNioChannel 内部类 NioUnsafe 的 read()方法读取 ByteBuf 时会触发 ChannelRead 事件,也就是由 NioEventLoop 线程调用 ChannelPipeline 的 fireChannelRead()方法将 ByteBuf 消息传输到 ChannelPipeline 中。

二.然后 ByteBuf 消息会依次被 HeadContext、xxxChannelHandler、...、TailContext 拦截处理。在这个过程中,任何 ChannelHandler 都可以中断当前的流程,结束消息的传递。

三.接着用户可能会调用 ChannelHandlerContext 的 write()方法发送 ByteBuf 消息。此时 ByteBuf 消息会从 TailContext 开始,途径 xxxChannelHandler、...、HeadContext,最终被添加到消息发送缓冲区中等待刷新和发送。在这个过程中,任何 ChannelHandler 都可以中断当前的流程,中断消息的传递。

 

(2)ChannelPipeline 的主要特征


一.ChannelPipeline 支持运行时动态地添加或者删除 ChannelHandler

例如业务高峰时对系统做拥塞保护。处于业务高峰期时,则动态地向当前的 ChannelPipeline 添加 ChannelHandler。高峰期过后,再移除 ChannelHandler。

 

二.ChannelPipeline 是线程安全的

多个业务线程可以并发操作 ChannelPipeline,因为使用了 synchronized 关键字。但 ChannelHandler 却不一定是线程安全的,这由用户保证。

 

6.关于 ChannelPipeline 的问题整理


一.Netty 是如何判断 ChannelHandler 类型的?


即如何判断一个 ChannelHandler 是 Inbound 类型还是 Outbound 类型?

 

答:当调用 Pipeline 去添加一个 ChannelHandler 结点时,旧版 Netty 会使用 instanceof 关键字来判断该结点是 Inbound 类型还是 Outbound 类型,并分别用一个布尔类型的变量来进行标识。新版 Netty 则使用一个整形的 executionMask 来具体区分详细的 Inbound 事件和 Outbound 事件。这个 executionMask 对应一个 16 位的二进制数,是哪一种事件就对应哪一个 Mask。


//Inbound事件的MaskMASK_EXEPTION_CAUGHT = 1;MASK_CHANNEL_REGISTER = 1 << 1;MASK_CHANNEL_UNREGISTER = 1 << 2;MASK_CHANNEL_ACTIVE = 1 << 3;MASK_CHANNEL_INACTIVE = 1 << 4;MASK_CHANNEL_READ = 1 << 5;MASK_CHANNEL_READ_COMPLETE = 1 << 6;MASK_CHANNEL_USER_EVENT_TRIGGERED = 1 << 7;MASK_CHANNEL_WRITABLITY_CHANGED = 1 << 8;
//Outbound事件的MaskMASK_BIND = 1 << 9;MASK_CONNECT = 1 << 10;MASK_DISCONNECT = 1 << 11;MASK_CLOSE = 1 << 12;MASK_DEREGISTER = 1 << 13;MASK_READ = 1 << 14;MASK_WRITE = 1 << 15;MASK_FLUSH = 1 << 16;
复制代码


二.添加 ChannelHandler 时应遵循什么样的顺序?


答:Inbound 类型的事件传播跟添加 ChannelHandler 的顺序一样,Outbound 类型的事件传播跟添加 ChannelHandler 的顺序相反。

 

三.用户手动触发事件传播的两种方式有什么区别?


这两种方式是分别是:ctx.writeAndFlush()和 ctx.channel().writeAndFlush()。

 

答:当通过 Channel 去触发一个事件时,那么该事件会沿整个 ChannelPipeline 传播。如果是 Inbound 类型事件,则从 HeadContext 结点开始向后传播到最后一个 Inbound 类型的结点。如果是 Outbound 类型事件,则从 TailContext 结点开始向前传播到第一个 Outbound 类型的结点。当通过当前结点去触发一个事件时,那么该事件只会从当前结点开始传播。如果是 Inbound 类型事件,则从当前结点开始一直向后传播到最后一个 Inbound 类型的结点。如果是 Outbound 类型事件,则从当前结点开始一直向前传播到第一个 Outbound 类型的结点。

 

7.ChannelPipeline 主要包括三部分内容


一.ChannelPipeline 的初始化


服务端 Channel 和客户端 Channel 在何时初始化 ChannelPipeline?在初始化时又做了什么事情?

 

二.添加和删除 ChannelHandler


Netty 是如何实现业务逻辑处理器动态编织的?

 

三.事件和异常的传播


读写事件和异常在 ChannelPipeline 中的传播。

 

8.ChannelPipeline 的初始化


(1)ChannelPipeline 的初始化时机


在服务端启动和客户端连接接入的过程中,在创建 NioServerSocketChannel 和 NioSocketChannel 时,会逐层执行父类的构造方法,最后执行到 AbstractChannel 的构造方法。AbstractChannel 的构造方法会将 Netty 的核心组件创建出来。而核心组件中就包含了 DefaultChannelPipeline 类型的 ChannelPipeline 组件。


//A skeletal Channel implementation.public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     private final Channel parent;    private final ChannelId id;    private final Unsafe unsafe;    private final DefaultChannelPipeline pipeline;    ...    //Creates a new instance.    protected AbstractChannel(Channel parent) {        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();    }        //Returns a new DefaultChannelPipeline instance.    protected DefaultChannelPipeline newChannelPipeline() {        //创建ChannelPipeline组件        return new DefaultChannelPipeline(this);    }    ...}
//The default ChannelPipeline implementation. //It is usually created by a Channel implementation when the Channel is created.public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; private final Channel channel;//保存了Channel的引用 ... protected DefaultChannelPipeline(Channel channel) { //保存Channel的引用到Pipeline组件的成员变量 this.channel = ObjectUtil.checkNotNull(channel, "channel"); ... tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } ...}
复制代码


(2)ChannelPipeline 的初始化内容


ChannelPipeline 的初始化主要涉及三部分内容:

一.Pipeline 在创建 Channel 时被创建

二.Pipeline 的结点是 ChannelHandlerContext

三.Pipeline 两大哨兵 HeadContext 和 TailContext

 

(3)ChannelPipeline 的说明


ChannelPipeline 中保存了 Channel 的引用,ChannelPipeline 中每个结点都是一个 ChannelHandlerContext 对象,每个 ChannelHandlerContext 结点都包裹着一个 ChannelHandler 执行器,每个 ChannelHandlerContext 结点都保存了它包裹的执行器 ChannelHandler 执行操作时所需要的上下文 ChannelPipeline。由于 ChannelPipeline 又保存了 Channel 的引用,所以每个 ChannelHandlerContext 结点都可以拿到所有的上下文信息。

 

ChannelHandlerContext 接口多继承自 AttributeMap、ChannelInboundInvoker、ChannelOutboundInvoker。

 

ChannelHandlerContext 的关键方法有:channel()、executor()、handler()、pipeline()、alloc()。ChannelHandlerContext 默认是由 AbstractChannelHandlerContext 去实现的,它实现了大部分功能。

 

ChannelPipeline 初始化时会初始化两个结点:HeadContext 和 TailContext,并构成双向链表。HeadContext 结点会比 TailContext 结点多一个 unsafe 成员变量。


public class DefaultChannelPipeline implements ChannelPipeline {    //ChannelPipeline中每个结点都是一个ChannelHandlerContext对象    final AbstractChannelHandlerContext head;    final AbstractChannelHandlerContext tail;    private final Channel channel;//ChannelPipeline中保存了Channel的引用    ...    protected DefaultChannelPipeline(Channel channel) {        //保存Channel的引用到Pipeline组件的成员变量        this.channel = ObjectUtil.checkNotNull(channel, "channel");        ...        //ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表        tail = new TailContext(this);        head = new HeadContext(this);        head.next = tail;        tail.prev = head;    }        final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {        //HeadContext结点会比TailContext结点多一个unsafe成员变量        private final Unsafe unsafe;        HeadContext(DefaultChannelPipeline pipeline) {            super(pipeline, null, HEAD_NAME, false, true);            unsafe = pipeline.channel().unsafe();            setAddComplete();        }        ...    }        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {        TailContext(DefaultChannelPipeline pipeline) {            super(pipeline, null, TAIL_NAME, true, false);            setAddComplete();        }        ...    }    ...}
//ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { //每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipeline private final DefaultChannelPipeline pipeline; ... AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; ordered = executor == null || executor instanceof OrderedEventExecutor; }}
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { //Return the Channel which is bound to the ChannelHandlerContext. Channel channel(); //Returns the EventExecutor which is used to execute an arbitrary task. EventExecutor executor();
//The unique name of the ChannelHandlerContext. //The name was used when then ChannelHandler was added to the ChannelPipeline. //This name can also be used to access the registered ChannelHandler from the ChannelPipeline. String name();
//The ChannelHandler that is bound this ChannelHandlerContext. ChannelHandler handler();
//Return true if the ChannelHandler which belongs to this context was removed from the ChannelPipeline. //Note that this method is only meant to be called from with in the EventLoop. boolean isRemoved();
ChannelHandlerContext fireChannelRegistered(); ChannelHandlerContext fireChannelUnregistered(); ChannelHandlerContext fireChannelActive(); ChannelHandlerContext fireChannelInactive(); ChannelHandlerContext fireExceptionCaught(Throwable cause); ChannelHandlerContext fireUserEventTriggered(Object evt); ChannelHandlerContext fireChannelRead(Object msg); ChannelHandlerContext fireChannelReadComplete(); ChannelHandlerContext fireChannelWritabilityChanged(); ChannelHandlerContext read(); ChannelHandlerContext flush(); //Return the assigned ChannelPipeline ChannelPipeline pipeline(); //Return the assigned ByteBufAllocator which will be used to allocate ByteBufs. ByteBufAllocator alloc(); ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18790452

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Netty源码—Pipeline和Handler_Netty_不在线第一只蜗牛_InfoQ写作社区