写点什么

netty 系列之:EventExecutor,EventExecutorGroup 和 netty 中的实现

作者:程序那些事
  • 2022 年 2 月 21 日
  • 本文字数:5442 字

    阅读完需:约 18 分钟

netty系列之:EventExecutor,EventExecutorGroup和netty中的实现

简介

netty 作为一个异步 NIO 框架,多线程肯定是它的基础,但是对于 netty 的实际使用者来说,一般是不需要接触到多线程的,我们只需要按照 netty 框架规定的流程走下去,自定义 handler 来处理对应的消息即可。


那么有朋友会问了,作为一个 NIO 框架,netty 的多线程到底体现在什么地方呢?它的底层原理是什么呢?


今天带大家来看看 netty 中的任务执行器 EventExecutor 和 EventExecutorGroup。

EventExecutorGroup

因为 EventExecutor 继承自 EventExecutorGroup,这里我们先来详细讲解一下 EventExecutorGroup。


先看下 EventExecutorGroup 的定义:


public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>
复制代码


EventExecutorGroup 继承自 JDK 的 ScheduledExecutorService,即可以执行定时任务,也可以像普通的任务执行器一样提交任务去执行。


同时 EventExecutorGroup 还继承了 Iterable 接口,表示 EventExecutorGroup 是可遍历的,它的遍历对象是 EventExecutor。


EventExecutorGroup 有两个和 Iterable 相关的方法,分别是 next 和 iterator:


    EventExecutor next();
@Override Iterator<EventExecutor> iterator();
复制代码


在 EventExecutorGroup 中调用 next 方法会返回一个 EventExecutor 对象,那么 EventExecutorGroup 和 EventExecutor 是什么关系呢?


我们再来看一下 EventExecutor 的定义:


public interface EventExecutor extends EventExecutorGroup
复制代码


可以看到 EventExecutor 实际上是 EventExecutorGroup 的子类。但是在父类 EventExecutorGroup 中居然有对子类 EventExecutor 的引用。


这种在父类的 Group 中引用返回子类的设计模式在 netty 中非常常见,大家可以自行体会一下这样的设计到底是好还是坏。


EventExecutorGroup 作为一个 EventExecutor 的 Group 对象,它是用来管理 group 中的 EventExecutor 的。所以在 EventExecutorGroup 中设计了一些对 EventExecutor 的统一管理接口。


比如boolean isShuttingDown()方法用来判断这个 group 中的所有 EventExecutor 全都在被 shutdown 或者已经被 shutdown。


另外 EventExecutorGroupt 提供了 shutdown group 中所有 EventExector 的方法:Future<?> shutdownGracefully() 和 terminate 方法:Future<?> terminationFuture()


这两个方法都返回了 Future,所以我们可以认为这两个方法是异步方法。


EventExecutorGroup 中其他的方法都是一些对 JDK 中 ScheduledExecutorService 方法的重写,比如 submit,schedule,scheduleAtFixedRate,scheduleWithFixedDelay 等。

EventExecutor

接下来我们再研究一下 EventExecutor,在上一节中,我们简单的提到了 EventExecutor 继承自 EventExecutorGroup,和 EventExecutorGroup 相比,EventExecutor 有哪些新的方法呢?


我们知道 EventExecutorGroup 继承了 Iterable,并且定义了一个 next 方法用来返回 Group 中的一个 EventExecutor 对象。


因为 Group 中有很多个 EventExecutor,至于具体返回哪一个 EventExecutor,还是由具体的实现类来实现的。


在 EventExecutor 中,它重写了这个方法:


@OverrideEventExecutor next();
复制代码


这里的 next 方法,返回的是 EventExecutor 本身。


另外,因为 EventExecutor 是由 EventExecutorGroup 来管理的,所以 EventExecutor 中还存在一个 parent 方法,用来返回管理 EventExecutor 的 EventExecutorGroup:


EventExecutorGroup parent();
复制代码


EventExecutor 中新加了两个 inEventLoop 方法,用来判断给定的线程是否在 event loop 中执行。


    boolean inEventLoop();
boolean inEventLoop(Thread thread);
复制代码


EventExecutor 还提供两个方法可以返回 Promise 和 ProgressivePromise.


<V> Promise<V> newPromise();<V> ProgressivePromise<V> newProgressivePromise();
复制代码


熟悉 ECMAScript 的朋友可能知道,Promise 是 ES6 引入的一个新的语法功能,用来解决回调地狱的问题。这里的 netty 引入的 Promise 继承自 Future,并且添加了两个 success 和 failure 的状态。


