1
ThreadPoolExecutor 线程销毁源码分析
作者:new life
- 2021 年 11 月 07 日
本文字数:4398 字
阅读完需:约 14 分钟

线程池在开发中使用非常频繁,在面试中也是高频面试点,最近翻看源码,有一些心得分享下:
线程池回收线程的大致流程:
1、调用线程池的 shutdown() 方法或者 shutdownNow() 方法;
2、主线程修改线程池的状态:
shutdown() -> SHUTDOWN
shutdownNow() -> STOP
3、中断线程池中所有线程;中断一些阻塞的线程,让线程正常结束执行;
4、每个 work 线程在获取任务的时候 getTask(), 校验当前线程池的状态,如果线程池状态已经被修改为非 RUNNING 状态,时机合适,work 线程退出;
详情请看下面的源码注释:
1. 线程池状态:
// 线程池初始状态private static final int RUNNING = -1 << COUNT_BITS;// 调用 shutdown() 方法后线程池状态private static final int SHUTDOWN = 0 << COUNT_BITS;// 调用 shutdownNow() 方法后线程池状态private static final int STOP = 1 << COUNT_BITS;// 在 TERMINATED 前的一个过渡状态,用处不多private static final int TIDYING = 2 << COUNT_BITS;// 线程池终止状态private static final int TERMINATED = 3 << COUNT_BITS;复制代码
2.1 shutdown 方法
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 校验调用 shutdown() 线程是否可以操作此方法,校验被终止任务的线程(works)是否可以被终止 checkShutdownAccess(); // CAS 修改线程池运行状态为:SHUTDOWN advanceRunState(SHUTDOWN); // 中断所有工作线程 interruptIdleWorkers(); // hook for ScheduledThreadPoolExecutor onShutdown(); } finally { mainLock.unlock(); } // 尝试中断线程池 tryTerminate();}复制代码
2.2 shutdownNow
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 校验调用 shutdownNow() 线程是否可以操作此方法,校验被终止任务的线程(works)是否可以被终止 checkShutdownAccess(); // CAS 修改线程池运行状态为:STOP advanceRunState(STOP); // 中断所有工作线程 interruptWorkers(); // 移除队列中的任务,并返回被移除的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks;}复制代码
3. interruptIdleWorkers
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历 works 工作线程集合,中断每个阻塞获取任务的线程,让这些线程执行完当前逻辑,退出 run() 方法 for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); }}复制代码
4. tryTerminate()
final void tryTerminate() { for (;;) { int c = ctl.get(); // 线程池 RUNNING 退出 // 线程池 TIDYING 退出 // 线程池 SHUTDOWN 并且 队列不为空(还有没处理完的任务)退出 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 还有活跃的线程,中断一个 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; }
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }复制代码
5.Work 类:工作线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L;
/** 线程池里的工作线程,该线程在创建 Work 对象的时候传入 */ final Thread thread; /** 工作线程创建好后,执行的第一个任务 */ Runnable firstTask; /** 工作线程执行任务数量 */ volatile long completedTasks;
/** * 创建一个Work线程,指定第一个被执行的任务 */ Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
/** work 线程启动后,执行run方法,直到线程退出 */ public void run() { runWorker(this); }}复制代码
6. runWorker
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // task 默认为创建work时候指定的第一个任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { /** * 1、第一个任务不为 null,先执行第一个任务,往后循环的从任务队列里取任务执行 * 2、只有 getTask() 返回null的时候,才会退出循环 */ while (task != null || (task = getTask()) != null) { w.lock(); /** double check * 防止线程池已经被设置成 STOP 状态后,部分线程响应中断失败, * 这个地方再重新校验并设置下当前线程的中断状态 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 默认没有实现,可以在子类中扩展 beforeExecute(wt, task); Throwable thrown = null; // 执行任务 task.run(); // 省略部分 try-catch 代码 } finally { task = null; // 当前线程执行任务数累计 w.completedTasks++; w.unlock(); } } // 当 getTask() 为null的时候可以走到这一步 completedAbruptly = false; } finally { // getTask() 为 null,任务被执行完了,线程退出 processWorkerExit(w, completedAbruptly); }复制代码
7. getTask
private Runnable getTask() { // 记录上次从任务队列获取任务是否超时 boolean timedOut = false; // 循环遍历,直到获取到任务或者被中断 for (;;) { // 线程池线程数量 int c = ctl.get(); // 线程池状态 int rs = runStateOf(c); // 如果线程池状态:RUNNING(-1)|SHUTDOWN(0)|STOP(1)|TIDYING(2)|TERMINATED(3) // 两种情况: // 1、如果是 STOP 以后状态(不用管任务队列里是否有任务未执行完),线程总数减1,立即返回 null; // 2、在上面的情况之后,如果是 SHUTDOWN 以后的状态 && 任务队列为空(因为要等任务执行完),线程总数减1,并立即返回 null; if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /** * 1、allowCoreThreadTimeOut:是否让核心线程空闲回收,默认false * 2、当前线程池中线程数是否大于核心线程数 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * 在线程池中'还有线程'或者'任务队列没有任务'的情况下: * 1、如果线程池中的线程数,大于设置的最大线程数,必须停止当前线程; * 2、在当前线程获取任务超时(说明队列里没有任务可以被执行): * (1)而核心线程允许超时回收,则回收当前线程(因为核心线程和线程池中其他线程没有特殊的标记, * 回收了就回收了,如果下次线程池中线程不足,再新建一个线程扮演核心线程的角色即可); * (2) 当前线程总数超过了核心线程数,而现在又没有任务需要执行,自然回收当前线程; */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /** * 1、如果允许核心线程空闲退出 或者 当前线程数大于核心线程数的情况下: * poll() 设置获取任务的阻塞时间,超时后,返回 null,当循环执行到上面的两处校验处,返回 null 退出; * 2、take()阻塞式获取任务,知道任务队列有可被执行任务为止; */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 超时后仍未获取到任务 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}复制代码
8. processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 统计每个线程完成的任务数 completedTaskCount += w.completedTasks; // 从线程池(set)中移除当前线程 workers.remove(w); } finally { mainLock.unlock(); } // 修改线程池中断状态 tryTerminate();
int c = ctl.get(); /** * 如果当前线程池的状态不是 STOP,说明当前状态是 RUNNING 或者 SHUTDOWN, * 这个时候要保证线程池内有必要的线程去执行队列的任务 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果当前线程数为0,但是任务队列里又有任务需要被执行,必须调整线程数 if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 给线程池新增一个线程,保证任务被执行完 addWorker(null, false); }}复制代码
划线
评论
复制
发布于: 2021 年 11 月 07 日阅读数: 13
new life
关注
还未添加个人签名 2019.03.04 加入
还未添加个人简介











评论