写点什么

netty 系列之:Event、Handler 和 Pipeline

发布于: 1 小时前

简介上一节我们讲解了 netty 中的 Channel,知道了 channel 是事件处理器和外部联通的桥梁。今天本文将会详细讲解 netty 的剩下几个非常总要的部分 Event、Handler 和 PipeLine。


ChannelPipelinepipeLine 是连接 Channel 和 handler 的桥梁,它实际上是一个 filter 的实现,用于控制其中 handler 的处理方式。


当一个 channel 被创建的时候,和它对应的 ChannelPipeline 也会被创建。


先看下 ChannelPipeline 的定义:


public interface ChannelPipelineextends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable


首先 ChannelPipeline 继承自 Iterable,表示它是可遍历的,而遍历的结果就是其中一个个的 Handler。


作为一个合格的 Iterable,ChannelPipeline 提供了一系列的 add 和 remote 方法,通过这些方法就可以向 ChannelPipeline 中添加或者移除 Handler。因为 ChannelPipeline 是一个 filter,而过滤器是需要指定对应的 filter 的顺序的,所以 ChannelPipeline 中有 addFirst 和 addLast 这种添加不同顺序的方法。


然后可以看到 ChannelPipeline 继承了 ChannelInboundInvoker 和 ChannelOutboundInvoker 两个接口。


先看一张 channelPipeline 的工作流程图:


可以看出 ChannelPipeline 主要有两种操作,一种是读入 Inbound,一种是写出 OutBound。


对于 Socket.read()这样的读入操作,调用的实际上就是 ChannelInboundInvoker 中的方法。对于外部的 IO 写入的请求,调用的就是 ChannelOutboundInvoker 中的方法。


注意,Inbound 和 outbound 的处理顺序是相反的,比如下面的例子:


ChannelPipeline p = ...;
复制代码


p.addLast("1", new InboundHandlerA());p.addLast("2", new InboundHandlerB());p.addLast("3", new OutboundHandlerA());p.addLast("4", new OutboundHandlerB());p.addLast("5", new InboundOutboundHandlerX());


上面的代码中我们向 ChannelPipeline 添加了 5 个 handler,其中 2 个 InboundHandler,2 个 OutboundHandler 和一个同时处理 In 和 Out 的 Handler。


那么当 channel 遇到 inbound event 的时候,就会按照 1,2,3,4,5 的顺序进行处理,但是只有 InboundHandler 才能处理 Inbound 事件,所以,真正执行的顺序是 1,2,5。


同样的当 channel 遇到 outbound event 的时候,会按照 5,4,3,2,1 的顺序进行执行,但是只有 outboundHandler 才能处理 Outbound 事件,所以真正执行的顺序是 5,4,3.


简单的说,ChannelPipeline 指定了 Handler 的执行顺序。


ChannelHandlernetty 是一个事件驱动的框架,所有的 event 都是由 Handler 来进行处理的。ChannelHandler 可以处理 IO、拦截 IO 或者将 event 传递给 ChannelPipeline 中的下一个 Handler 进行处理。


ChannelHandler 的结构很简单,只有三个方法,分别是:


void handlerAdded(ChannelHandlerContext ctx) throws Exception;void handlerRemoved(ChannelHandlerContext ctx) throws Exception;void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;


根据 inbound 和 outbound 事件的不同,ChannelHandler 可以分为两类,分别是 ChannelInboundHandler 和 ChannelOutboundHandler.


因为这两个都是 interface,实现起来比较麻烦,所以 netty 为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter 和 ChannelDuplexHandler。前面两个很好理解,分别是 inbound 和 outbound,最后一个可以同时处理 inbound 和 outbound。


ChannelHandler 是由 ChannelHandlerContext 提供的,并且和 ChannelPipeline 的交互也是通过 ChannelHandlerContext 来进行的。


ChannelHandlerContextChannelHandlerContext 可以让 ChannelHandler 和 ChannelPipeline 或者其他的 Handler 进行交互。它就是一个上下文环境,使得 Handler 和 Channel 可以相互作用。


如可以在 ChannelHandlerContext 中,调用 channel()获得绑定的 channel。可以通过调用 handler()获得绑定的 Handler。通过调用 fire*方法来触发 Channel 的事件。


看下 ChannelHandlerContext 的定义:


public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker


可以看到他是一个 AttributeMap 用来存储属性,还是一个 ChannelInboundInvoker 和 ChannelOutboundInvoker 用来触发和传播相应的事件。


对于 Inbound 来说传播事件的方法有:


ChannelHandlerContext.fireChannelRegistered()ChannelHandlerContext.fireChannelActive()ChannelHandlerContext.fireChannelRead(Object)ChannelHandlerContext.fireChannelReadComplete()ChannelHandlerContext.fireExceptionCaught(Throwable)ChannelHandlerContext.fireUserEventTriggered(Object)ChannelHandlerContext.fireChannelWritabilityChanged()ChannelHandlerContext.fireChannelInactive()ChannelHandlerContext.fireChannelUnregistered()


对于 Outbound 来说传播事件的方法有:


ChannelHandlerContext.bind(SocketAddress, ChannelPromise)ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)ChannelHandlerContext.write(Object, ChannelPromise)ChannelHandlerContext.flush()ChannelHandlerContext.read()ChannelHandlerContext.disconnect(ChannelPromise)ChannelHandlerContext.close(ChannelPromise)ChannelHandlerContext.deregister(ChannelPromise)这些方法,在一个 Handler 中调用,然后将事件传递给下一个 Handler,如下所示:


