写点什么

Netty 核心源码解读 —— EventLoop 篇

用户头像
然行
关注
发布于: 2021 年 04 月 04 日

本文我们将一起探究一下 EventLoop 的实现原理,让大家对 Netty 的线程模型有更加深入的了解。在上一篇里(ServerBootstrap 篇),Netty Server 在初始化时,会将 bossGroup 和 workerGroup 赋值给 ServerBootstrap 的 group,那么这个 EventLoopGroup 是什么呢?

EventLoopGroup

## TcpServer.java
private final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZ_GROUP_SIZE);private final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZ_THREAD_SIZE);
public void init() throws Exception { // Server 服务启动 ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
复制代码

在开篇里,我们了解到 Netty 是 Reactor 模式的一个实现,那 Reactor 模式与 EventLoopGroup 又有什么关系呢?其实,Netty 里 Reactor 模式的实现就是 EventLoopGroup(如果使用 NIO,那就是 NioEventLoopGroup)。

EventLoopGroup 继承于 EventExecutorGroup,NioEventLoopGroup 继承于 MultithreadEventExecutorGroup,MultithreadEventExecutorGroup 是 EventExecutorGroup 的一个实现。在实例化 NioEventLoopGroup 时,MultithreadEventExecutorGroup 中会构建了一个类型为 EventExecutor 的数组 children,并通过调用 newChild 进行初始化。

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); }
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true;
复制代码

抽象方法 newChild 在 EventExecutorGroup 中实现的,它返回一个 NioEventLoop 实例。

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); }
复制代码

EventLoop

NioEventLoop 是 EventLoop 的一个实现,NioEventLoop 继承于 SingleThreadEventLoop,而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor。SingleThreadEventExecutor 实现了在 Netty 中对本地线程的抽象,在 SingleThreadEventExecutor 类中封装了一个 Thread thread 属性,也正因此,我们常说一个 NioEventLoop 其实是与一个线程绑定。由于 NioEventLoop 继承于 SingleThreadEventExecutor,因此 NioEventLoop 的启动其实就是 NioEventLoop 所绑定的本地线程 thread 的启动。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private volatile Thread thread;
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); }
boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run();
复制代码

doStartThread 是在 SingleThreadEventExecutor.execute 方法中调用的,而这方法是在 EventLoop 与 Channel 的关联时,即在 AbstractChannel#AbstractUnsafe.register 中调用的 eventLoop.execute 方法,具体我们会在后篇 Channel 篇详细讲。

protected abstract class AbstractUnsafe implements Unsafe {    @Override    public final void register(EventLoop eventLoop, final ChannelPromise promise) {        ObjectUtil.checkNotNull(eventLoop, "eventLoop");
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); }
复制代码

至此,SingleThreadEventExecutor 这个线程是从 Bootstrap.bind 到 AbstractChannel#AbstractUnsafe.register 被启动的。

Bootstrap.initAndRegister ->     AbstractBootstrap.initAndRegister ->         MultithreadEventLoopGroup.register ->             SingleThreadEventLoop.register ->                 AbstractUnsafe.register ->                    AbstractUnsafe.register0 ->                        AbstractNioChannel.doRegister
复制代码

而这个线程做的事情主要就是调用 SingleThreadEventExecutor.this.run() 方法,这个方法在 NioEventLoop 实现了。在 NioEventLoop 中,NioEventLoop.run() 执行了与 Channel 相关的 IO 操作,包括调用 select 等待就绪的 IO 事件、读写数据与数据的处理等。

public final class NioEventLoop extends SingleThreadEventLoop {
/** * The NIO {@link Selector}. */ private Selector selector; private Selector unwrappedSelector; private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider; private final SelectStrategy selectStrategy;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; }
@Override protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { nextWakeupNanos.lazySet(AWAKE); } // fall through default: } catch (IOException e) { } final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0); // This will run the minimum number of tasks } }
复制代码

在 NioEventLoop.run() 方法中,首先调用 hasTasks() 方法来判断当前任务队列中是否有任务,如果没有任务,执行的是 select(curDeadlineNanos)

public final class NioEventLoop extends SingleThreadEventLoop {
private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { return selector.select(); } // Timeout will only be 0 if deadline is within 5 microsecs long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }
复制代码

这个 selector 正是 Java NIO 中的多路复用器 Selector,其中 selector.selectNow() 方法会检查当前是否有就绪的 IO 事件,如果有则返回就绪 IO 事件的个数,如果没有,则返回 0。selectNow() 是立即返回的,不会阻塞当前线程,而 select() 是会阻塞当前线程的。

在 NioEventLoop.run() 方法中,如果判断当前任务队列有任务时,就会调用 processSelectedKeys() 和 runAllTasks() 方法。

public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } } }
复制代码

在 processSelectedKeys 方法里,核心逻辑就是通过调用 processSelectedKey 来处理就绪的 IO 事件

public final class NioEventLoop extends SingleThreadEventLoop {
pivate void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); }
if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); }
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
复制代码

在 processSelectedKey 中处理了三个事件:OP_READ 可读事件,OP_WRITE 可写事件,OP_CONNECT 连接建立事件。

总结

本篇与上篇间隔的时间有些久,本文介绍了 EventLoop,它可是 Netty 非常重要的一个核心点,因为在 Netty 里 Reactor 模式的实现就是 EventLoopGroup,本文没有讲到 EventLoop 的任务队列的内容,后续的篇中会补充上。在后续的文章里,我会继续与大家讨论 Netty 的 ChannelPipeline,还请大家多多关注我的个人博客或公账号。

发布于: 2021 年 04 月 04 日阅读数: 16
用户头像

然行

关注

还未添加个人签名 2018.04.26 加入

还未添加个人简介

评论

发布
暂无评论
Netty 核心源码解读 —— EventLoop 篇