本文我们将一起探究一下 EventLoop 的实现原理,让大家对 Netty 的线程模型有更加深入的了解。在上一篇里(ServerBootstrap 篇),Netty Server 在初始化时,会将 bossGroup 和 workerGroup 赋值给 ServerBootstrap 的 group,那么这个 EventLoopGroup 是什么呢?
EventLoopGroup
## TcpServer.java
private final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZ_GROUP_SIZE);
private final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZ_THREAD_SIZE);
public void init() throws Exception {
// Server 服务启动
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
复制代码
在开篇里,我们了解到 Netty 是 Reactor 模式的一个实现,那 Reactor 模式与 EventLoopGroup 又有什么关系呢?其实,Netty 里 Reactor 模式的实现就是 EventLoopGroup(如果使用 NIO,那就是 NioEventLoopGroup)。
EventLoopGroup 继承于 EventExecutorGroup,NioEventLoopGroup 继承于 MultithreadEventExecutorGroup,MultithreadEventExecutorGroup 是 EventExecutorGroup 的一个实现。在实例化 NioEventLoopGroup 时,MultithreadEventExecutorGroup 中会构建了一个类型为 EventExecutor 的数组 children,并通过调用 newChild 进行初始化。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
复制代码
抽象方法 newChild 在 EventExecutorGroup 中实现的,它返回一个 NioEventLoop 实例。
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
复制代码
EventLoop
NioEventLoop 是 EventLoop 的一个实现,NioEventLoop 继承于 SingleThreadEventLoop,而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor。SingleThreadEventExecutor 实现了在 Netty 中对本地线程的抽象,在 SingleThreadEventExecutor 类中封装了一个 Thread thread 属性,也正因此,我们常说一个 NioEventLoop 其实是与一个线程绑定。由于 NioEventLoop 继承于 SingleThreadEventExecutor,因此 NioEventLoop 的启动其实就是 NioEventLoop 所绑定的本地线程 thread 的启动。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private volatile Thread thread;
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
复制代码
doStartThread 是在 SingleThreadEventExecutor.execute
方法中调用的,而这方法是在 EventLoop 与 Channel 的关联时,即在 AbstractChannel#AbstractUnsafe.register
中调用的 eventLoop.execute 方法,具体我们会在后篇 Channel 篇详细讲。
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
复制代码
至此,SingleThreadEventExecutor 这个线程是从 Bootstrap.bind 到 AbstractChannel#AbstractUnsafe.register 被启动的。
Bootstrap.initAndRegister ->
AbstractBootstrap.initAndRegister ->
MultithreadEventLoopGroup.register ->
SingleThreadEventLoop.register ->
AbstractUnsafe.register ->
AbstractUnsafe.register0 ->
AbstractNioChannel.doRegister
复制代码
而这个线程做的事情主要就是调用 SingleThreadEventExecutor.this.run() 方法,这个方法在 NioEventLoop 实现了。在 NioEventLoop 中,NioEventLoop.run() 执行了与 Channel 相关的 IO 操作,包括调用 select 等待就绪的 IO 事件、读写数据与数据的处理等。
public final class NioEventLoop extends SingleThreadEventLoop {
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
private final SelectStrategy selectStrategy;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
} catch (IOException e) {
}
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
}
复制代码
在 NioEventLoop.run()
方法中,首先调用 hasTasks() 方法来判断当前任务队列中是否有任务,如果没有任务,执行的是 select(curDeadlineNanos)
。
public final class NioEventLoop extends SingleThreadEventLoop {
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
复制代码
这个 selector 正是 Java NIO 中的多路复用器 Selector,其中 selector.selectNow()
方法会检查当前是否有就绪的 IO 事件,如果有则返回就绪 IO 事件的个数,如果没有,则返回 0。selectNow()
是立即返回的,不会阻塞当前线程,而 select() 是会阻塞当前线程的。
在 NioEventLoop.run()
方法中,如果判断当前任务队列有任务时,就会调用 processSelectedKeys()
和 runAllTasks()
方法。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
}
}
复制代码
在 processSelectedKeys 方法里,核心逻辑就是通过调用 processSelectedKey 来处理就绪的 IO 事件。
public final class NioEventLoop extends SingleThreadEventLoop {
pivate void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
复制代码
在 processSelectedKey 中处理了三个事件:OP_READ 可读事件,OP_WRITE 可写事件,OP_CONNECT 连接建立事件。
总结
本篇与上篇间隔的时间有些久,本文介绍了 EventLoop,它可是 Netty 非常重要的一个核心点,因为在 Netty 里 Reactor 模式的实现就是 EventLoopGroup,本文没有讲到 EventLoop 的任务队列的内容,后续的篇中会补充上。在后续的文章里,我会继续与大家讨论 Netty 的 ChannelPipeline,还请大家多多关注我的个人博客或公账号。
评论