ProgressivePromise 更进一步,在 Promise 基础上,提供了一个 progress 来表示进度。


除此之外,EventExecutor 还提供了对 Succeeded 的结果和 Failed 异常封装成为 Future 的方法。


    <V> Future<V> newSucceededFuture(V result);
<V> Future<V> newFailedFuture(Throwable cause);
复制代码

EventExecutorGroup 在 netty 中的基本实现

EventExecutorGroup 和 EventExecutor 在 netty 中有很多非常重要的实现,其中最常见的就是 EventLoop 和 EventLoopGroup,鉴于 EventLoop 和 EventLoopGroup 的重要性,我们会在后面的章节中重点讲解他们。这里先来看下 netty 中的其他实现。


netty 中 EventExecutorGroup 的默认实现叫做 DefaultEventExecutorGroup,它的继承关系如下所示:


<img src="https://img-blog.csdnimg.cn/ae242899a8234b668045716daa2eec1a.png" style="zoom:67%;" />


可以看到 DefaultEventExecutorGroup 继承自 MultithreadEventExecutorGroup,而 MultithreadEventExecutorGroup 又继承自 AbstractEventExecutorGroup。


先看下 AbstractEventExecutorGroup 的逻辑。AbstractEventExecutorGroup 基本上是对 EventExecutorGroup 中接口的一些实现。


我们知道 EventExecutorGroup 中定义了一个 next()方法,可以返回 Group 中的一个 EventExecutor。


在 AbstractEventExecutorGroup 中,几乎所有 EventExecutorGroup 中的方法实现,都是调用 next()方法来完成的,以 submit 方法为例:


public Future<?> submit(Runnable task) {        return next().submit(task);    }
复制代码


可以看到 submit 方法首先调用 next 获取到的 EventExecutor,然后再调用 EventExecutor 中的 submit 方法。


AbstractEventExecutorGroup 中的其他方法都是这样的实现。但是 AbstractEventExecutorGroup 中并没有实现 next()方法,具体如何从 Group 中获取到 EventExecutor,还需要看底层的具体实现。


MultithreadEventExecutorGroup 继承自 AbstractEventExecutorGroup,提供了多线程任务的支持。


MultithreadEventExecutorGroup 有两类构造函数,在构造函数中可以指定多线程的个数,还有任务执行器 Executor,如果没有提供 Executor 的话,可以提供一个 ThreadFactory,MultithreadEventExecutorGroup 会调用new ThreadPerTaskExecutor(threadFactory)来为每一个线程构造一个 Executor:


    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);    }
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
复制代码


MultithreadEventExecutorGroup 对多线程的支持是怎么实现的呢?


首先 MultithreadEventExecutorGroup 提供了两个 children,分别是 children 和 readonlyChildren:


    private final EventExecutor[] children;    private final Set<EventExecutor> readonlyChildren;
复制代码


children 和 MultithreadEventExecutorGroup 中的线程个数是一一对应的,有多少个线程,children 就有多大。


children = new EventExecutor[nThreads];
复制代码


然后通过调用 newChild 方法,将传入的 executor 构造成为 EventExecutor 返回:


children[i] = newChild(executor, args);
复制代码


看一下 newChild 方法的定义:


 protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
复制代码


这个方法在 MultithreadEventExecutorGroup 中并没有实现,需要在更具体的类中实现它。


readonlyChildren 是 child 的只读版本,用来在遍历方法中返回:


readonlyChildren = Collections.unmodifiableSet(childrenSet);
public Iterator<EventExecutor> iterator() { return readonlyChildren.iterator(); }
复制代码


我们现在有了 Group 中的所有 EventExecutor,那么在 MultithreadEventExecutorGroup 中,next 方法是怎么选择具体返回哪一个 EventExecutor 呢?


先来看一下 next 方法的定义:



private final EventExecutorChooserFactory.EventExecutorChooser chooser;
chooser = chooserFactory.newChooser(children);
public EventExecutor next() { return chooser.next(); }
复制代码


next 方法调用的是 chooser 的 next 方法,看一下 chooser 的 next 方法具体实现:


public EventExecutor next() {            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];        }
复制代码


可以看到,其实是一个很简单的根据 index 来获取对象的操作。


最后看一下 DefaultEventExecutorGroup 中对 newChild 方法的实现:


    protected EventExecutor newChild(Executor executor, Object... args) throws Exception {        return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);    }