public class MyInboundHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println("Connected!");ctx.fireChannelActive();}}


public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) {System.out.println("Closing ..");ctx.close(promise);}}


ChannelHandler 中的状态变量 ChannelHandler 是一个 Handler 类,一般情况下,这个类的实例是可以被多个 channel 共同使用的,前提是这个 ChannelHandler 没有共享的状态变量。


但有时候,我们必须要在 ChannelHandler 中保持一个状态,那么就涉及到 ChannelHandler 中的状态变量的问题,看下面的一个例子:


public interface Message {// your methods here}


public class DataServerHandler extends SimpleChannelInboundHandler<Message> {


   private boolean loggedIn;
@Override public void channelRead0(ChannelHandlerContext ctx, Message message) { if (message instanceof LoginMessage) { authenticate((LoginMessage) message); loggedIn = true; } else (message instanceof GetDataMessage) { if (loggedIn) { ctx.writeAndFlush(fetchSecret((GetDataMessage) message)); } else { fail(); } } } ...
复制代码


}这个例子中,我们需要在收到 LoginMessage 之后,对消息进行认证,并保存认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。


那么这样带有状态变量的 Handler 就只能绑定一个 channel,如果绑定多个 channel 就有可能出现状态不一致的问题。一个 channel 绑定一个 Handler 实例,很简单,只需要在 initChannel 方法中使用 new 关键字新建一个对象即可。


public class DataServerInitializer extends ChannelInitializer<Channel> {@Overridepublic void initChannel(Channel channel) {channel.pipeline().addLast("handler", new DataServerHandler());}}


那么除了新建 handler 实例之外,还有没有其他的办法呢?当然是有的,那就是 ChannelHandlerContext 中的 AttributeKey 属性。还是上面的例子,我们看一下使用 AttributeKey 应该怎么实现:


public interface Message {// your methods here}


@Sharable
复制代码


public class DataServerHandler extends SimpleChannelInboundHandler<Message> {private final AttributeKey<Boolean> auth =AttributeKey.valueOf("auth");


    @Override   public void channelRead(ChannelHandlerContext ctx, Message message) {       Attribute<Boolean> attr = ctx.attr(auth);       if (message instanceof LoginMessage) {           authenticate((LoginMessage) o);           attr.set(true);       } else (message instanceof GetDataMessage) {           if (Boolean.TRUE.equals(attr.get())) {               ctx.writeAndFlush(fetchSecret((GetDataMessage) o));           } else {               fail();           }       }   }   ...
复制代码


}


上例中,首先定义了一个 AttributeKey,然后使用 ChannelHandlerContext 的 attr 方法将 Attribute 设置到 ChannelHandlerContext 中,这样该 Attribute 绑定到这个 ChannelHandlerContext 中了。后续即使使用同一个 Handler 在不同的 Channel 中该属性也是不同的。


下面是使用共享 Handler 的例子:


public class DataServerInitializer extends ChannelInitializer<Channel> {


   private static final DataServerHandler SHARED = new DataServerHandler();
@Override public void initChannel(Channel channel) { channel.pipeline().addLast("handler", SHARED); }
复制代码


}注意,在定义 DataServerHandler 的时候,我们加上了 @Sharable 注解,如果一个 ChannelHandler 使用了 @Sharable 注解,那就意味着你可以只创建一次这个 Handler,但是可以将其绑定到一个或者多个 ChannelPipeline 中。


注意,@Sharable 注解是为 java 文档准备的,并不会影响到实际的代码执行效果。


异步 Handler 之前介绍了,可以通过调用 pipeline.addLast 方法将 handler 加入到 pipeline 中,因为 pipeline 是一个 filter 的结构,所以加入的 handler 是顺序进行处理的。


但是,我希望某些 handler 是在新的线程中执行该怎么办?如果我们希望这些新的线程中执行的 Handler 是无序的又该怎么办?


比如我们现在有 3 个 handler 分别是 MyHandler1,MyHandler2 和 MyHandler3。


顺序执行的写法是这样的:


ChannelPipeline pipeline = ch.pipeline();


pipeline.addLast("MyHandler1", new MyHandler1());pipeline.addLast("MyHandler2", new MyHandler2());pipeline.addLast("MyHandler3", new MyHandler3());


如果要让 MyHandler3 在新的线程中执行,则可以加入 group 选项,从而让 handler 在新的 group 中运行:


static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);ChannelPipeline pipeline = ch.pipeline();


pipeline.addLast("MyHandler1", new MyHandler1());pipeline.addLast("MyHandler2", new MyHandler2());pipeline.addLast(group,"MyHandler3", new MyHandler3());


但是上例中 DefaultEventExecutorGroup 加入的 Handler 也是会顺序执行的,如果确实不想顺序执行,那么可以尝试考虑使用 UnorderedThreadPoolEventExecutor 。


总结本文讲解了 Event、Handler 和 PipeLine,并举例说明他们之间的关系和相互作用。后续会从 netty 的具体实践出发,进一步加深对 netty 的理解和应用,希望大家能够喜欢。


本文已收录于 http://www.flydean.com/05-netty-channelevent/


最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!


欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 1 小时前阅读数: 4
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:Event、Handler和Pipeline