写点什么

【分布式技术专题】「探索高性能远程通信」基于 Netty 的分布式通信框架实现(Dispatcher 和 EventListener)(下)

作者:洛神灬殇
  • 2024-01-30
    江苏
  • 本文字数:3782 字

    阅读完需:约 12 分钟

【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)

前提介绍

经过阅读《【分布式技术专题】「探索高性能远程通信」基于 Netty 的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。


整体框架如下图所示:



对应的组件的基本功能和功能实现范畴。



在上一节,我们主要讲对应的 Dispatcher 上面之前的逻辑操作实现,进行了对应的介绍和分析:



  • Boss 线程:接受连接流程,主要负责接受外部请求,这些请求可能是来自用户的操作或是其他服务的调用。一旦接收到请求,boss 会进行必要的处理,然后将请求分发给下面的线程池 worker 进行处理。

  • Worker 线程:系统中的工作执行者,负责接收 boss 分发的任务,然后执行具体的业务逻辑。这些任务可能涉及到数据的处理、服务的调用等。线程池 worker 通过 channel 与 boss 进行通信,确保任务能够准确无误地传递。

  • ChannelHandler 处理器 :ChannelHandler 接口是一个空接口,其中:ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 是我们首先要实现和操作的基础。

本节重点

本节内容的重点是针对于 Dispatcher 分配和调度以及之后的操作流程的介绍和分析。


  • dispatcher 机制:在 worker 执行任务的过程中,需要有一个机制来调度和分配任务。这就是 dispatcher 的作用。


  • EventListener:基于在每个 worker 线程内部,eventListener 发挥着关键作用。它负责监听和处理线程中的事件,比如任务的完成、异常等。通过 eventListener,系统能够及时响应各种事件,进行必要的处理和反馈。

  • Service 业务逻辑实现:它代表了整个系统的核心业务逻辑。service 接收并处理来自 worker 线程的任务,完成具体的业务操作。这些操作可能涉及到数据的处理、服务的调用等。

Dispatcher(分派调度器)

Dispatcher 根据一定的策略和规则,将任务分配给合适的 worker 线程进行处理。这一环节保证了系统的负载均衡和高效运行。



消息经过 Pipline 链处理后,将由 Dispatcher 转发,并进入 EventListener 链进行处理。Dispatcher 内部使用了两个线程池:channelExecutor 和 dataExecutor。



  • netExecutor 用于处理通道事件和异常事件。由于通道事件可能需要同步调用远程服务,因此该线程池没有设定上限,因为同步调用会阻塞当前线程。

  • dataExecutor 用于处理消息事件。根据经验值,默认的最大线程数为 150,但可以通过选项参数进行修改。

EventListener

ChannelEventListener

ChannelInboundHandler 接口定义了一系列方法,用于处理 Channel 的入站事件。这些方法负责处理数据从外部系统(如网络)流入 Channel 的过程。这些方法都是将对应的事件(channelRegistered、channelUnregistered、channelActive、channelInactive)转发给 ChannelPipeline 中的下一个 ChannelInboundHandler,如下面的源码所示:


    /**     * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.     *     * Sub-classes may override this method to change behavior.     */    @Skip    @Override    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {        ctx.fireChannelRegistered();    }
/** * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Skip @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); }
/** * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Skip @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); }
/** * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Skip @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); }
复制代码

Channel 通道事件

channelRegistered、channelUnregistered、channelActive 和 channelInactive 这几个方法是用于处理不同类型的通道事件。下面分别对这几个方法进行详细分析:


  • channelRegistered(ChannelHandlerContext ctx): 这个方法在通道被注册到 EventLoop(事件循环)后被调用。它的作用是将该事件转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。

  • channelUnregistered(ChannelHandlerContext ctx):这个方法在通道从 EventLoop 中注销后被调用。它的作用是将该事件转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。

  • channelActive(ChannelHandlerContext ctx):这个方法在通道变为活跃状态后被调用。它的作用是将该事件转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。

  • channelInactive(ChannelHandlerContext ctx):这个方法在通道变为非活跃状态后被调用。 它的作用是将该事件转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。

    我们只需将相应的实现注入并发布 ChannelActionEvent 对象模型事件。这样,ChannelActionEvent 对象的消费者就能够监听事件并执行相应的逻辑操作。通过这种方式,我们实现了事件的发布与订阅机制,以便实现松耦合的组件间通信,并能根据实际需求对事件进行灵活地处理和扩展。

定义 ChannelActionEvent

首先,定义一个自定义事件类 ChannelActionEventextends ,继承自 ApplicationEvent,主要作为通道变化的处理器事件。


public class ChannelActionEvent extends ApplicationEvent {    private Object data;    public ChannelActionEventextends (Object source, String data) {        super(source);        this.data = data;    }    public String getData() {        return data;    }}
@Componentpublic class ChannelActionEventListener extends implements ApplicationListener<ChannelActionEvent > {
@Override public void onApplicationEvent(MyEvent event) { Object data = event.getData(); // 执行对应的逻辑操作 System.out.println("Received event with data: " + data); }}
复制代码


因此,根据同样的逻辑,ExceptionEvent 事件也可以通过方法处理器的 exceptionCaught 方法进行处理。



 @Skip    @Override    @SuppressWarnings("deprecation")    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)            throws Exception {        ctx.fireExceptionCaught(cause);    }
复制代码


DataEvent 可以覆盖对应的 channelRead、channelReadComplete 的方法进行发布对应的事件处理即可。



    @Skip    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ctx.fireChannelRead(msg);    }  @Skip    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.fireChannelReadComplete();    }
复制代码


框架会预先在 XXEventListener 链末端注册 ServiceMessageEventListener,该 Listener 负责调用被注册的 Service,并将返回值或异常回传。


Heartbeat、超时及重连机制

Netty 提供了读空闲和写空闲的功能来处理网络连接的空闲状态。


读空闲(Read Idle):当连接在指定的时间内没有接收到任何数据时,就会触发读空闲事件。这个事件可以用来检测连接是否处于空闲状态,或者判断通信对方是否还与服务器保持连接。通过设置 ChannelOption.READ_IDLE_TIME 参数来定义读空闲的时间。


写空闲(Write Idle):当连接在指定的时间内没有发送任何数据时,就会触发写空闲事件。这个事件可以用来定期发送心跳消息或其他需要保持连接的数据。通过设置 ChannelOption.WRITE_IDLE_TIME 参数来定义写空闲的时间。



在 Netty 中,可以通过 ChannelOption 设置读空闲和写空闲的时间,然后通过 ChannelHandler 的回调方法来处理空闲事件。常用的回调方法包括:


  • channelIdle(ChannelHandlerContext ctx, IdleStateEvent stateEvent):当发生空闲事件时调用该方法,可以在该方法中执行相应的逻辑操作。


通常,通过在管道中配置 IdleStateHandler 来启用空闲事件的检测和处理。


IdleStateHandler 是 Netty 提供的一个特殊的 ChannelHandler,用于检测并处理读空闲和写空闲事件。例如,可以在初始化管道时添加以下代码:


pipeline.addLast(new IdleStateHandler(0, 0, idleTime)); // 设置读写空闲时间pipeline.addLast(new MyIdleHandler()); // 自定义的空闲事件处理器
复制代码


在自定义的空闲事件处理器中,可以根据读空闲或写空闲事件执行相应的操作。例如,发送心跳消息、关闭连接等。

发布于: 刚刚阅读数: 3
用户头像

洛神灬殇

关注

🏆 InfoQ写作平台-签约作者 🏆 2020-03-25 加入

👑 后端技术架构师,前优酷资深工程师 📕 个人著作《深入浅出Java虚拟机—JVM原理与实战》 💻 10年开发经验,参与过多个大型互联网项目,定期分享技术干货和项目经验

评论

发布
暂无评论
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)_分布式_洛神灬殇_InfoQ写作社区