1
ThreadPoolExecutor 线程销毁源码分析
作者:new life
- 2021 年 11 月 07 日
本文字数:4398 字
阅读完需:约 14 分钟
线程池在开发中使用非常频繁,在面试中也是高频面试点,最近翻看源码,有一些心得分享下:
线程池回收线程的大致流程:
1、调用线程池的 shutdown() 方法或者 shutdownNow() 方法;
2、主线程修改线程池的状态:
shutdown() -> SHUTDOWN
shutdownNow() -> STOP
3、中断线程池中所有线程;中断一些阻塞的线程,让线程正常结束执行;
4、每个 work 线程在获取任务的时候 getTask(), 校验当前线程池的状态,如果线程池状态已经被修改为非 RUNNING 状态,时机合适,work 线程退出;
详情请看下面的源码注释:
1. 线程池状态:
// 线程池初始状态
private static final int RUNNING = -1 << COUNT_BITS;
// 调用 shutdown() 方法后线程池状态
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 调用 shutdownNow() 方法后线程池状态
private static final int STOP = 1 << COUNT_BITS;
// 在 TERMINATED 前的一个过渡状态,用处不多
private static final int TIDYING = 2 << COUNT_BITS;
// 线程池终止状态
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
2.1 shutdown 方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 校验调用 shutdown() 线程是否可以操作此方法,校验被终止任务的线程(works)是否可以被终止
checkShutdownAccess();
// CAS 修改线程池运行状态为:SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有工作线程
interruptIdleWorkers();
// hook for ScheduledThreadPoolExecutor
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试中断线程池
tryTerminate();
}
复制代码
2.2 shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 校验调用 shutdownNow() 线程是否可以操作此方法,校验被终止任务的线程(works)是否可以被终止
checkShutdownAccess();
// CAS 修改线程池运行状态为:STOP
advanceRunState(STOP);
// 中断所有工作线程
interruptWorkers();
// 移除队列中的任务,并返回被移除的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
复制代码
3. interruptIdleWorkers
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历 works 工作线程集合,中断每个阻塞获取任务的线程,让这些线程执行完当前逻辑,退出 run() 方法
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
复制代码
4. tryTerminate()
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 线程池 RUNNING 退出
// 线程池 TIDYING 退出
// 线程池 SHUTDOWN 并且 队列不为空(还有没处理完的任务)退出
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 还有活跃的线程,中断一个
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
复制代码
5.Work 类:工作线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
/** 线程池里的工作线程,该线程在创建 Work 对象的时候传入 */
final Thread thread;
/** 工作线程创建好后,执行的第一个任务 */
Runnable firstTask;
/** 工作线程执行任务数量 */
volatile long completedTasks;
/**
* 创建一个Work线程,指定第一个被执行的任务
*/
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** work 线程启动后,执行run方法,直到线程退出 */
public void run() {
runWorker(this);
}
}
复制代码
6. runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// task 默认为创建work时候指定的第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/**
* 1、第一个任务不为 null,先执行第一个任务,往后循环的从任务队列里取任务执行
* 2、只有 getTask() 返回null的时候,才会退出循环
*/
while (task != null || (task = getTask()) != null) {
w.lock();
/** double check
* 防止线程池已经被设置成 STOP 状态后,部分线程响应中断失败,
* 这个地方再重新校验并设置下当前线程的中断状态
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 默认没有实现,可以在子类中扩展
beforeExecute(wt, task);
Throwable thrown = null;
// 执行任务
task.run();
// 省略部分 try-catch 代码
} finally {
task = null;
// 当前线程执行任务数累计
w.completedTasks++;
w.unlock();
}
}
// 当 getTask() 为null的时候可以走到这一步
completedAbruptly = false;
} finally {
// getTask() 为 null,任务被执行完了,线程退出
processWorkerExit(w, completedAbruptly);
}
复制代码
7. getTask
private Runnable getTask() {
// 记录上次从任务队列获取任务是否超时
boolean timedOut = false;
// 循环遍历,直到获取到任务或者被中断
for (;;) {
// 线程池线程数量
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
// 如果线程池状态:RUNNING(-1)|SHUTDOWN(0)|STOP(1)|TIDYING(2)|TERMINATED(3)
// 两种情况:
// 1、如果是 STOP 以后状态(不用管任务队列里是否有任务未执行完),线程总数减1,立即返回 null;
// 2、在上面的情况之后,如果是 SHUTDOWN 以后的状态 && 任务队列为空(因为要等任务执行完),线程总数减1,并立即返回 null;
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* 1、allowCoreThreadTimeOut:是否让核心线程空闲回收,默认false
* 2、当前线程池中线程数是否大于核心线程数
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 在线程池中'还有线程'或者'任务队列没有任务'的情况下:
* 1、如果线程池中的线程数,大于设置的最大线程数,必须停止当前线程;
* 2、在当前线程获取任务超时(说明队列里没有任务可以被执行):
* (1)而核心线程允许超时回收,则回收当前线程(因为核心线程和线程池中其他线程没有特殊的标记,
* 回收了就回收了,如果下次线程池中线程不足,再新建一个线程扮演核心线程的角色即可);
* (2) 当前线程总数超过了核心线程数,而现在又没有任务需要执行,自然回收当前线程;
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 1、如果允许核心线程空闲退出 或者 当前线程数大于核心线程数的情况下:
* poll() 设置获取任务的阻塞时间,超时后,返回 null,当循环执行到上面的两处校验处,返回 null 退出;
* 2、take()阻塞式获取任务,知道任务队列有可被执行任务为止;
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 超时后仍未获取到任务
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
8. processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计每个线程完成的任务数
completedTaskCount += w.completedTasks;
// 从线程池(set)中移除当前线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 修改线程池中断状态
tryTerminate();
int c = ctl.get();
/**
* 如果当前线程池的状态不是 STOP,说明当前状态是 RUNNING 或者 SHUTDOWN,
* 这个时候要保证线程池内有必要的线程去执行队列的任务
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果当前线程数为0,但是任务队列里又有任务需要被执行,必须调整线程数
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 给线程池新增一个线程,保证任务被执行完
addWorker(null, false);
}
}
复制代码
划线
评论
复制
发布于: 2021 年 11 月 07 日阅读数: 13
new life
关注
还未添加个人签名 2019.03.04 加入
还未添加个人简介
评论