Netty 源码解析 -- 事件循环机制实现原理

用户头像
binecy
关注
发布于: 2020 年 10 月 18 日
Netty源码解析 -- 事件循环机制实现原理

本文主要分享Netty中事件循环机制的实现。

源码分析基于Netty 4.1



EventLoop



前面分享服务端和客户端启动过程的文章中说过,Netty通过事件循环机制(EventLoop)处理IO事件和异步任务,简单来说,就是通过一个死循环,不断处理当前已发生的IO事件和待处理的异步任务。示例如下

while(true) {
process(selector.select());

process(getTask());
}

这种事件循环机制也是一种常用的IO事件处理机制,包括Redis,Mysql都使用了类似的机制。



关于异步任务,前面文章说过,EventLoop实现了(jvm)Executor的接口,execute方法可以提供异步任务。

register,bind,connect等操作,都会提交一个任务给EventLoop处理。如

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
public void run() {
register0(promise);
}
});
}



下面看一下Netty中事件循环机制相关的类。



EventExecutor,事件执行器,负责处理事件。

EventExecutorGroup维护了一个EventExecutor链表,它继承了ScheduledExecutorService,execute方法通过next方法选择一个EventExecutor,并调用EventLoop#execute处理事件。

(EventExecutor继承了EventExecutorGroup,可以看做一个特殊的EventExecutorGroup,其execute方法可以提交一个任务任务)



EventLoop,事件循环器,继承了EventExecutor,通过循环不断处理注册于其上的Channel的IO事件。

EventLoopGroup接口则继承了EventExecutorGroup,负责调度EventLoop。



SingleThreadEventExecutor实现了EventExecutor,它会创建一个新线程,并在该线程上处理事件,可以理解为单线程处理器。

MultithreadEventExecutorGroup实现EventExecutorGroup,可以理解为多线程处理器(实际上是维护了多个EventExecutor,一个EventExecutor可以理解为一个线程),newChild方法构造具体的EventExecutor。

MultithreadEventExecutorGroup可以配置EventExecutor数量,即线程数量。

EventExecutorChooserFactory.EventExecutorChooser负责选择一个EventExecutor执行实际操作。



NioEventLoop继承了SingleThreadEventExecutor,负责处理NIO事件。所以,一个NioEventLoop对象可以看做是一个线程。

NioEventLoop也实现了EventLoop接口,它实现了事件循环机制,是Netty核心类。



MultithreadEventLoopGroup继承了MultithreadEventExecutorGroup,并实现了EventLoopGroup,其newChild方法构造具体的EventLoop。

NioEventLoopGroup#newChild会构建NioEventLoop。



EventLoop各实现类关系如下



启动

SingleThreadEventExecutor关键字段

private final Queue<Runnable> taskQueue; // 待处理异步任务
private volatile Thread thread; // EventLoop执行线程,即SingleThreadEventExecutor创建的新线程
private final Executor executor; // java.util.concurrent.Executor,负责创建线程



当我们通过execute方法提交任务时,如果还没有创建执行新线程,会通过SingleThreadEventExecutor#executor一个新线程,并在新线程中调用run方法(run方法由子类实现,负责实现事件循环机制,新线程是EventLoop真正执行线程)。



SingleThreadEventExecutor#execute

public void execute(Runnable task) {
...

boolean inEventLoop = inEventLoop();
// #1
addTask(task);
// #2
if (!inEventLoop) {
startThread();
// #3
if (isShutdown()) {
...
}
}
// #4
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

#1 添加任务到待处理列表

#2

inEventLoop方法,判断当前线程是否为EventLoop执行线程

若当前线程非EventLoop执行线程,调用startThread方法启动一个新的线程,执行run方法。

这里可以理解为启动EventLoop。

#3 如果当前EventLoop已关闭,拒绝任务

#4 若当前EventLoop线程阻塞正等待IO事件(Selector#select方法),调用wakeup方法唤醒线程执行该新增任务



循环机制

NioEventLoop#run方法负责实现NIO事件处理机制。

protected void run() {
int selectCnt = 0;
// #1
for (;;) {

int strategy;
// #2
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
// #3
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
// #4
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// #5
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
...
// #6
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
// #7
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
// #8
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
// #9
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
}
}

为了版面整洁,这里删除了异常处理代码。

#1 可以看到,这里通过一个死循环不断处理IO事件和异步任务。

#2 如果当前存在待处理的任务,调用selector.selectNow(),这时会跳出switch语句,往下处理事件和任务,否则返回SelectStrategy.SELECT。

#3 curDeadlineNanos,计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),没有任务则返回-1。

更新nextWakeupNanos为阻塞时间。

由于频繁调用(jvm)Selector.wakeup会造成性能消耗,NioEventLoop维护了一个唤醒标识nextWakeupNanos。nextWakeupNanos有三种值

NONE -- 执行线程被阻塞;

AWAKE -- 执行线程未阻塞;

其他值 -- 执行线程被超时阻塞,在指定的时间后唤醒;

NioEventLoop#wakeup方法中,只有nextWakeupNanos.getAndSet(AWAKE) != AWAKE成功才调用selector.wakeup()方法。

#4

这时如果还没有任务加入,则执行select,阻塞线程。select方法返回结果作为新的strategy。

#5

lazySet方法,设置值之后其他线程在短期内还是可能读到旧值

这里将nextWakeupNanos设置为AWAKE,主要是减少wakeup方法中不必要的wakeup操作。

所以使用lazySet方法也没有问题。

#6 selectCnt增加

旧版本的Java NIO在Linux Epoll实现上存在bug,(jvm)Selector.select方法可能在没有任何就绪事件的情况下返回,导致CPU空转,利用率飙升到100%。

于是,Netty计算select方法重复调用次数selectCnt,并在selectCnt大于SELECTORAUTOREBUILD_THRESHOLD配置(默认512)时,重建selector,从而规避该问题。

幸好在JDK66u4,JDK7b12已修复该Bug。

#7 processSelectedKeys方法处理IO事件,runAllTasks方法处理任务。

ioRatio表示执行IO事件所占CPU时间百分比,默认50,

ioTime * (100 - ioRatio) / ioRatio,通过ioTime,ioRatio计算处理任务的CPU时间。

#8 如果执行了任务或者select方法返回有效值,直接重置selectCnt。

unexpectedSelectorWakeup方法中会在selectCnt大于SELECTORAUTOREBUILD_THRESHOLD时重建selector。

#9 如果是正在关闭状态,则要关闭所有的Channel



IO事件

下面看一下Eventloop中如何处理IO事件。

NioEventLoop关键字段

Selector unwrappedSelector; // JVM中的Selector
Selector selector; // 优化后的SelectedSelectionKeySetSelector
SelectedSelectionKeySet selectedKeys; // 对(jvm)Selector#selectedKeys进行优化

SelectedSelectionKeySetSelector每次调用select前都清除SelectedSelectionKeySet

SelectedSelectionKeySet使用数组代替原Selector的中的HashSet,提高性能。数组默认大小为1024,不够用时扩展为原大小的2倍。



NioEventLoop#构造方法 -> NioEventLoop#openSelector

private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// #1
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

...

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
// #2
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

...

selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} ...
}
});

