Java 并发系列终结篇:彻底搞懂 Java 线程池的工作原理,nginx 性能优化面试题
下面我们通过位运算来验证一下 ctl 是如何工作的,当然,如果你不理解这个位运算的过程对理解线程池的源码影响并不大,所以对以下验证内容不感兴趣的同学可以直接略过。
可以看到上述代码中 RUNNING 的值为-1 左移 29 位,我们知道在计算机中**负数是以其绝对值的补码来表示的,而补码是由反码加 1 得到。**因此-1 在计算机中存储形式为 1 的反码+1
1 的原码:00000000 00000000 00000000 00000001
1 的反码:11111111 11111111 11111111 11111110
-1 存储: 11111111 11111111 11111111 11111111
接下来对-1 左移 29 位可以得到 RUNNING 的值为:
// 高三位表示线程状态,即高三位为 111 表示 RUNNING
11100000 00000000 00000000 00000000
而 AtomicInteger 初始线程数量是 0,因此 ctlOf 方法中的“|”运算如下:
RUNNING: 11100000 00000000 00000000 00000000
|
线程数为 0: 00000000 00000000 00000000 00000000
得到 ctl: 11100000 00000000 00000000 00000000
通过 RUNNING|0(线程数)即可得到 ctl 的初始值。同时还可以通过以下方法将 ctl 拆解成运行状态和线程数:
// 00001111 11111111 11111111 11111111
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 获取线程池运行状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 获取线程池中的线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }
假设此时线程池为 RUNNING 状态,且线程数为 0,验证一下 runStateOf 是如何得到线程池的运行状态的:
COUNT_MASK: 00001111 11111111 11111111 11111111
~COUNT_MASK: 11110000 00000000 00000000 00000000
&
ctl: 11100000 00000000 00000000 00000000
RUNNING: 11100000 00000000 00000000 00000000
复制代码
如果不理解上边的验证流程没有关系,只要知道通过 runStateOf 方法可以得到线程池的运行状态,通过 workerCountOf 可以得到线程池中的线程数即可。
接下来我们进入线程池的源码的源码分析环节。
2.ThreadPoolExecutor 的 execute
============================
向线程池提交任务的方法是 execute 方法,execute 方法是 ThreadPoolExecutor 的核心方法,以此方法为入口来进行剖析,execute 方法的代码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取 ctl 的值
int c = ctl.get();
// 1.线程数小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 线程池中线程数小于核心线程数,则尝试创建核心线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.到此处说明线程池中线程数大于核心线程数或者创建线程失败
if (isRunning(c) && workQueue.offer(command)) {
// 如果线程是运行状态并且可以使用 offer 将任务加入阻塞队列未满,offer 是非阻塞操作。
int recheck = ctl.get();
// 重新检查线程池状态,因为上次检测后线程池状态可能发生改变,如果非运行状态就移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果是运行状态,并且线程数是 0,则创建线程
else if (workerCountOf(recheck) == 0)
// 线程数是 0,则创建非核心线程,且不指定首次执行任务,这里的第二个参数其实没有实际意义
addWorker(null, false);
}
// 3.阻塞队列已满,创建非核心线程执行任务
else if (!addWorker(command, false))
// 如果失败,则执行拒绝策略
reject(command);
}
execute 方法中的逻辑可以分为三部分:
1.如果线程池中的线程数小于核心线程,则直接调用 addWorker 方法创建新线程来执行任务。
2.如果线程池中的线程数大于核心线程数,则将任务添加到阻塞队列中,接着再次检验线程池的运行状态,因为上次检测过之后线程池状态有可能发生了变化,如果线程池关闭了,那么移除任务,执行拒绝策略。如果线程依然是运行状态,但是线程池中没有线程,那么就调用 addWorker 方法创建线程,注意此时传入任务参数是 null,即不指定执行任务,因为任务已经加入了阻塞队列。创建完线程后从阻塞队列中取出任务执行。
3.如果第 2 步将任务添加到阻塞队列失败了,说明阻塞队列任务已满,那么则会执行第三步,即创建非核心线程来执行任务,如果非核心线程创建失败那么就执行拒绝策略。
可以看到,代码的执行逻辑和我们在第二章中分析的线程池的工作流程是一样的。
接下来看下 execute 方法中创建线程的方法 addWoker,addWoker 方法承担了核心线程和非核心线程的创建,通过一个 boolean 参数 core 来区分是创建核心线程还是非核心线程。先来看 addWorker 方法前半部分的代码:
// 返回值表示是否成功创建了线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 这里做了一个 retry 标记,相当于 goto.
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// 根据 core 来确定创建最大线程数,超过最大值则创建线程失败,注意这里的最大值可能有 s 三个 corePoolSize、maximumPoolSize 和线程池线程的最大容量
if (workerCountOf(c)
= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 通过 CAS 来将线程数+1,如果成功则跳出循环,执行下边逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池的状态发生了改变,退回 retry 重新执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
// ...省略后半部分
return workerStarted;
}
这部分代码会通过是否创建核心线程来确定线程池中线程数的值,如果是创建核心线程,那么最大值不能超过 corePoolSize,如果是创建非核心线程那么线程数不能超过 maximumPoolSize,另外无论是创建核心线程还是非核心线程,最大线程数都不能超过线程池允许的最大线程数 COUNT_MASK(有可能设置的 maximumPoolSize 大于 COUNT_MASK)。如果线程数大于最大值就返回 false,创建线程失败。
接下来通过 CAS 将线程数加 1,如果成功那么就 break retry 结束无限循环,如果 CAS 失败了则就 continue retry 从新开始 for 循环,注意这里的 retry 不是 Java 的关键字,是一个可以任意命名的字符。
接下来,如果能继续向下执行则开始执行创建线程并执行任务的工作了,看下 addWorker 方法的后半部分代码:
private boolean addWorker(Runnable firstTask, boolean core) {
// ...省略前半部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 实例化一个 Worker,内部封装了线程
w = new Worker(firstTask);
// 取出新建的线程
final Thread t = w.thread;
if (t != null) {
// 这里使用 ReentranLock 加锁保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// 拿到锁湖重新检查线程池状态,只有处于 RUNNING 状态或者处于 SHUTDOWN 并且 firstTask==null 时候才会创建线程
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 线程不是处于 NEW 状态,说明线程已经启动,抛出异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将线程加入线程队列,这里的 worker 是一个 HashSet
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 开启线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这部分逻辑其实比较容易理解,就是创建 Worker 并开启线程执行任务的过程,Worker 是对线程的封装,创建的 worker 会被添加到 ThreadPoolExecutor 中的 HashSet 中。也就是线程池中的线程都维护在这个名为 workers 的 HashSet 中并被 ThreadPoolExecutor 所管理,HashSet 中的线程可能处于正在工作的状态,也可能处于空闲状态,一旦达到指定的空闲时间,则会根据条件进行回收线程。
我们知道,线程调用 start 后就会开始执行线程的逻辑代码,执行完后线程的生命周期就结束了,那么线程池是如何保证 Worker 执行完任务后仍然不结束的呢?当线程空闲超时或者关闭线程池又是怎样进行线程回收的呢?这个实现逻辑其实就在 Worker 中。看下 Worker 的代码:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 执行任务的线程
final Thread thread;
// 初始化 Worker 时传进来的任务,可能为 null,如果不空,则创建和立即执行这个 task,对应核心线程创建的情况
Runnable firstTask;
Worker(Runnable firstTask) {
// 初始化时设置 setate 为-1
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 通过线程工程创建线程
this.thread = getThreadFactory().newThread(this);
}
// 线程的真正执行逻辑
public void run() {
runWorker(this);
}
// 判断线程是否是独占状态,如果不是意味着线程处于空闲状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// ...
}
Worker 是位于 ThreadPoolExecutor 中的一个内部类,它继承了 AQS,使用 AQS 来实现了独占锁的功能,但是并没支持可重入。这里使用不可重入的特性来表示线程的执行状态,即可以通过 isHeldExclusively 方法来判断,如果是独占状态,说明线程正在执行任务,如果非独占状态,说明线程处于空闲状态。关于 AQS 我们前边文章中已经详细分析过了,不了解 AQS 的可以翻看前边 ReentranLock 的文章。
另外,Worker 还实现了 Runnable 接口,因此它的执行逻辑就是在 run 方法中,run 方法调用的是线程池中的 runWorker(this)方法。任务的执行逻辑就在 runWorker 方法中,它的代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 取出 Worker 中的任务,可能为空
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task 不为 null 或者阻塞队列中有任务,通过循环不断的从阻塞队列中取出任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// ...
try {
// 任务执行前的 hook 点
beforeExecute(wt, task);
try {
// 执行任务
task.run();
// 任务执行后的 hook 点
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 超时没有取到任务,则回收空闲超时的线程
processWorkerExit(w, completedAbruptly);
}
}
可以看到,runWorker 的核心逻辑就是不断通过 getTask 方法从阻塞队列中获取任务并执行.通过这样的方式实现了线程的复用,避免了创建线程。这里要注意的是这里是一个“生产者-消费者”模式,getTask 是从阻塞队列中取任务,所以如果阻塞队列中没有任务的时候就会处于阻塞状态。getTask 中通过判断是否要回收线程而设置了等待超时时间,如果阻塞队列中一直没有任务,那么在等待 keepAliveTime 时间后会抛出异常。最终会走到上述代码的 finally 方法中,意味着有线程空闲时间超过了 keepAliveTime 时间,那么调用 processWorkerExit 方法移除 Worker。processWorkerExit 方法中没有复杂难以理解的逻辑,这里就不再贴代码了。我们重点看下 getTask 中是如何处理的,代码如下:
private Runnable getTask() {
boolean timedO
ut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// ...
// Flag1. 如果配置了 allowCoreThreadTimeOut==true 或者线程池中的线程数大于核心线程数,则 timed 为 true,表示开启指定线程超时后被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// ...
try {
// Flag2. 取出阻塞队列中的任务,注意如果 timed 为 true,则会调用阻塞队列的 poll 方法,并设置超时时间为 keepAliveTime,如果超时没有取到任务则会抛出异常。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
重点看 getTask 是如何处理空闲超时的逻辑的。我们知道,回收线程的条件是线程大于核心线程数或者配置了 allowCoreThreadTimeOut 为 true,当线程空闲超时的情况下就会回收线程。上述代码在 Flag1 处先判断了如果线程池中的线程数大于核心线程数,或者开启了 allowCoreThreadTimeOut,那么就需要开启线程空闲超时回收。所以在 Flag2 处,timed 为 true 的情况下调用了阻塞队列的 poll 方法,并传入了超时时间为 keepAliveTime,如果在 keepAliveTime 时间内,阻塞队列一直为 null 那么久会抛出异常,结束 runWorker 的循环。进而执行 runWorker 方法中回收线程的操作。
这里需要我们理解阻塞队列 poll 方法的使用,poll 方法接受一个时间参数,是一个阻塞操作,在给定的时间内没有获取到数据就会抛出异常。其实说白了,阻塞队列就是一个使用 ReentranLock 实现的“生产者-消费者”模式,我们在深入理解 Java 线程的等待与唤醒机制(二)这篇文章中使用 ReentranLock 实现“生产者-消费者”模型其实就是一个简单的阻塞队列,与 JDK 中的 BlockingQueue 实现机制类似。感兴趣的同学可以自己查看 ArrayBlockingQueue 等阻塞队列的实现,限于文章篇幅,这里就不再赘述了。
3.ThreadPoolExecutor 的拒绝策略
=========================
上一小节中我们多次提到线程池的拒绝策略,它是在 reject 方法中实现的。实现代码也非常简单,代码如下:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
通过调用 handler 的 rejectedExecution 方法实现。这里其实就是运用了策略模式,handler 是一个 RejectedExecutionHandler 类型的成员变量,RejectedExecutionHandler 是一个接口,只有一个 rejectedExecution 方法。在实例化线程池时构造方法中传入对应的拒绝策略实例即可。前文已经提到了 Java 提供的几种默认实现分别为 DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy 以及 AbortPolicy。
以 AbortPolicy 直接抛出异常为例,来看下代码实现:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
评论