复制代码


newChild 返回的 EventExecutor 使用的是 DefaultEventExecutor。这个类是 EventExecutor 在 netty 中的默认实现,我们在下一小结中详细进行讲解。

EventExecutor 在 netty 中的基本实现

EventExecutor 在 netty 中的默认实现是 DefaultEventExecutor,先看下它的继承结构:


<img src="https://img-blog.csdnimg.cn/67a3434b3dc14e1d98baf8cb5dbf07cf.png" style="zoom:67%;" />


DefaultEventExecutor 继承自 SingleThreadEventExecutor,而 SingleThreadEventExecutor 又继承自 AbstractScheduledEventExecutor,AbstractScheduledEventExecutor 继承自 AbstractEventExecutor。


先来看一下 AbstractEventExecutor 的定义:


public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor
复制代码


AbstractEventExecutor 继承了 AbstractExecutorService,并且实现了 EventExecutor 接口。


AbstractExecutorService 是 JDK 中的类,它提供了 ExecutorService 的一些实现,比如 submit, invokeAny and invokeAll 等方法。


AbstractEventExecutor 作为 ExecutorGroup 的一员,它提供了一个 EventExecutorGroup 类型的 parent 属性:


private final EventExecutorGroup parent;
public EventExecutorGroup parent() { return parent; }
复制代码


对于 next 方法来说,AbstractEventExecutor 返回的是它本身:


public EventExecutor next() {        return this;    }
复制代码


AbstractScheduledEventExecutor 继承自 AbstractEventExecutor,它内部使用了一个 PriorityQueue 来存储包含定时任务的 ScheduledFutureTask,从而实现定时任务的功能:


PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
复制代码


接下来是 SingleThreadEventExecutor,从名字可以看出,SingleThreadEventExecutor 使用的是单线程来执行提交的 tasks,SingleThreadEventExecutor 提供了一个默认的 pending 执行 task 的任务大小:DEFAULT_MAX_PENDING_EXECUTOR_TASKS,还定义了任务执行的几种状态:


    private static final int ST_NOT_STARTED = 1;    private static final int ST_STARTED = 2;    private static final int ST_SHUTTING_DOWN = 3;    private static final int ST_SHUTDOWN = 4;    private static final int ST_TERMINATED = 5;
复制代码


之前提到了 EventExecutor 中有一个特有的 inEventLoop 方法,判断给定的 thread 是否在 eventLoop 中,在 SingleThreadEventExecutor 中,我们看一下具体的实现:


    public boolean inEventLoop(Thread thread) {        return thread == this.thread;    }
复制代码


具体而言就是判断给定的线程和 SingleThreadEventExecutor 中定义的 thread 属性是不是同一个 thread,SingleThreadEventExecutor 中的 thread 是这样定义的:


 
复制代码


这个 thread 是在 doStartThread 方法中进行初始化的:


executor.execute(new Runnable() {            @Override            public void run() {                thread = Thread.currentThread();
复制代码


所以这个 thread 是任务执行的线程,也就是 executor 中执行任务用到的线程。


再看一下非常关键的 execute 方法:


private void execute(Runnable task, boolean immediate) {        boolean inEventLoop = inEventLoop();        addTask(task);        if (!inEventLoop) {            startThread();
复制代码


这个方法首先将 task 添加到任务队列中,然后调用 startThread 开启线程来执行任务。


最后来看一下 DefaultEventExecutor,这个 netty 中的默认实现:


public final class DefaultEventExecutor extends SingleThreadEventExecutor 
复制代码


DefaultEventExecutor 继承自 SingleThreadEventExecutor,这个类中,它定义了 run 方法如何实现:


    protected void run() {        for (;;) {            Runnable task = takeTask();            if (task != null) {                task.run();                updateLastExecutionTime();            }
if (confirmShutdown()) { break; } } }
复制代码


在 SingleThreadEventExecutor 中,我们会把任务加入到 task queue 中,在 run 方法中,会从 task queue 中取出对应的 task,然后调用 task 的 run 方法执行。

总结

DefaultEventExecutorGroup 继承了 MultithreadEventExecutorGroup,MultithreadEventExecutorGroup 中实际调用的是 SingleThreadEventExecutor 来执行具体的任务。


本文已收录于 http://www.flydean.com/05-1-netty-event…entexecutorgroup/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 17 小时前阅读数: 38
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:EventExecutor,EventExecutorGroup和netty中的实现