1.关于 NioEventLoop 的问题整理
一.默认下 Netty 服务端起多少线程及何时启动?
答:默认是 2 倍 CPU 核数个线程。在调用 EventExcutor 的 execute(task)方法时,会判断当前线程是否为 Netty 的 Reactor 线程,也就是判断当前线程是否为 NioEventLoop 对应的线程实体。如果是,则说明 Netty 的 Reactor 线程已经启动了。如果不是,则说明是外部线程调用 EventExcutor 的 execute()方法。于是会先调用 startThread()方法判断当前线程是否已被启动,如果还没有被启动就启动当前线程作为 Netty 的 Reactor 线程。
二.Netty 是如何解决 JDK 空轮询的?
答:Netty 会判断如果当前阻塞的一个 Select()操作并没有花那么长时间,那么就说明此时有可能触发了空轮询 Bug。默认情况下如果这个现象达到 512 次,那么就重建一个 Selector,并且把之前 Selector 上所有的 key 重新移交到新 Selector 上。通过以上这种处理方式来避免 JDK 空轮询 Bug。
三.Netty 是如何保证异步串行无锁化的?
答:异步串行无锁化有两个场景。
场景一:拿到客户端一个 Channel,不需要对该 Channel 进行同步,直接就可以多线程并发读写。
场景二:ChannelHandler 里的所有操作都是线程安全的,不需要进行同步。
Netty 在所有外部线程去调用 EventLoop 或者 Channel 的方法时,会通过 inEventLoop()方法来判断出当前线程是外部线程(非 NioEventLoop 的线程实体)。在这种情况下,会把所有操作都封装成一个 Task 放入 MPSC 队列,然后在 NioEventLoop 的执行逻辑也就是 run()方法里,这些 Task 会被逐个执行。
2.理解 Reactor 线程模型主要分三部分
一.NioEventLoop 的创建
二.NioEventLoop 的启动
三.NioEventLoop 的执行
3.NioEventLoop 的创建
(1)创建入口
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码
(2)确定 NioEventLoop 的个数
由 NioEventLoopGroup 的构造方法来确定 NioEventLoop 的个数。如果 NioEventLoopGroup 没有传递构造参数,那么 NioEventLoop 线程的个数为 CPU 核数的 2 倍。如果 NioEventLoopGroup 传递了参数 n,那么 NioEventLoop 线程的个数就是 n。
(3)NioEventLoopGroup 的创建流程
NioEventLoopGroup 的构造方法会触发创建流程。
一.创建线程执行器 ThreadPerTaskExecutor
每次调用 ThreadPerTaskExecutor.execute()方法时都会创建一个线程。
二.创建 NioEventLoop
NioEventLoop 对应 NioEventLoopGroup 线程池里的线程,NioEventLoopGroup 的构造方法会用一个 for 循环通过调用 newChild()方法来创建 NioEventLoop 线程。
三.创建线程选择器 EventExecutorChooser
线程选择器的作用是用于给每个新连接分配一个 NioEventLoop 线程,也就是从 NioEventLoopGroup 线程池中选择一个 NioEventLoop 线程来处理新连接。
//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
//Create a new instance using the default number of threads,
//the default ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
public NioEventLoopGroup() {
this(0);
}
//Create a new instance using the specified number of threads,
//ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
...
}
//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
if (logger.isDebugEnabled()) logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...
}
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
//Create a new instance.
//@param nThreads,the number of threads that will be used by this instance.
//@param executor,the Executor to use, or null if the default should be used.
//@param chooserFactory,the EventExecutorChooserFactory to use.
//@param args,arguments which will passed to each #newChild(Executor, Object...) call
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1.创建ThreadPerTaskExecutor线程执行器
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2.创建NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
//传入线程执行器executor去创建NioEventLoop
children[i] = newChild(executor, args);
}
//3.创建线程选择器
chooser = chooserFactory.newChooser(children);
...
}
...
}
复制代码
创建 NioEventLoopGroup 的脉络如下:
new NioEventLoopGroup() //线程组,线程个数默认为2 * CPU核数
new ThreadPerTaskExecutor() //创建线程执行器,作用是负责创建NioEventLoop对应的线程
for(...) { newChild() } //构造NioEventLoop,创建NioEventLoop线程组
chooserFactory.newChooser() //线程选择器,用于给每个新连接分配一个NioEventLoop线程
复制代码
(4)创建线程执行器 ThreadPerTaskExecutor
ThreadPerTaskExecutor 的作用是:每次调用它的 execute()方法执行 Runnable 任务时,都会通过 threadFactory.newThread()创建出一个线程,然后把要执行的 Runnable 任务传递进该线程进行执行。
其中成员变量 threadFactory 是在传参给 ThreadPerTaskExecutor 的构造方法时,由 newDefaultThreadFactory()方法构建的,也就是一个 DefaultThreadFactory 对象。
所以线程执行器 ThreadPerTaskExecutor 在通过 threadFactory.newThread()创建线程时,其实就是调用 DefaultThreadFactory 的 newThread()方法。
而 DefaultThreadFactory.newThread()方法创建出来的线程实体,是 Netty 经过优化之后的 FastThreadLocalThread 对象,这个线程实体在操作 ThreadLocal 时,要比 JDK 快。
ThreadPerTaskExecutor 线程执行器总结:
一.每次执行 ThreadPerTaskExecutor 的 execute()方法时,都会创建出一个 FastThreadLocalThread 的线程实体,所以 Netty 的线程实体都是由 ThreadPerTaskExecutor 创建的。
二.FastThreadLocalThread 线程实体的命名规则是:nioEventLoop-自增的线程池编号-自增的线程数编号。
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1.创建ThreadPerTaskExecutor线程执行器
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2.创建NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
//传入线程执行器executor去创建NioEventLoop
children[i] = newChild(executor, args);
}
//3.创建线程选择器
chooser = chooserFactory.newChooser(children);
...
}
protected ThreadFactory newDefaultThreadFactory() {
//getClass()是获取该方法所属的对象类型,也就是NioEventLoopGroup类型
//因为是通过NioEventLoopGroup的构造方法层层调用到这里的
return new DefaultThreadFactory(getClass());
}
...
}
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) throw new NullPointerException("threadFactory");
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
//调用DefaultThreadFactory的newThread()方法执行Runnable任务
threadFactory.newThread(command).start();
}
}
//A ThreadFactory implementation with a simple naming rule.
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolId = new AtomicInteger();
private final AtomicInteger nextId = new AtomicInteger();
private final boolean daemon;
private final int priority;
protected final ThreadGroup threadGroup;
...
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
//toPoolName()方法会把NioEventLoopGroup的首字母变成小写
this(toPoolName(poolType), daemon, priority);
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority,
System.getSecurityManager() == null? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
...
//prefix用来标记线程名字的前缀
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
if (t.isDaemon()) {
if (!daemon) t.setDaemon(false);
} else {
if (daemon) t.setDaemon(true);
}
if (t.getPriority() != priority) t.setPriority(priority);
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
...
}
复制代码
(5)创建 NioEventLoop
说明一:
由 MultithreadEventExecutorGroup 的构造方法可知,Netty 会使用 for 循环 + newChild()方法来创建 nThreads 个 NioEventLoop,而且一个 NioEventLoop 对应一个线程实体 FastThreadLocalThread。
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1.创建ThreadPerTaskExecutor线程执行器
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2.创建NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
//传入线程执行器executor去创建NioEventLoop
children[i] = newChild(executor, args);
}
//3.创建线程选择器
chooser = chooserFactory.newChooser(children);
...
}
//Create a new EventExecutor which will later then accessible via the #next() method.
//This method will be called for each thread that will serve this MultithreadEventExecutorGroup.
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
...
}
复制代码
说明二:
MultithreadEventExecutorGroup 的 newChild()抽象方法是由 NioEventLoopGroup 实现的,所以在执行 NioEventLoopGroup 的默认构造方法时也会执行其 newChild()方法。
NioEventLoopGroup 的 newChild()方法需要传递一个 executor 参数,该参数就是执行 NioEventLoopGroup 构造方法开始时创建的线程执行器,之后 newChild()方法会返回一个新创建的 NioEventLoop 对象。
//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
...
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//executor是执行NioEventLoopGroup构造方法开始时创建的线程执行器ThreadPerTaskExecutor
//this指的是NioEventLoopGroup,表示新创建的NioEventLoop对象归属哪个NioEventLoopGroup
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
...
}
复制代码
说明三:
创建 NioEventLoop 对象时,NioEventLoop 的构造方法会通过调用其 openSelector()方法来创建一个 Selector,所以一个 Selector 就和一个 NioEventLoop 绑定了,而一个 Selector 可以将多个连接绑定在一起来负责监听这些连接的读写事件。
在 NioEventLoop 的 openSelector()方法中,Netty 会通过反射对 Selector 底层的数据结构进行优化(Hash Set => 数组)。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
//The NIO Selector.
Selector selector;
private final SelectorProvider provider;
private final SelectStrategy selectStrategy;
...
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
//调用其父类SingleThreadEventLoop的构造方法
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) throw new NullPointerException("selectorProvider");
if (strategy == null) throw new NullPointerException("selectStrategy");
this.provider = selectorProvider;
this.selector = openSelector();//创建一个Selector
this.selectStrategy = strategy;
}
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
...
} catch(IOException e) {
...
}
...
return selector;
}
...
}
复制代码
说明四:
NioEventLoop 的构造方法还会调用其父类的父类 SingleThreadEventExecutor 的构造方法。SingleThreadEventExecutor 的构造方法里有两个关键的操作:一是把线程执行器保存起来,因为后面创建 NioEventLoop 对应的线程时要用到。二是创建一个 MPSC 任务队列,因为 Netty 中所有异步执行的本质都是通过该任务队列来协调完成的。
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
private final Queue<Runnable> tailTasks;
...
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
//调用其父类SingleThreadEventExecutor的构造方法
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
//调用父类SingleThreadEventExecutor的newTaskQueue()方法
tailTasks = newTaskQueue(maxPendingTasks);
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private final boolean addTaskWakesUp;
private final Executor executor;
private final int maxPendingTasks;
private final Queue<Runnable> taskQueue;
private final RejectedExecutionHandler rejectedExecutionHandler;
...
//Create a new instance
protectedSingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
//Create a new instance
//@param parent,the EventExecutorGroup which is the parent of this instance and belongs to it
//@param executor,the Executor which will be used for executing
//@param addTaskWakesUp,true if and only if invocation of #addTask(Runnable) will wake up the executor thread
//@param maxPendingTasks,the maximum number of pending tasks before new tasks will be rejected.
//@param rejectedHandler,the RejectedExecutionHandler to use.
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
//关键操作一:把线程执行器保存起来
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//关键操作二:创建一个MPSC任务队列
this.taskQueue = newTaskQueue(this.maxPendingTasks);
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
//Create a new Queue which will holds the tasks to execute.
//NioEventLoop会重写这个newTaskQueue()方法
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
...
}
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
...
//创建一个MPSC任务队列
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
//This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
...
}
复制代码
MPSC 队列也就是多生产者单消费者队列。单消费者是指某个 NioEventLoop 对应的线程(执行其 run()方法的那个线程)。多生产者就是这个 NioEventLoop 对应的线程之外的线程,通常情况下就是我们的业务线程。比如,一些线程在调用 writeAndFlush()方法时可以不用考虑线程安全而随意调用,那么这些线程就是多生产者。
MPSC 队列是通过 JCTools 这个工具包来实现的,Netty 的高性能很大程度上要归功于这个工具包。MPSC 的全称是 Muti Producer Single Consumer。Muti Producer 对应的是外部线程,Single Consumer 对应的是 Netty 的 NioEventLoop 线程。外部线程在执行 Netty 的一些任务时,如果判断不是由 NioEventLoop 对应的线程执行的,就会直接放入一个任务队列里,然后由一个 NioEventLoop 对应的线程去执行。
创建 NioEventLoop 总结:
NioEventLoopGroup 的 newChild()方法创建 NioEventLoop 时做了三项事情:一.创建一个 Selector 用于轮询注册到该 NioEventLoop 上的连接,二.创建一个 MPSC 任务队列,三.保存线程执行器到 NioEventLoop。
(6)创建线程选择器 EventExecutorChooser
说明一:
在传统的 BIO 编程中,一个新连接被创建后,通常需要给这个连接绑定一个 Selector,之后这个连接的整个生命周期都由这个 Selector 管理。
说明二:
创建 NioEventLoop 时会创建一个 Selector,所以一个 Selector 会对应一个 NioEventLoop,一个 NioEventLoop 上会有一个 Selector。线程选择器的作用就是为一个连接在 NioEventLoopGroup 中选择一个 NioEventLoop,从而将该连接绑定到这个 NioEventLoop 的 Selector 上。
说明三:
根据 MultithreadEventExecutorGroup 的构造方法,会使用 DefaultEventExecutorChooserFactory 的 newChooser()方法来创建线程选择器。创建好线程选择器 EventExecutorChooser 之后,便可以通过其 next()方法获取一个 NioEventLoop。
Netty 通过判断 NioEventLoopGroup 中的 NioEventLoop 个数是否是 2 的幂来创建不同的线程选择器。但不管是哪一种选择器,最终效果都是从第一个 NioEventLoop 开始遍历到最后一个 NioEventLoop,然后再从第一个 NioEventLoop 开始,如此循环。
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
//Create a new instance.
//@param nThreads,the number of threads that will be used by this instance.
//@param executor,the Executor to use, or null if the default should be used.
//@param chooserFactory,the EventExecutorChooserFactory to use.
//@param args,arguments which will passed to each #newChild(Executor, Object...) call
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1.创建ThreadPerTaskExecutor线程执行器
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2.创建NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
//传入线程执行器executor去创建NioEventLoop
children[i] = newChild(executor, args);
}
//3.创建线程选择器,chooserFactory就是传入的DefaultEventExecutorChooserFactory实例
chooser = chooserFactory.newChooser(children);
...
}
...
}
//Default implementation which uses simple round-robin to choose next EventExecutor.
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
//如果NioEventLoop个数是2的幂,则进行位与运算
return new PowerOfTowEventExecutorChooser(executors);
} else {
//如果NioEventLoop个数不是2的幂,则进行普通取模运算
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
复制代码
说明四:
创建 NioEventLoopGroup 的最后一个步骤就是创建线程选择器 chooser,创建线程选择器的流程如下:
chooserFactory.newChooser() //创建线程选择器的入口,chooser的作用就是为新连接绑定一个NioEventLoop
DefaultEventExecutorChooserFactory.isPowerOfTwo() //判断NioEventLoop个数是否为2的幂
PowerOfTowEventExecutorChooser //优化
index++ & (length - 1) //位与运算
GenericEventExecutorChooser //普通
abs(index++ % length) //取模运算
复制代码
(7)NioEventLoopGroup 的创建总结
默认情况下,NioEventLoopGroup 会创建 2 倍 CPU 核数个 NioEventLoop。一个 NioEventLoop 和一个 Selector 以及一个 MPSC 任务队列一一对应。
NioEventLoop 线程的命名规则是 nioEventLoopGroup-xx-yy,其中 xx 表示全局第 xx 个 NioEventLoopGroup 线程池,yy 表示这个 NioEventLoop 在这个 NioEventLoopGroup 中是属于第 yy 个。
线程选择器 chooser 的作用是为一个连接选择一个 NioEventLoop,可通过线程选择器的 next()方法返回一个 NioEventLoop。如果 NioEventLoop 的个数为 2 的幂,则 next()方法会使用位与运算进行优化。
一个 NioEventLoopGroup 会有一个线程执行器 executor、一个线程选择器 chooser、一个数组 children 存放 2 倍 CPU 核数个 NioEventLoop。
4.NioEventLoop 的启动
(1)启动 NioEventLoop 的两大入口
入口一:服务端启动,注册服务端 Channel 到 Selector 时
入口二:新连接接入,通过 chooser 绑定一个 NioEventLoop 时
下面先看入口一:
调用 ServerBootstrap 的 bind()方法其实会调用 AbstractBootstrap 的 doBind()方法,然后会调用 AbstractBootstrap 的 initAndRegister()方法,接着执行 config().group().register(channel)注册服务端 Channel。最后会逐层深入调用到 AbstractChannel.AbstractUnsafe 的 register()方法,来启动一个 NioEventLoop 将服务端 Channel 注册到 Selector 上。
//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
...
}
//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.
//It support method-chaining to provide an easy way to configure the AbstractBootstrap.
//When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP).
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
...
//Create a new Channel and bind it.
public ChannelFuture bind(int inetPort) {
//首先根据端口号创建一个InetSocketAddress对象,然后调用重载方法bind()
return bind(new InetSocketAddress(inetPort));
}
//Create a new Channel and bind it.
public ChannelFuture bind(SocketAddress localAddress) {
//验证服务启动需要的必要参数
validate();
if (localAddress == null) throw new NullPointerException("localAddress");
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//1.初始化和注册Channel
final Channel channel = regFuture.channel();
...
doBind0(regFuture, channel, localAddress, promise);//2.绑定服务端端口
...
return promise;
}
final ChannelFuture initAndRegister() {
Channel channel = null;
...
//1.创建服务端Channel
channel = channelFactory.newChannel();
//2.初始化服务端Channel
init(channel);
...
//3.注册服务端Channel,比如通过NioEventLoopGroup的register()方法进行注册
ChannelFuture regFuture = config().group().register(channel);
...
return regFuture;
}
...
}
//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
...
@Override
public final ServerBootstrapConfig config() {
return config;
}
...
}
public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {
protected final B bootstrap;
...
protected AbstractBootstrapConfig(B bootstrap) {
this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
}
//Returns the configured EventLoopGroup or null if non is configured yet.
public final EventLoopGroup group() {
//比如返回一个NioEventLoopGroup对象
return bootstrap.group();
}
...
}
//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
...
...
}
//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
...
@Override
public ChannelFuture register(Channel channel) {
//先通过next()方法获取一个NioEventLoop,然后通过NioEventLoop.register()方法注册服务端Channel
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
...
}
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
@Override
public EventExecutor next() {
//通过线程选择器chooser选择一个NioEventLoop
return chooser.next();
}
...
}
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
...
...
}
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//调用AbstractUnsafe的register()方法
promise.channel().unsafe().register(this, promise);
return promise;
}
...
}
//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private volatile EventLoop eventLoop;
...
//Unsafe implementation which sub-classes must extend and use.
protected abstract class AbstractUnsafe implements Unsafe {
...
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
//绑定事件循环器,即绑定一个NioEventLoop到该Channel上
AbstractChannel.this.eventLoop = eventLoop;
//注册Selector,并启动一个NioEventLoop
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
...
//通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上
//其实执行的是SingleThreadEventExecutor的execute()方法
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
...
}
}
}
...
}
复制代码
(2)判断当前线程是否是 NioEventLoop 线程
调用 NioEventLoop 的 inEventLoop()方法可以判断当前线程是否是 Netty 的 Reactor 线程,也就是 NioEventLoop 对应的线程实体。NioEventLoop 的线程实体被创建之后,会将该线程实体保存到 NioEventLoop 父类的成员变量 thread 中。
服务端启动、注册服务端 Channel 到 Selector,执行到 AbstractUnsafe.register()方法中的 eventLoop.inEventLoop()代码时,会将 main 方法对应的主线程传递进来与 this.thread 进行比较。由于 this.thread 此时并未赋值,所以为空,因此 inEventLoop()方法返回 false,于是便会执行 eventLoop.execute()代码创建一个线程并启动。
//SingleThreadEventLoop implementation which register the Channel's
//to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
...
...
}
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
...
}
//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
...
...
}
//Abstract base class for {@link EventExecutor} implementations.
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
...
@Override
public boolean inEventLoop() {
//注册服务端Channel时是通过主线程进行注册的,Thread.currentThread()对应的就是main线程
//调用SingleThreadEventExecutor.inEventLoop()方法
return inEventLoop(Thread.currentThread());
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private volatile Thread thread;
...
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;//此时线程还没创建,this.thread为null
}
...
}
复制代码
(3)创建一个线程并启动
AbstractUnsafe.register()方法准备将服务端 Channel 注册到 Selector 上时,首先在判断条件中执行 eventLoop.inEventLoop()代码发现为 false,于是便执行 eventLoop.execute()代码创建一个线程并启动它去执行注册任务。而执行 eventLoop.execute()代码其实就是调用 SingleThreadEventExecutor 的 execute()方法。
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//调用AbstractUnsafe的register()方法,并把NioEventLoop自己当作参数传入
promise.channel().unsafe().register(this, promise);
return promise;
}
...
}
//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private volatile EventLoop eventLoop;
...
//Unsafe implementation which sub-classes must extend and use.
protected abstract class AbstractUnsafe implements Unsafe {
...
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
//绑定事件循环器,即绑定一个NioEventLoop到该Channel上
AbstractChannel.this.eventLoop = eventLoop;
//注册Selector,并启动一个NioEventLoop
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
...
//通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上
//其实执行的是SingleThreadEventExecutor的execute()方法
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
...
}
}
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private volatile Thread thread;
//创建NioEventLoop时会通过构造方法传入NioEventLoopGroup的线程执行器executor
private final Executor executor;
...
@Override
public void execute(Runnable task) {
if (task == null) throw new NullPointerException("task");
boolean inEventLoop = inEventLoop();
//判断当前线程是否是Netty的Reactor线程
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) reject();
}
if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);
}
private void startThread() {
//判断Reactor线程有没有被启动;如果没有被启动,则通过CAS调用doStartThread()方法启动线程
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
//executor.execute()方法会创建出一个FastThreadLocalThread线程来执行Runnable任务
//所以在Runnable的run()方法中,Thread.currentThread()指的是这个FastThreadLocalThread线程
executor.execute(new Runnable() {
@Override
public void run() {
//Thread.currentThread()指的是FastThreadLocalThread线程
thread = Thread.currentThread();
...
SingleThreadEventExecutor.this.run();//启动线程
...
}
});
}
//具体的run()方法由子类比如NioEventLoop来实现
protected abstract void run();
...
}
复制代码
SingleThreadEventExecutor 的 execute()方法的说明如下:
一.这个方法也可能会被用户代码使用,如 ctx.executor().execute(task)。所以 execute()方法里又调用 inEventLoop()方法进行了一次外部线程判断,确保执行 task 任务时不会遇到线程问题。
二.如果当前线程不是 Netty 的 Reactor 线程,则调用 startThread()方法启动一个 Reactor 线程。在 startThread()方法中首先会判断当前 NioEventLoop 对应的 Reactor 线程实体有没有被启动。如果没有被启动,则通过设置 CAS 成功后调用 doStartThread()方法启动线程。
三.执行 doStartThread()方法时,会调用 NioEventLoop 的内部成员变量 executor 的 execute()方法。executor 就是线程执行器 ThreadPerTaskExecutor,它的作用是每次执行 Runnable 任务时都会创建一个线程来执行。也就是 executor.execute()方法会通过 DefaultThreadFactory 的 newThread()方法,创建出一个 FastThreadLocalThread 线程来执行 Runnable 任务。
四.doStartThread()方法的 Runnable 任务会由一个 FastThreadLocalThread 线程来执行。在 Runnable 任务的 run()方法里,会保存 ThreadPerTaskExecutor 创建出来的 FastThreadLocalThread 对象到 SingleThreadEventExecutor 的成员变量 thread 中,然后调用 SingleThreadEventExecutor 的 run()方法。
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1.创建ThreadPerTaskExecutor线程执行器
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2.创建NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
//传入线程执行器executor去创建NioEventLoop
children[i] = newChild(executor, args);
}
//3.创建线程选择器
chooser = chooserFactory.newChooser(children);
...
}
protected ThreadFactory newDefaultThreadFactory() {
//getClass()是获取该方法所属的对象类型,也就是NioEventLoopGroup类型
//因为是通过NioEventLoopGroup的构造方法层层调用到这里的
return new DefaultThreadFactory(getClass());
}
...
}
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) throw new NullPointerException("threadFactory");
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
//调用DefaultThreadFactory的newThread()方法执行Runnable任务
threadFactory.newThread(command).start();
}
}
//A ThreadFactory implementation with a simple naming rule.
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolId = new AtomicInteger();
private final AtomicInteger nextId = new AtomicInteger();
private final boolean daemon;
private final int priority;
protected final ThreadGroup threadGroup;
...
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
//toPoolName()方法会把NioEventLoopGroup的首字母变成小写
this(toPoolName(poolType), daemon, priority);
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority,
System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
...
//prefix用来标记线程名字的前缀
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
if (t.isDaemon()) {
if (!daemon) t.setDaemon(false);
} else {
if (daemon) t.setDaemon(true);
}
if (t.getPriority() != priority) t.setPriority(priority);
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
...
}
复制代码
NioEventLoop 是如何与一个线程实体绑定的?NioEventLoop 会通过线程执行器 ThreadPerTaskExecutor 创建一个 FastThreadLocalThread,然后再将该 FastThreadLocalThread 线程保存到其成员变量中,从而实现与一个线程实体进行绑定。
(4)NioEventLoop 的启动总结
一.在注册服务端 Channel 的过程中,主线程最终会调用 AbstractUnsafe 的 register()方法。该方法首先会将一个 NioEventLoop 绑定到这个服务端 Channel 上,然后把实际注册 Selector 的逻辑封装成一个 Runnable 任务,接着调用 NioEventLoop 的 execute()方法来执行这个 Runnable 任务。
二.NioEventLoop 的 execute()方法其实就是其父类 SingleThreadEventExecutor 的 execute()方法,它会先判断当前调用 execute()方法的线程是不是 Netty 的 Reactor 线程,如果不是就调用 startThread()方法来创建一个 Reactor 线程。
三.startThread()方法会通过线程执行器 ThreadPerTaskExecutor 的 execute()方法来创建一个线程。这个线程是一个 FastThreadLocalThread 线程,这个线程需要执行如下逻辑:把线程保存到 NioEventLoop 的成员变量 thread 中,然后调用 NioEventLoop 的 run()方法来启动 NioEventLoop。
NioEventLoop 的启动流程如下:
bind() -> initAndRegister() -> config().group().register() -> eventloop.execute() //入口
startThread() -> doStartThread() //创建线程
ThreadPerTaskExecutor.execute() //线程执行器创建FastThreadLocalThread线程
thread = Thread.currentThread() //保存FastThreadLocalThread线程到NioEventLoop的成员变量中
NioEventLoop.run() //启动NioEventLoop
复制代码
NioEventLoop 的启动流程说明如下:
首先 bind()方法会将具体绑定端口的操作封装成一个 Runnable 任务,然后调用 NioEventLoop 的 execute()方法,接着 Netty 会判断调用 execute()方法的线程是否是 NIO 线程,如果发现不是就会调用 startThread()方法开始创建线程。
创建线程是通过线程执行器 ThreadPerTaskExecutor 来创建的。线程执行器的作用是每执行一个任务都会创建一个线程,而且创建出来的线程就是 NioEventLoop 底层的一个 FastThreadLocalThread 线程。
创建完 FastThreadLocalThread 线程后会执行一个 Runnable 任务,该 Runnable 任务首先会将这个线程保存到 NioEventLoop 对象。保存的目的是为了判断后续对 NioEventLoop 的相关执行线程是否为本身。如果不是就将封装好的一个任务放入 TaskQueue 中进行串行执行,实现线程安全。该 Runnable 任务然后会调用 NioEventLoop 的 run()方法,从而启动 NioEventLoop。NioEventLoop 的 run()方法是驱动 Netty 运转的核心方法。
文章转载自:东阳马生架构
原文链接:https://www.cnblogs.com/mjunz/p/18784178
体验地址:http://www.jnpfsoft.com/?from=001YH
评论