写点什么

Netty 源码—Reactor 线程模型二

  • 2025-03-22
    福建
  • 本文字数:26985 字

    阅读完需:约 89 分钟

5.NioEventLoop 的执行总体框架


(1)Reactor 线程所做的三件事情


NioEventLoop 的 run()方法里有个无限 for 循环,for 循环里便是 Reactor 线程所要做的 3 件事情。

 

一.首先是调用 select()方法进行一次事件轮询


由于一个 NioEventLoop 对应一个 Selector,所以该 select()方法便是轮询注册到这个 Reactor 线程对应的 Selector 上的所有 Channel 的 IO 事件。注意,select()方法里也有一个无限 for 循环,但是这个无限 for 循环可能会被某些条件中断。

 

二.然后调用 processSelectedKeys()方法处理轮询出来的 IO 事件

 

三.最后调用 runAllTasks()方法来处理外部线程放入 TaskQueue 的任务


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    private volatile int ioRatio = 50;    ...    @Override    protected void run() {        for (;;) {            ...            //1.调用select()方法执行一次事件轮询            select(wakenUp.getAndSet(false));            if (wakenUp.get()) {                selector.wakeup();            }            ...            //2.处理产生IO事件的Channel            processSelectedKeys();            ...            //3.执行外部线程放入TaskQueue的任务            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);        }    }
private void select(boolean oldWakenUp) throws IOException { for(;;) { //1.定时任务截止时间快到了,中断本次轮询 //2.轮询过程中发现有任务加入,中断本次轮询 //3.阻塞式select操作: selector.select(timeoutMills) //4.避免JDK空轮询Bug } } ...}
复制代码


(2)处理多久 IO 事件就执行多久任务


在 NioEventLoop 的 run()方法中,有个 ioRatio 默认是 50,代表处理 IO 事件的时间和执行任务的时间是 1:1。也就是执行了多久的 processSelectedKeys()方法后,紧接着就执行多久的 runAllTasks()方法。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    private volatile int ioRatio = 50;    ...    @Override    protected void run() {        for (;;) {            ...            //1.调用select()方法执行一次事件轮询            select(wakenUp.getAndSet(false));            if (wakenUp.get()) {                selector.wakeup();            }            ...            final int ioRatio = this.ioRatio;            if (ioRatio == 100) {                try {                    processSelectedKeys();                } finally {                    // Ensure we always run tasks.                    runAllTasks();                }            } else {                final long ioStartTime = System.nanoTime();                try {                    processSelectedKeys();                } finally {                    // Ensure we always run tasks.                    final long ioTime = System.nanoTime() - ioStartTime;                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            }            ...        }    }    ...}
复制代码


(3)NioEventLoop.run()方法的执行流程


NioEventLoop.run() -> for(;;)  select() //执行一次事件轮询检查是否有IO事件  processSelectedKeys() //处理产生IO事件的Channel  runAllTasks() //处理异步任务队列//这3步放在一个线程处理应该是为了节约线程,因为不是总会有IO事件和异步任务的
复制代码


6.Reactor 线程执行一次事件轮询


(1)执行 select 操作前设置 wakeUp 变量


NioEventLoop 有个 wakenUp 成员变量表示是否应该唤醒正在阻塞的 select 操作。NioEventLoop 的 run()方法准备执行 select()方法进行一次新的循环逻辑之前,都会将 wakenUp 设置成 false,标志新一轮循环的开始。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    //Boolean that controls determines if a blocked Selector.select should break out of its selection process.     //In our case we use a timeout for the select method and the select method will block for that time unless waken up.    private final AtomicBoolean wakenUp = new AtomicBoolean();    ...    @Override    protected void run() {        for (;;) {            ...            //1.调用select()方法执行一次事件轮询            select(wakenUp.getAndSet(false));            if (wakenUp.get()) {                selector.wakeup();            }            ...        }    }    ...}
复制代码


如下是 NioEventLoop 的 select()方法的执行逻辑,也就是 Netty 关于事件循环的 4 段逻辑。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    ...    private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        for(;;) {            //1.定时任务截止时间快到了,中断本次轮询            //2.轮询过程中发现有任务加入,中断本次轮询            //3.阻塞式select操作: selector.select(timeoutMills)            //4.避免JDK空轮询Bug        }    }    ...}
复制代码


(2)定时任务快开始了则中断本次轮询


NioEventLoop 中的 Reactor 线程的 select 操作也是一个 for 循环。

 

在 for 循环第一步,如果发现当前定时任务队列中某个任务的开始时间快到了(小于 0.5ms),那么就跳出循环。在跳出循环之前,如果发现目前为止还没有进行过 select 操作,就调用一次 selectNow()方法执行非阻塞式 select 操作。

 

Netty 里的定时任务队列是按照延迟时间从小到大进行排序的,所以 delayNanos()方法返回的第一个定时任务的延迟时间便是最早截止的时间。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    ...    private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        int selectCnt = 0;        long currentTimeNanos = System.nanoTime();//当前时间        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//当前时间 + 定时任务的最早截止时间        for(;;) {            //1.定时任务截止时间快到了,中断本次轮询            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;            if (timeoutMillis <= 0) {                if (selectCnt == 0) {                    selector.selectNow();//非阻塞执行select操作                    selectCnt = 1;                }                break;            }            ...        }    }    ...}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { ... protected long delayNanos(long currentTimeNanos) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; } return scheduledTask.delayNanos(currentTimeNanos); } ...}
//Abstract base class for EventExecutors that want to support scheduling.public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//定时任务队列 ... final ScheduledFutureTask<?> peekScheduledTask() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; if (scheduledTaskQueue == null) { return null; } return scheduledTaskQueue.peek(); } ...}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> { ... public long delayNanos(long currentTimeNanos) { return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME)); } public long deadlineNanos() { return deadlineNanos; } ...}
复制代码


(3)轮询中发现有任务加入则中断本次轮询


注意:Netty 的任务队列包括普通任务和定时任务。定时任务快开始时需要中断本次轮询,普通任务队列非空时也需要中断本次轮询。

 

Netty 为了保证普通任务队列里的普通任务能够及时执行,在调用 selector.select()方法进行阻塞式 select 操作前会判断普通任务队列是否为空。如果不为空,那么就调用 selector.selectNow()方法执行一次非阻塞 select 操作,然后跳出循环。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    ...    private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        int selectCnt = 0;        ...        for(;;) {            ...            //2.轮询过程中发现有任务加入,中断本次轮询            if (hasTasks() && wakenUp.compareAndSet(false, true)) {                selector.selectNow();//非阻塞式执行select操作                selectCnt = 1;                break;            }            ...        }    }    ...}
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { private final Queue<Runnable> tailTasks; ... @Override protected boolean hasTasks() { return super.hasTasks() || !tailTasks.isEmpty(); } ...}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { private final Queue<Runnable> taskQueue;//普通任务队列 ... protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); } ...}
复制代码


(4)执行阻塞式 select 操作


一.最多阻塞到第一个定时任务的开始时间


执行到这一步,说明 Netty 的普通任务队列里的队列为空,并且所有定时任务的开始时间还未到(大于 0.5ms)。于是便进行一次阻塞式 select 操作,一直阻塞到第一个定时任务的开始时间,也就是把 timeoutMills 作为参数传入 select()方法中。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    ...    private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        ...        for(;;) {            ...            //3.阻塞式select操作: selector.select(timeoutMills),最多阻塞timeoutMills时间            int selectedKeys = selector.select(timeoutMillis);            selectCnt ++;            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                // - Selected something,                // - waken up by user, or                // - the task queue has a pending task.                // - a scheduled task is ready for processing                break;            }            ...        }    }    ...}
复制代码


二.外部线程提交任务会唤醒 Reactor 线程


如果第一个定时任务的延迟时间非常长,比如一小时,那么有可能线程会一直阻塞在 select 操作(select 完还是会返回的)。但只要这段时间内有新任务加入,该阻塞就会被释放。

 

比如当有外部线程执行 NioEventLoop 的 execute()方法添加任务时,就会调用 NioEventLoop 的 wakeUp()方法来通过 selector.wakeup()方法,去唤醒正在执行 selector.select(timeoutMills)而被阻塞的线程。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    //Boolean that controls determines if a blocked Selector.select should break out of its selection process.     //In our case we use a timeout for the select method and the select method will block for that time unless waken up.    private final AtomicBoolean wakenUp = new AtomicBoolean();    Selector selector;    ...    @Override    protected void wakeup(boolean inEventLoop) {        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {            selector.wakeup();        }    }}
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { ...}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { ... @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); }
boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } }
if (!addTaskWakesUp && wakesUpForTask(task)) { //调用NioEventLoop.wakeup()方法唤醒正在执行selector.select(timeoutMills)而被阻塞的线程 wakeup(inEventLoop); } } ...}
复制代码


三.是否中断本次轮询的判断条件


阻塞式 select 操作结束后,Netty 又会做一系列状态判断来决定是否中断本次轮询,如果满足如下条件就中断本次轮询:


条件一:检测到 IO 事件

条件二:被用户主动唤醒

条件三:普通任务队列里有任务需要执行

条件四:第一个定时任务即将要被执行


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    ...    private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        ...        for(;;) {            ...            //3.阻塞式select操作: selector.select(timeoutMills),最多阻塞timeoutMills时间            int selectedKeys = selector.select(timeoutMillis);            selectCnt ++;            //阻塞式select操作结束后,Netty又会做一系列状态判断来决定是否中断本次轮询            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                // - Selected something, 检测到IO事件                // - waken up by user, 被用户主动唤醒                // - the task queue has a pending task. 普通任务队列里有任务需要执行                // - a scheduled task is ready for processing 第一个定时任务即将要被执行                break;            }            ...        }    }    ...}
复制代码


(5)避免 JDK 的空轮询 Bug


JDK 空轮询 Bug 会导致 selector 一直空轮询,最终导致 CPU 的利用率 100%。

 

一.Netty 避免 JDK 空轮询的方法


首先每次执行 selector.select(timeoutMillis)之前都会记录开始时间,在阻塞式 select 操作后记录结束时间。

 

然后判断阻塞式 select 操作是否持续了至少 timeoutMillis 时间。如果阻塞式 select 操作持续的时间大于等于 timeoutMillis,说明这是一次有效的轮询,于是重置 selectCnt 为 1。如果阻塞式 select 操作持续的时间小于 timeoutMillis,则说明可能触发了 JDK 的空轮询 Bug,于是自增 selectCnt。当持续时间很短的 select 操作的次数 selectCnt 超过了 512 次,那么就重建 Selector。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;    ...    private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        intselectCnt = 0;        long currentTimeNanos = System.nanoTime();//记录开始时间        ...        for(;;) {            ...            int selectedKeys = selector.select(timeoutMillis);//进行阻塞式select操作            selectCnt++;//select操作持续时间很短,可能出现空轮询,selectCnt需要自增            long time = System.nanoTime();//记录结束时间            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                //如果select操作持续的时间大于timeoutMillis,说明这是一次有效的轮询,重置selectCnt为1                selectCnt = 1;            } else if (selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                //如果持续时间很短的select操作的次数超过了512次,就重建selector                rebuildSelector();//重建Selector                selector = this.selector;                selector.selectNow();                selectCnt = 1;                break;            }            currentTimeNanos = time;            ...        }    }    ...}
复制代码


二.重建 Selector 的逻辑


重建 Selector 的逻辑就是通过 openSelector()方法创建一个新的 Selector,然后执行一个无限的 for 循环,只要执行过程中出现一次并发修改 SelectionKeys 异常,那么就重新开始转移,直到转移完成。

 

具体的转移步骤为:首先拿到有效的 key,然后取消该 key 在旧 Selector 上的事件注册。接着将该 key 对应的 Channel 注册到新的 Selector 上,最后重新绑定 Channel 和新的 key。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    ...    //Replaces the current Selector of this event loop with newly created Selectors to work around the infamous epoll 100% CPU bug.    public void rebuildSelector() {        final Selector oldSelector = selector;        final Selector newSelector = openSelector();        int nChannels = 0;        for (;;) {            try {                for (SelectionKey key: oldSelector.keys()) {                    Object a = key.attachment();                    //1.拿到有效的key                    if (!key.isValid() || key.channel().keyFor(newSelector) != null) {                        continue;                    }                            int interestOps = key.interestOps();                    //2.取消该key在旧Selector上的事件注册                    key.cancel();                    //3.将该key对应的Channel注册到新的Selector上                    SelectionKey newKey = key.channel().register(newSelector, interestOps, a);                    if (a instanceof AbstractNioChannel) {                        //4.重新绑定Channel和新的key                        ((AbstractNioChannel) a).selectionKey = newKey;                    }                    nChannels++;                }                break;            } catch(ConcurrentModificationException e) {                continue;            }        }        selector = newSelector;        oldSelector.close();    }    ...}
复制代码


(6)执行一次事件轮询的总结


关于 Reactor 线程的 select 操作所做的事情:

 

简单来说就是:

不断轮询是否有 IO 事件发生,并且在轮询过程中不断检查是否有任务需要执行,从而保证 Netty 任务队列中的任务都能够及时执行,以及在轮询过程中会巧妙地使用一个计数器来避开 JDK 的空轮询 Bug。

 

详细来说就是:

NioEventLoop 的 select()方法首先会判断有没有定时任务快到要开始的时间了、普通任务队列 taskQueue 里是否存在任务。如果有就调用 selector.selectNow()进行非阻塞式的 select 操作,如果都没有就调用 selector.select(timeoutMillis)进行阻塞式 select 操作。在阻塞式 select 操作结束后,会判断这次 select 操作是否阻塞了 timeoutMillis 这么长时间。如果没有阻塞那么长时间就表明可能触发了 JDK 的空轮询 Bug,接下来就会继续判断可能触发空轮询 Bug 的次数是否达到了 512 次,如果达到了就通过替换原来 Selector 的方式去避开空轮询 Bug。

 

7.Reactor 线程处理产生 IO 事件的 Channel


(1)处理 IO 事件的关键逻辑


Reactor 线程执行的第一步是轮询出注册在 Selector 上的 IO 事件,第二步便是处理这些 IO 事件了。

 

processSelectedKeys()的关键逻辑包含两部分:


一.针对 selectedKeys 的优化

二.processSelectedKeysOptimized()方法真正处理 IO 事件


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    private SelectedSelectionKeySet selectedKeys;    ...    @Override    protected void run() {        for (;;) {            ...            //1.调用select()方法执行一次事件轮询            select(wakenUp.getAndSet(false));            if (wakenUp.get()) {                selector.wakeup();            }            ...            //2.处理产生IO事件的Channel            processSelectedKeys();            ...            //3.执行外部线程放入TaskQueue的任务            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);        }    }
private void processSelectedKeys() { if (selectedKeys != null) { //selectedKeys.flip()会返回一个数组 processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } ...}
复制代码


(2)Netty 对 selectedKeys 的优化


Netty 对 selectedKeys 的所有优化都是在 NioEventLoop 的 openSelector()方法中体现的。

 

这个优化指的是:

Selector.select()操作每次都会把就绪状态的 IO 事件添加到 Selector 底层的两个 HashSet 成员变量中,而 Netty 会通过反射的方式将 Selector 中用于存放 SelectionKey 的 HashSet 替换成数组,使得添加 SelectionKey 的时间复杂度由 HashSet 的 O(n)降为数组的 O(1)。

 

具体来说就是:

NioEventLoop 的成员变量 selectedKeys 是一个 SelectedSelectionKeySet 对象,会在 NioEventLoop 的 openSelector()方法中创建。之后 openSelector()方法会通过反射将 selectedKeys 与 Selector 的两个成员变量绑定。SelectedSelectionKeySet 继承了 AbstractSet,但底层是使用数组来存放 SelectionKey 的。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    private SelectedSelectionKeySet selectedKeys;    ...    private Selector openSelector() {        final Selector selector = provider.openSelector();        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();        ...        //下面的selectorImplClass对应于sun.nio.ch.SelectorImpl        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");        selectedKeysField.setAccessible(true);        publicSelectedKeysField.setAccessible(true);        //这两个set才是优化中的精华,一句话总结就是:        //用数组来替换Selector中用于存放SelectionKey的HashSet的实现,做到add()方法的时间复杂度为O(1)        selectedKeysField.set(selector, selectedKeySet);        publicSelectedKeysField.set(selector, selectedKeySet);        ...        selectedKeys = selectedKeySet;        ...    }    ...}
//SelectedSelectionKeySet继承了AbstractSet,说明该类可以当作一个Set来用,但是底层使用两个数组来交替使用//在add()方法中,首先判断当前应该使用哪个数组,然后找到对应的数组执行如下3个步骤://步骤一:将SelectionKey放入数组的尾部//步骤二:更新该数组的逻辑长度+1//步骤三:如果该数组的逻辑长度等于该数组的物理长度,就将该数组扩容
//待程序运行一段时间后,等数组的长度足够长,每次轮询到NIO事件的时候,//调用这里的add()方法只需要O(1)的时间复杂度就能将SelectionKey放入到Set中,//而JDK底层使用的HashSet的put()方法的时间复杂度最小是O(1)、最大是O(n),//使用数组替换HashSet还有一个好处就是遍历的时候非常高效final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { private SelectionKey[] keysA; private int keysASize; private SelectionKey[] keysB; private int keysBSize; private boolean isA = true;
SelectedSelectionKeySet() { keysA = new SelectionKey[1024]; keysB = keysA.clone(); }
@Override public boolean add(SelectionKey o) { if (o == null) { return false; } if (isA) { int size = keysASize; keysA[size ++] = o; keysASize = size; if (size == keysA.length) { doubleCapacityA(); } } else { int size = keysBSize; keysB[size ++] = o; keysBSize = size; if (size == keysB.length) { doubleCapacityB(); } } return true; } //返回一个数组 SelectionKey[] flip() { if (isA) { isA = false; keysA[keysASize] = null; keysBSize = 0; return keysA; } else { isA = true; keysB[keysBSize] = null; keysASize = 0; return keysB; } } ...}
//可以看到,SelectorImpl的两个成员变量selectedKeys和keys都是HashSetpublic abstract class SelectorImpl extends AbstractSelector { protected Set<SelectionKey> selectedKeys = new HashSet(); protected HashSet<SelectionKey> keys = new HashSet(); private Set<SelectionKey> publicKeys; private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) { super(var1); if (Util.atBugLevel("1.4")) { this.publicKeys = this.keys; this.publicSelectedKeys = this.selectedKeys; } else { this.publicKeys = Collections.unmodifiableSet(this.keys); this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys); } } ...}
复制代码


(3)处理 IO 事件的过程说明


说明一:


首先取出 IO 事件。IO 事件是以数组的形式从 selectedKeys 中取的,其对应的 Channel 则由 SelectionKey 的 attachment()方法返回。

 

此时可以体会到优化过的 selectedKeys 的好处。因为遍历时遍历的是数组,相对 JDK 原生的 HashSet,效率有所提高。

 

拿到当前的 SelectionKey 之后,便将 selectedKeys[i]设置为 null,这样做是为了方便 GC。因为假设一个 NioEventLoop 平均每次轮询出 N 个 IO 事件,高峰期轮询出 3N 个事件,那么 selectedKeys 的物理长度要大于等于 3N。如果每次处理这些 key 时不设置 selectedKeys[i]为 null,那么高峰期一过,这些保存在数组尾部的 selectedKeys[i]对应的 SelectionKey 将一直无法被回收,虽然 SelectionKey 对应的对象可能不大,但其关联的 attachment 则可能很大。这些对象如果一直存活无法回收,就可能发生内存泄露。

 

说明二:


然后获取当前 SelectionKey 对应的 attachment。这个 attachement 就是取出的 IO 事件对应的 Channel 了,于是接下来就可以处理该 Channel 了。

 

由于 Netty 在注册服务端 Channel 时,会将 AbstractNioChannel 内部的 SelectableChannel 对象注册到 Selector 对象上,并且将 AbstractNioChannel 作为 SelectableChannel 对象的一个 attachment 附属。所以当 JDK 轮询出某个 SelectableChannel 有 IO 事件时,就可以通过 attachment()方法直接取出 AbstractNioChannel 进行操作了。

 

说明三:


接着便会调用 processSelectedKey()方法对 SelectionKey 和 AbstractNioChannel 进行处理。Netty 有两大类 Channel:一个是 NioServerSocketChannel,由 bossGroup 处理。另一个是 NioSocketChannel,由 workerGroup 处理。对于 boss 的 NioEventLoop 来说,轮询到的是连接事件。对于 worker 的 NioEventLoop 来说,轮询到的是读写事件。

 

说明四:


最后会判断是否再进行一次轮询。NioEventLoop 的 run()方法每次在轮询到 IO 事件后,都会将 needsToSelectAgain 设置为 false。只有当 Channel 从 Selector 上移除时,也就是调用 NioEventLoop 的 cancel()方法时,发现被取消的 key 已经达到 256 次了,才会将 needsToSelectAgain 设置为 true。当 needsToSelectAgain 为 true,就会调用 selectAgain()方法再进行一次轮询。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    private SelectedSelectionKeySet selectedKeys;    private boolean needsToSelectAgain;    private int cancelledKeys;    private static final int CLEANUP_INTERVAL = 256;    ...    @Override    protected void run() {        for (;;) {            ...            //1.调用select()方法执行一次事件轮询            select(wakenUp.getAndSet(false));            if (wakenUp.get()) {                selector.wakeup();            }            ...            //2.处理产生IO事件的Channel            needsToSelectAgain = false;            processSelectedKeys();            ...            //3.执行外部线程放入TaskQueue的任务            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);        }    }
private void processSelectedKeys() { if (selectedKeys != null) { //selectedKeys.flip()会返回一个数组 processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { //1.首先取出IO事件 final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null;//Help GC //2.然后获取对应的Channel和处理该Channel //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { //网络事件的处理 processSelectedKey(k, (AbstractNioChannel) a); } else { //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } //3.最后判断是否应该再进行一次轮询 if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); //selectedKeys.flip()会返回一个数组 selectedKeys = this.selectedKeys.flip(); i = -1; } } }
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { //If the channel implementation throws an exception because there is no event loop, //we ignore this because we are only trying to determine if ch is registered to this event loop and thus has authority to close ch. return; } //Only close ch if ch is still registerd to this EventLoop. //ch could have deregistered from the event loop and thus the SelectionKey could be cancelled as part of the deregistration process, //but the channel is still healthy and should not be closed. if (eventLoop != this || eventLoop == null) { return; } //close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; }
try { int readyOps = k.readyOps(); //We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise //the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
//Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { //Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
void cancel(SelectionKey key) { key.cancel(); cancelledKeys ++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; } } ...}
复制代码


(4)处理 IO 事件的总结


Netty 默认情况下会通过反射,将 Selector 底层用于存放 SelectionKey 的两个 HashSet,转化成一个数组来提升处理 IO 事件的效率。

 

在处理每一个 SelectionKey 时都会拿到对应的一个 attachment,而这个 attachment 就是在服务端 Channel 注册 Selector 时所绑定的一个 AbstractNioChannel。所以在处理每一个 SelectionKey 时,都可以找到对应的 AbstractNioChannel,然后通过 Pipeline 将处理串行到 ChannelHandler,回调到用户的方法。

 

8.Reactor 线程处理任务队列之添加任务


(1)Reactor 线程执行一次事件轮询的过程


Reactor 线程通过 NioEventLoop 的 run()方法每进行一次事件轮询,首先会调用 select()方法尝试检测出 IO 事件,然后会调用 processSelectedKeys()方法处理检测出的 IO 事件。其中 IO 事件主要包括新连接接入事件和连接的数据读写事件,最后会调用 runAllTasks()方法处理任务队列中的异步任务。

 

(2)任务的分类和添加说明


runAllTasks()方法中的 Task 包括普通任务和定时任务,分别存放于 NioEventLoop 不同的队列里。一个是普通的任务队列 MpscQueue,另一个是定时的任务队列 PriorityQueue。

 

普通的任务队列 MpscQueue 在创建 NioEventLoop 时创建的,然后在外部线程调用 NioEventLoop 的 execute()方法时,会调用 addTask()方法将 Task 保存到普通的任务队列里。

 

定时的任务队列 PriorityQueue 则是在添加定时任务时创建的,然后在外部线程调用 NioEventLoop 的 schedule()方法时,会调用 scheduleTaskQueue().add()方法将 Task 保存到定时的任务队列里。

 

(3)普通任务的添加

 

当通过 ctx.channel().eventLoop().execute(...)自定义普通任务,或者通过非 Reactor 线程(外部线程)调用 Channel 的各类方法时,最后都会执行到 SingleThreadEventExecutor 的 execute()方法。

 

场景一:用户自定义普通任务


不管是外部线程还是 Reactor 线程执行 NioEventLoop 的 execute()方法,都会调用 NioEventLoop 的 addTask()方法,然后调用 offerTask()方法。而 offerTask()方法会使用一个 taskQueue 将 Task 保存起来。这个 taskQueue 其实就是一个 MPSC 队列,每一个 NioEventLoop 都会有一个 MPSC 队列。

 

Netty 使用 MPSC 队列可以方便地将外部线程的异步任务进行聚集,然后在 Reactor 线程内部用单线程来批量执行以提升性能。可以借鉴 Netty 的这种任务执行模式来处理类似多线程数据聚合,定时上报应用。


//场景一:用户自定义普通任务ctx.channel().eventLoop().execute(new Runnable() {    @Override    public void run() {        ...        }});
public final class NioEventLoop extends SingleThreadEventLoop { ...}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { ...}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //每一个NioEventLoop会有一个MPSC队列 private final Queue<Runnable> taskQueue; protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); //创建普通的任务队列MpscQueue taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } ... @Override public void execute(Runnable task) { if (task == null) throw new NullPointerException("task"); boolean inEventLoop = inEventLoop(); //不管是外部线程还是Reactor线程执行NioEventLoop的execute()方法,都会调用NioEventLoop的addTask()方法 if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop); } //Add a task to the task queue, or throws a RejectedExecutionException if this instance was shutdown before. protected void addTask(Runnable task) { if (task == null) throw new NullPointerException("task"); if (!offerTask(task)) reject(task); }
final boolean offerTask(Runnable task) { if (isShutdown()) reject(); return taskQueue.offer(task); } ...}
复制代码


场景二:外部线程调用 Channel 的方法


这个场景是在业务线程里,根据用户标识找到对应的 Channel,然后调用 Channel 的 write()方法向该用户推送消息。

 

外部线程在调用 Channel 的 write()方法时,executor.inEventLoop()会返回 false。于是会将 write 操作封装成一个 WriteTask,然后调用 safeExecute()方法来执行。默认情况下会获取 Channel 对应的 NIO 线程,然后作为参数传入 safeExecute()方法中进行执行,从而确保任务会由 Channel 对应的 NIO 线程执行,通过单线程执行来实现线程安全。


//场景二:当前线程为业务线程channel.write(...)
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { final EventExecutor executor;//一般初始化时默认为null AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) { this.pipeline = pipeline; this.executor = executor; ... } ... private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); //外部线程在调用Channel的write()方法时,executor.inEventLoop()会返回false if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { //将write操作封装成一个WriteTask AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } //调用safeExecute()来执行 safeExecute(executor, task, promise, m); } } @Override public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } } private AbstractChannelHandlerContext findContextOutbound(int mask) { AbstractChannelHandlerContext ctx = this; EventExecutor currentExecutor = executor(); do { ctx = ctx.prev; } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND)); return ctx; } private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { try { //调用SingleThreadEventExecutor.execute()方法 executor.execute(runnable); } catch (Throwable cause) { try { promise.setFailure(cause); } finally { if (msg != null) { ReferenceCountUtil.release(msg); } } } } ...}
public class DefaultChannelPipeline implements ChannelPipeline { ... final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { //传入的executor为null super(pipeline, null, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); } ... } final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { //传入的executor为null super(pipeline, null, TAIL_NAME, TailContext.class); setAddComplete(); } ... } ...}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //每一个NioEventLoop会有一个MPSC队列 private final Queue<Runnable> taskQueue; protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); //创建普通的任务队列MpscQueue taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } ... @Override public void execute(Runnable task) { if (task == null) throw new NullPointerException("task"); boolean inEventLoop = inEventLoop(); //不管是外部线程还是Reactor线程执行NioEventLoop的execute()方法,都会调用NioEventLoop的addTask()方法 if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop); } //Add a task to the task queue, or throws a RejectedExecutionException if this instance was shutdown before. protected void addTask(Runnable task) { if (task == null) throw new NullPointerException("task"); if (!offerTask(task)) reject(task); }
final boolean offerTask(Runnable task) { if (isShutdown()) reject(); return taskQueue.offer(task); } ...}
复制代码


(4)定时任务的添加


通常使用 ctx.channel().eventLoop().schedule(..)自定义定时任务,其中 schedule()方法会通过 scheduledTaskQueue().add(task)来添加定时任务。首先 scheduledTaskQueue()方法会返回一个优先级队列,然后通过该优先级队列的 add()方法将定时任务对象加入到队列中。

 

注意,这里可以直接使用优先级队列而不用考虑多线程并发问题的原因如下。如果是外部线程调用 schedule()方法添加定时任务,那么 Netty 会将添加定时任务这个逻辑封装成一个普通的 Task。这个 Task 的任务是一个"添加某定时任务"的任务,而不是添加某定时任务。这样,对优先级队列的访问就变成单线程了,也就是只有 Reactor 线程会访问,从而不存在多线程并发问题。


//场景三:用户自定义定时任务,这也是用得最多的方法ctx.channel().eventLoop().schedule(new Runnable() {    @Override    public void run() {        ...        }}, 60, TimeUnit.SECONDS);
//Abstract base class for EventExecutors that want to support scheduling.public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { Queue<ScheduledFutureTask<?>> scheduledTaskQueue; ... //添加定时任务 <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { //如果当前线程是Reactor线程,则直接往PriorityQueue中添加任务 scheduledTaskQueue().add(task); } else { //如果是外部线程,则调用SingleThreadEventExecutor.execute()方法 //将添加定时任务这一动作也封装成一个普通任务 execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; } Queue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { //创建定时的任务队列PriorityQueue scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>(); } return scheduledTaskQueue; } ...}
复制代码


(5)Netty 的定时任务机制补充


一.如何保证截止时间最近的任务优先执行


为什么定时任务要保存在优先级队列中?优先级队列的特性是会按照一定的顺序来排列内部元素,内部元素是可以比较的。由于优先级队列中的每个元素都是定时任务,所以定时任务也是可以比较的。比较的逻辑就是:先比较定时任务的截止时间,在截止时间相同的情况下再比较定时任务的添加顺序也就是 ID。

 

二.Netty 的定时任务有三种执行方式


方式一:定时任务不会被重复执行

ctx.channel().eventLoop().schedule(),传递的 periodNanos 为 0。


方式二:每隔一段时间执行一次

ctx.channel().eventLoop().scheduleAtFixedRate(),传递的 periodNanos 为正数。


方式三:隔相同时间再执行一次

ctx.channel().eventLoop().scheduleWithFixedDelay(),传递的 periodNanos 为负数。

 

Netty 的 3 种定时任务的执行逻辑是通过调整下一次任务的截止时间来运行的。首先修改完下一次执行的截止时间,然后把当前任务再次加入队列,这样就能确保任务在适当的时候执行。


final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {    //每个定时任务都有一个唯一的ID    private static final AtomicLong nextTaskId = new AtomicLong();    private final long id = nextTaskId.getAndIncrement();    private long deadlineNanos;    //标识一个任务是否重复执行,以及以何种方式执行    private final long periodNanos;    ...        @Override    public int compareTo(Delayed o) {        if (this == o) {            return 0;        }
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o; long d = deadlineNanos() - that.deadlineNanos(); if (d < 0) { return -1; } else if (d > 0) { return 1; } else if (id < that.id) { return -1; } else if (id == that.id) { throw new Error(); } else { return 1; } }
public long deadlineNanos() { return deadlineNanos; } @Override public void run() { ... if (periodNanos == 0) { //1.对应schedule()方法,表示一次性任务 V result = task.call(); setSuccessInternal(result); } else { task.call(); long p = periodNanos; if (p > 0) { //2.对应scheduleAtFixedRate()方法,表示以固定速率执行任务 deadlineNanos += p; } else { //3.对应scheduleWithFixedDelay()方法,表示以固定的延时执行任务 deadlineNanos = nanoTime() - p; } scheduledTaskQueue.add(this); } ... } ...}
复制代码


9.Reactor 线程处理任务队列之执行任务


(1)runAllTasks()方法需要传入超时时间


SingleThreadEventExecutor 的 runAllTasks()方法需要传入参数 timeoutNanos,表示尽量在 timeoutNanos 时间内将所有的任务都取出来执行一遍。因为如果 Reactor 线程在执行任务时停留的时间过长,那么将会累积许多 IO 事件无法及时处理,从而导致大量客户端请求阻塞。因此 Netty 会精细控制内部任务队列的执行时间。

 

(2)Reactor 线程执行任务的步骤


一.任务聚合

转移定时任务到 MPSC 队列,这里只是将快到期的定时任务转移到 MPSC 队列里。

 

二.时间计算

计算本轮任务执行的截止时间,此时所有截止时间已到达的定时任务均被填充到普通的任务队列(MPSC 队列)里了。

 

三.任务执行

首先不抛异常地同步执行任务,然后累加当前已执行的任务数,接着每隔 64 次计算一下当前时间是否已超截止时间,最后判断本轮任务是否已经执行完毕。


//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {    //每一个NioEventLoop会有一个MPSC队列    private final Queue<Runnable> taskQueue;    ...        //Poll all tasks from the task queue and run them via Runnable#run() method.    //This method stops running the tasks in the task queue and returns if it ran longer than timeoutNanos.    protected boolean runAllTasks(long timeoutNanos) {        //1.转移定时任务到MPSC队列,也就是任务聚合        fetchFromScheduledTaskQueue();        //从普通的任务队列(MPSC队列)里获取任务        Runnable task = pollTask();        if (task == null) {            afterRunningAllTasks();            return false;        }        //2.计算本轮任务执行的截止时间        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;        long runTasks = 0;        long lastExecutionTime;        //3.执行任务,通过for循环逐个执行pollTask()取出的任务        for (;;) {            //3.1 不抛异常地执行任务(同步阻塞),确保任务可以安全执行            safeExecute(task);            //3.2 累加当前已执行的任务数            runTasks ++;            //3.3 每隔64次计算一下当前时间是否已经超过截止时间,因为ScheduledFutureTask.nanoTime()也挺耗时的            if ((runTasks & 0x3F) == 0) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                if (lastExecutionTime >= deadline) {                    break;                }            }            //3.4 判断本轮任务是否已经执行完毕            task = pollTask();            if (task == null) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                break;            }        }        afterRunningAllTasks();        this.lastExecutionTime = lastExecutionTime;        return true;    }        private boolean fetchFromScheduledTaskQueue() {        long nanoTime = AbstractScheduledEventExecutor.nanoTime();        Runnable scheduledTask  = pollScheduledTask(nanoTime);        while (scheduledTask != null) {            if (!taskQueue.offer(scheduledTask)) {                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);                return false;            }            scheduledTask  = pollScheduledTask(nanoTime);        }        return true;    }        protected Runnable pollTask() {        assert inEventLoop();        return pollTaskFrom(taskQueue);    }
protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { continue; } return task; } } ...}
//Abstract base class for EventExecutors that want to support scheduling.public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { Queue<ScheduledFutureTask<?>> scheduledTaskQueue; ... //Return the Runnable which is ready to be executed with the given nanoTime. //You should use #nanoTime() to retrieve the the correct nanoTime. protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } return null; } ...}
//Abstract base class for EventExecutor implementations.public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { ... //Try to execute the given Runnable and just log if it throws a Throwable. protected static void safeExecute(Runnable task) { try { task.run();//同步执行任务 } catch (Throwable t) { logger.warn("A task raised an exception. Task: {}", task, t); } } ...}
复制代码


(3)Netty 性能优化之间隔策略


假设任务队列里有海量的小任务,如果每次执行完任务都需要判断是否到截止时间,那么效率是比较低的。所以 Netty 选择通过每隔 64 个任务才判断一下是否到截止时间,那么效率就会高很多。


(4)NioEventLoop.run()方法执行任务总结

Netty 里的任务分两种:一种是普通的任务,另一种是定时的任务。Netty 在执行这些任务时首先会把定时任务聚合到普通任务队列里,然后再从普通任务队列里获取任务逐个执行,并且是每执行 64 个任务之后才判断一下当前时间是否超过最大允许执行时间。如果超过就直接中断,中断之后就会进行下一次 NioEventLoop.run()方法的 for 循环。

 

10.NioEventLoop 总结


(1)NioEventLoop 的执行流程总结


一.NioEventLoop 在执行过程中首先会不断检测是否有 IO 事件发生,然后如果检测出有 IO 事件就处理 IO 事件,接着处理完 IO 事件之后再处理外部线程提交过来的异步任务。

 

二.在检测是否有 IO 事件发生时,为了保证异步任务的及时处理,只要有任务要处理,那么就立即停止检测去处理任务。

 

三.外部线程异步执行的任务分为两种:普通任务和定时任务。这两种任务分别保存到 MPSC 队列和优先级队列,而优先级队列中的任务最终都会转移到 MPSC 队列里进行处理。

 

四.Netty 每处理完 64 个任务才会检查一次是否超时而退出执行任务的循环。

 

(2)Reactor 线程模型总结


一.NioEventLoopGroup 在用户代码中被创建,默认情况下会创建两倍 CPU 核数个 NioEventLoop。

 

二.NioEventLoop 是懒启动的,bossNioEventLoop 在服务端启动时启动,workerNioEventLoop 在新连接接入时启动。

 

三.当 CPU 核数为 2 的幂时,为每一个新连接绑定 NioEventLoop 之后,都会做一个取模运算转位与运算的优化。

 

四.每个连接都对应一个 Channel,每个 Channel 都绑定唯一一个 NioEventLoop,一个 NioEventLoop 可能会被绑定多个 Channel,每个 NioEventLoop 都对应一个 FastThreadLocalThread 线程实体和一个 Selector。因此单个连接的所有操作都是在一个线程上执行的,所以是线程安全的。

 

五.每个 NioEventLoop 都对应一个 Selector,这个 Selector 可以批量处理注册到它上面的 Channel。

 

六.每个 NioEventLoop 的执行过程都包括事件检测、事件处理以及异步任务的执行。

 

七.用户线程池在对 Channel 进行一些操作时均为线程安全的。这是因为 Netty 会把外部线程的操作都封装成一个 Task 放入这个 Channel 绑定的 NioEventLoop 中的 MPSC 队列,然后在该 NioEventLoop 的执行过程(事件循环)的第三个过程中进行串行执行。

 

八.所以 NioEventLoop 的职责不仅仅是处理网络 IO 事件,用户自定义的普通任务和定时任务也会统一由 NioEventLoop 处理,从而实现线程模型的统一。

 

九.从调度层看,也不存在从 NioEventLoop 线程中再启动其他类型的线程用于异步执行另外的任务,从而避免了多线程并发操作和锁竞争,提升了 IO 线程的处理性能和调度性能。

 

(3)NioEventLoop 创建启动执行的总结


一.用户在创建 bossGroup 和 workerGroup 时,NioEventLoopGroup 被创建,默认不传参时会创建两倍 CPU 核数个 NioEventLoop。

 

二.每个 NioEventLoopGroup 都有一个线程执行器 executor 和一个线程选择器 chooser。线程选择器 chooser 用于进行线程分配,它会针对 NioEventLoop 的个数进行优化。

 

三.NioEventLoop 在创建时会创建一个 Selector 和一个 MPSC 任务队列,创建 Selector 时 Netty 会通过反射的方式用数组去替换 Selector 里的两个 HashSet 数据结构。

 

四.Netty 的 NioEventLoop 在首次调用 execute()方法时会启动线程,这个线程是一个 FastThreadLocalThread 对象。启动线程后,Netty 会将创建完成的线程保存到成员变量,以便能判断执行 NioEventLoop 里的逻辑的线程是否是这个创建好的线程。

 

五.NioEventLoop 的执行逻辑在 run()方法里,主要包括 3 部分:第一是检测 IO 事件,第二是处理 IO 事件,第三是执行异步任务。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18786049

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Netty源码—Reactor线程模型二_Java_不在线第一只蜗牛_InfoQ写作社区