写点什么

【Netty】「萌新入门」(二)剖析 EventLoop

作者:sidiot
  • 2023-06-14
    浙江
  • 本文字数:6002 字

    阅读完需:约 20 分钟

前言


本篇博文是《从 0 到 1 学习 Netty》中入门系列的第二篇博文,主要内容是介绍 Netty 中 EventLoop 的使用,优化及源码解析,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;


概述


事件循环对象 EventLoop


在 Netty 中,EventLoop 是用于处理 I/O 事件的线程。它允许我们在单线程中同时处理多个连接,避免了阻塞和等待。


简单来说,Netty 的 EventLoop 具有以下几个特点:


  1. 单线程执行:每个 EventLoop 都是由一个线程负责执行,这样可以减少线程切换开销,提高网络应用程序的性能。

  2. 多路复用:EventLoop 使用底层操作系统提供的 I/O 模型(例如 Java NIO)实现多路复用,即一条线程可以监听多个 Channel,从而实现并发处理多个连接的能力。

  3. 事件驱动:EventLoop 通过监听与 Channel 相关的 I/O 事件(例如读、写、连接等),并将其转化为事件对象(例如 ChannelReadEventChannelWriteEvent 等)进行处理。这种事件驱动模型可以让 Netty 高效地响应 I/O 事件,避免了轮询等不必要的操作。

  4. 非阻塞操作:EventLoop 中所有的操作都是非阻塞的,包括 I/O 操作和异步任务的执行。这样可以确保整个系统始终处于高效运行状态,并提高了系统的可伸缩性。


其中,EventLoop 的继承关系如下:


public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {    EventLoopGroup parent();}
public interface EventLoopGroup extends EventExecutorGroup {...}
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {...}
复制代码


  • 继承自 java.util.concurrent.ScheduledExecutorService; 因此包含了线程池中所有的方法;

  • 继承自 Netty 自己的 OrderedEventExecutor

  • 提供了 boolean inEventLoop(Thread var1) 方法判断一个线程是否属于此 EventLoop;

  • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup;


总之,Netty 中的 EventLoop 提供了一种高性能、高可靠性的网络编程模型,它解决了网络应用程序中的并发、高负载等问题,是构建高性能网络应用程序的重要组成部分。




事件循环组 EventLoopGroup


在 Netty 中,EventLoopGroup 是用于处理 I/O 操作和任务执行的线程池。它负责管理一个或多个 EventLoop 实例,每个 EventLoop 负责处理其分配的所有 Channel 的生命周期中发生的事件。


EventLoopGroup 通常会创建两种类型的线程池:Boss GroupWorker Group。Boss Group 负责监听传入连接的请求,而 Worker Group 则负责处理已经建立的连接的读写操作


当一个新的连接到来时,Boss Group 会将连接请求注册到某个 EventLoop 的 Selector 上,并将其关联到对应的 Channel 对象。随后,Worker Group 将负责处理新连接上的所有 I/O 操作。


EventLoopGroup 还提供了一些方法,例如 shutdownGracefully(),可以优雅关闭 EventLoopGroup 的所有线程,并等待它们完成未完成的任务。这对于保证程序的安全关闭非常有用。


其中,EventLoopGroup 的继承关系如下:


public interface EventLoopGroup extends EventExecutorGroup {...}
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {...}
复制代码


  • 继承自 Netty 自己的 EventExecutorGroup

  • 实现了 Iterable 接口提供遍历 EventLoop 的能力;

  • 另有 next 方法获取集合中下一个 EventLoop;


执行任务


1、创建事件循环组,线程数 nThreads 可采用默认设置或者根据需求自定义:


EventLoopGroup group = new NioEventLoopGroup(2);
复制代码


2、获取下一个事件循环对象:


group.next();
复制代码


这里采用的是轮询的方式进行分配,主要是为了任务分工比较均匀:


System.out.println(group.next());  System.out.println(group.next());  System.out.println(group.next());  System.out.println(group.next());
复制代码


运行结果:


io.netty.channel.nio.NioEventLoop@4b14c583io.netty.channel.nio.NioEventLoop@65466a6aio.netty.channel.nio.NioEventLoop@4b14c583io.netty.channel.nio.NioEventLoop@65466a6a
复制代码


3、执行普通任务,使用 submit


group.next().submit(() -> {    try {        Thread.sleep(1000);    } catch (InterruptedException e) {        throw new RuntimeException(e);    }    log.debug(Thread.currentThread().getName());});
log.debug(Thread.currentThread().getName());
复制代码


运行结果:


15:49:31 [DEBUG] [main] c.s.n.c.TestEventLoop - main15:49:32 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - nioEventLoopGroup-2-1
复制代码


4、执行定时任务,使用 scheduleAtFixedRate


group.next().scheduleAtFixedRate(() -> {    log.debug("sidiot.");}, 0, 1, TimeUnit.SECONDS);
log.debug(Thread.currentThread().getName());
复制代码


运行结果:


16:26:44 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.16:26:44 [DEBUG] [main] c.s.n.c.TestEventLoop - main16:26:45 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.16:26:46 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.16:26:47 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
复制代码


5、关闭 EventLoopGroup


group.shutdownGracefully();
复制代码


shutdownGracefully 方法会优雅地关闭 EventLoopGroup。该方法首先会切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行,从而确保整体应用是在正常有序的状态下退出的。


IO 任务


在上篇博文 从0到1(六):入门-Hello World 中有详细的解析,因此这里就不展开叙述,直接给出代码;


服务端