...
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// #3
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

#1 通过(jvm)SelectorProvider打开一个Selector

#2 构造了selectedKeySet,并通过反射将该对象设置到Selector的selectedKeys,publicSelectedKeys属性中,这样Selector监听到的事件就会存储到selectedKeySet。

#3 构造了SelectedSelectionKeySetSelector对象



NioEventLoop#select负责阻塞线程,等待IO事件

private int select(long deadlineNanos) throws IOException {
// #1
if (deadlineNanos == NONE) {
return selector.select();
}

// #2
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

#1 一直阻塞,知道发生IO事件或加入了新任务

#2 计算阻塞时间,在原阻塞时间加上995微秒后转化为毫秒。

如果原阻塞时间在5微秒内,就不阻塞了。



IO事件的处理流程为

NioEventLoop#processSelectedKeys -> (没有禁用SelectedSelectionKeySet)processSelectedKeysOptimized -> processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...

try {
int readyOps = k.readyOps();
// #1
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// #2
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// #3
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

#1 处理OP_CONNECT

移除关注事件OP_CONNECT,否则Selector.select(..)将不断返回

前面分享客户端启动过程的文章说过了,这里会调用AbstractNioUnsafe#finishConnect,完成客户端Connect操作,可回顾《客户端启动过程解析》。

#2 先处理OP_WRITE事件,能够尽早写入数据释放内存,这里涉及flush操作,后面有文章解析。

#3 处理OPREAD或OPACCEPT事件。

对于ServerChannel,这里会调用NioMessageUnsafe#read,处理OP_ACCEPT事件,可回顾《客户端启动过程解析》。

对于SocketChannel,这里会调用NioByteUnsafe#read,进行读写操作,后面有文章解析。



异步任务

下面看一下Eventloop中如何处理异步任务。

run方法#4步骤 -> SingleThreadEventExecutor#runAllTasks

protected boolean runAllTasks(long timeoutNanos) {
// #1
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}

final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// #2
safeExecute(task);

runTasks ++;

// #3
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// #4
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// #5
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

#1 AbstractScheduledEventExecutor#scheduledTaskQueue中存放的是定时任务,

SingleThreadEventExecutor#taskQueue中存放的是待处理的任务。

fetchFromScheduledTaskQueue方法会获取已到期的定时任务,移动到SingleThreadEventExecutor#taskQueue。

#2 执行获取的任务

#3 每个64个任务检查一次是否超时,因为nanoTime()方法也是一个相对昂贵的操作。

#4 取下一个任务,继续处理

#5 预留的扩展方法。



NioEventLoop在Netty 4.1版本被优化,代码做了较大改动,删除了原来的wakeup标志,改用nextWakeupNanos,代码更清晰。

请参考 -- Clean up NioEventLoop



Netty是由事件驱动的,服务端register,bind,客户端connect等操作都是提交异步任务给EventLoop处理的

,而Accept,Read/Writ,Connect等IO事件都都需要EventLoop的处理。

大家可以结合前面分析服务端和客户端启动过程的文章,理解EventLoop是如何驱动Netty工作的。



如果您觉得本文不错,欢迎关注我的微信公众号,您的关注是我坚持的动力!



发布于: 2020 年 10 月 18 日 阅读数: 1146
用户头像

binecy

关注

还未添加个人签名 2020.08.26 加入

还未添加个人简介

评论

发布
暂无评论
Netty源码解析 -- 事件循环机制实现原理