Netty 学习之旅 ------ 图说 Netty 线程模型
1、图说 Netty 线程模型
=============
提到 Netty 的线程模型,我们不得不再重复提到主从 Reactor 线程模型,Netty 线程模型基本上基于主从 Reactor 模型的实现方式,Netty 线程模型将从如下两个图进行展开:
Reactor?主从线程模型
Netty 服务端示例代码。
关于 Reactor 主从多线程模型的讲解,请重点关注Netty 线程模型前置篇,Reactor 主从模式源码实现,从 Netty 服务端示例代码可以看出,Netty 的线程模型,必然与 EventLoopGroup 脱离不了干系。不错,上文中的 bossGroup 就是相当与 Reactor 模式中的 Main?Reactor,而 workGroup,则相当于 SubReactor。
Netty 线程模型类层次结构:
Netty 线程模型类继承图:
从如上图可以知道,Netty 的线程模型,中有 4 个基础接口,它们分别是 EventLoopGroup、EventLoop、EventExecuteGroup、EventExecute。其中 EventExecute 扩展自 java.util.concurrent.ScheduledExecutorService 接口,类似与线程池(执行)的职责,而 EventLoop 首先继承自 EventExecute,并主要扩展了 register 方法,就是将通道 Channel 注册到 Selector 的方法。
NioEventLoop,就是基于 Nio 的实现。在这个类中有一个亮点,就是规避了 JDK nio 的一个 bug,Selector select 方法的空轮询,核心思想是,如果连续多少次(默认为 512)在没有超时的情况就返回,并且已经准备就绪的键的数量为 0,则认为发生了空轮询,如果发生了空轮询,就新建一个新的 Selector,并重新将通道,关心的事件注册到新的 Selector,并关闭旧的 Selector,其源码实现如下:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupt
ed so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
}
// Harmless exception - log anyway
}
}
/**
Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
around the infamous epoll 100% CPU bug.
*/
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
评论