@Slf4jpublic class EventLoopServer {    public static void main(String[] args) {        new ServerBootstrap()                .group(new NioEventLoopGroup())                .channel(NioServerSocketChannel.class)                .childHandler(new ChannelInitializer<NioSocketChannel>() {                    @Override                    protected void initChannel(NioSocketChannel ch) throws Exception {                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {                            @Override                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                                ByteBuf buf = (ByteBuf) msg;                                log.debug(buf.toString(StandardCharsets.UTF_8));                            }                        });                    }                })                .bind(7999);    }}
复制代码


客户端


public class EventLoopClient {    public static void main(String[] args) throws InterruptedException {        Channel channel = new Bootstrap()                .group(new NioEventLoopGroup())                .channel(NioSocketChannel.class)                .handler(new ChannelInitializer<NioSocketChannel>() {                    @Override                    protected void initChannel(NioSocketChannel ch) throws Exception {                        ch.pipeline().addLast(new StringEncoder());                    }                })                .connect(new InetSocketAddress(7999))                .sync()                .channel();
System.out.println(channel); }}
复制代码


运行结果:


17:57:12 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sidiot.17:57:34 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sidiot.17:57:50 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sid10t.17:57:58 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sid10t.
复制代码


从运行结果中不难发现,每个客户端都会绑定一个固定的线程进行处理,这是因为当一个客户端连接到服务器时,Netty 会创建一个新的 Channel,并将其注册到一个 EventLoop 上。


每个客户端所对应的 Channel 将由同一个 EventLoop 来处理,这样可以保证处理消息的顺序,并且避免了线程上下文切换的开销。



细化分工


在上述 IO 任务中,我们只使用了一个 NioEventLoopGroup 对所有事件进行处理。为了提高效率,需要进一步的细化分工,我们将使用两个 EventLoopGroup 分别作为 BossWorkerBoss 负责处理 Accept 事件,而 Worker 负责 Read & Write 事件;


.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
复制代码


同时考虑到,如果 handler 的执行时间过长,则会占用 Worker 的 NIO 线程,即会影响 Worker 的读写效率,因此还需要再次进行细分,创建一个独立的 EventLoopGroup 用来处理比较耗时的 handler;


在不细分的情况下,运行结果如下:


10:30:19 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sid10t.10:30:20 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C3: sid10t.10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C1: sid10t.10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C2: sid10t.10:30:25 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h2: C3: sid10t.10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C2: sid10t.10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C4: sid10t.10:30:34 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C4: sid10t.
复制代码


修改后的代码如下:


EventLoopGroup group = new DefaultEventLoopGroup();
.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug("h1: {}.", buf.toString(StandardCharsets.UTF_8)); ctx.fireChannelRead(msg); } }).addLast(group, "h2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } ByteBuf buf = (ByteBuf) msg; log.debug("h2: {}.", buf.toString(StandardCharsets.UTF_8)); } }); }})
复制代码


运行结果:


10:35:31 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sidiot..10:35:32 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C2: sidiot..10:35:32 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C3: sidiot..10:35:33 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C4: sidiot..10:35:36 [DEBUG] [defaultEventLoopGroup-2-2] c.s.n.c.EventLoopServer - h2: C1: sidiot..10:35:37 [DEBUG] [defaultEventLoopGroup-2-3] c.s.n.c.EventLoopServer - h2: C2: sidiot..10:35:37 [DEBUG] [defaultEventLoopGroup-2-4] c.s.n.c.EventLoopServer - h2: C3: sidiot..10:35:38 [DEBUG] [defaultEventLoopGroup-2-1] c.s.n.c.EventLoopServer - h2: C4: sidiot..
复制代码


通过前后的结果对比,可以看出,在不使用独立的 EventLoopGroup 进行细分的情况下,耗时的 handler 会一直占用 NIO 线程,从而使得其他的 channel 需要进行等待,导致效率低下;



源码浅析


在上述过程中,多个 handler 用的是不同的 EventLoop,那它们是如何进行切换的呢?


在 Netty 框架中,当有数据需要被读取时,会将读取操作封装成一个 ChannelRead 事件,并通过 ChannelHandlerContext 传递给后续的处理器来处理。


其中,有一个重要的方法 invokeChannelRead,用于调用下一个 ChannelHandler 的 channelRead 方法,即实现数据的传递和处理的过程。


static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}
复制代码


在上述代码中,抽象类 AbstractChannelHandlerContext 表示了一个 ChannelHandler 上下文对象,它记录了处理当前 ChannelHandler 的信息,以及与之前后的 ChannelHandler 的关系等。


接着,使用了 pipeline.touch() 方法来标记并检查消息对象 msg 是否为空。该方法可以避免在处理过程中重复处理同一个消息,提高处理效率。如果 msg 为空,则会抛出异常提示 “msg 不能为空”。


然后,通过 next.executor() 获取到 EventExecutor,即执行 ChannelHandler 的线程池。如果当前线程为 Netty 线程(即 IO 线程),则直接调用 next.invokeChannelRead(m) 执行下一个 ChannelHandler 的 channelRead 方法;否则,将该任务加入到线程池中异步执行。


最后,使用匿名内部类实现 Runnable 接口,定义 run 方法来执行下一个 ChannelHandler 的 channelRead 方法。


总之,invokeChannelRead 方法的主要作用就是封装了 ChannelHandlerContext 的读取操作,根据当前线程的执行情况选择是否异步执行。这种机制可以提高网络读取的效率和性能,避免了阻塞 IO 操作带来的性能问题,并使得处理过程更加灵活和可控。


后记


以上就是 剖析 EventLoop 的所有内容了,希望本篇博文对大家有所帮助!


参考:



📝 上篇精讲:「萌新入门」(一)Hello, World!

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多支持;

发布于: 2 小时前阅读数: 2
用户头像

sidiot

关注

还未添加个人签名 2023-06-04 加入

还未添加个人简介

评论

发布
暂无评论
【Netty】「萌新入门」(二)剖析 EventLoop_Java_sidiot_InfoQ写作社区