写点什么

同事有话说:ThreadPoolExecutor 是怎么回收线程的

用户头像
云流
关注
发布于: 2021 年 01 月 27 日

ThreadPoolExecutor 总览


先来点必要的知识储备,否则看后面的内容会可能会觉得一脸懵。


  • ThreadPoolExecutor 使用ctl变量存储线程池状态,高 3 位表示运行状态runState,低 29 位表示工作线程数量workerCount


image

  • 运行状态runState共有以下几种值:RUNNING:接收新的任务提交,处理任务缓冲队列中的任务。SHUTDOWN:不接收新的任务提交,但会处理任务缓冲队列中的任务。STOP:不接收新的任务提交,不会处理任务缓冲队列中的任务,并中断执行过程中的任务。TIDYING:所有的任务都已经被终止,工作者线程数为 0,线程过渡到TIDYING状态会调用terminated()钩子函数。TERMINATEDterminated()方法执行完成。

  • runState状态间的流转如下图:


image

  • ThreadPoolExecutor 将提交的任务与工作线程两者解耦,并不直接关联,其内部构建了一个生产者-消费者模型:任务的提交方就是生产者,工作线程扮演消费者的角色,其负责任务的执行。整个 ThreadPoolExecutor 的运行流程如下图所示:(图来自 Java线程池实现原理及其在美团业务中的实践 )


image

任务如何被提交:ThreadPoolExecutor 的 execute()方法


public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    // ctl变量存储着当前线程池的运行状态runState和总线程数量workerCount    int c = ctl.get();    // 当前工作者线程数小于设置的corePoolSize值    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    // 当前线程池状态runState为RUNNING,且成功提交任务到任务缓冲队列中    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        // 重新检查线程池状态,必要时对刚提交到队列中的任务执行回退即remove操作        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            // 如果当前可用的工作者线程数为0,创建新的工作者线程            addWorker(null, false);    }    // 如果工作者线程数大于corePoolSize,且任务缓冲队列workQueue已满,则创建新的工作者线程,直到数量到达maximumPoolSize上限    else if (!addWorker(command, false))        reject(command);}复制代码
复制代码

可对照着下面的流程图来阅读上面的execute()方法源码:


image


execute()方法中出现了一个比较重要的方法addWorker(),让我们点进去看看这个方法做了什么事情:


/*** @param firstTask 提交的待执行的任务,可能为null,此时代表应该创建新的工作线程* 处理任务缓冲队列中待处理的任务。* @param core 布尔值,为true代表使用corePoolSize作为边界,为false则使用maximumPoolSize/*private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);
// 检查当前线程池状态 // 需要注意的是,当runState==SHUTDOWN时,此时线程池不允许提交新的任务,但需要把任务缓冲队列中的任务处理完,所以如果任务缓冲队列非空且提交的firstTask参数为null,代表应该创建新的工作线程处理队列中待完成的任务,此时不应该直接return if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
for (;;) { int wc = workerCountOf(c); // CAPACITY是理论上工作线程数量的最大值(2^29) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS非阻塞地更新ThreadPoolExecutor本身维护的工作线程数量,更新成功则跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; // 重新获取一次线程池状态,如果运行状态有变更,则回到retry块的开头重新开始 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // 如果重新获取的线程池运行状态没有变更,证明只是CAS更新失败,则只需要重新执行CAS更新工作线程数量的逻辑即可。 // else CAS failed due to workerCount change; retry inner loop } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());
// 正如前面提到的,需要addWorker时,线程池状态可能为RUNNING,也可能为SHUTDOWN if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将新创建的worker添加到全局维护的worker集合中(workers其实是一个HashSet) workers.add(w); int s = workers.size(); if (s > largestPoolSize) // 跟踪最大的池大小 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 这里会执行worker的run()方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;}复制代码
复制代码

addWorker()方法看起来长,但其实它完成的事情并不多,就两件事:


  • 检测runState状态,CAS 更新workerCount

  • 在 ReentrantLock 独占保护下,新建 Worker,再次检测runState状态,将新建的 Worker 线程安全地添加到workers(一个 HashSet 集合)并执行,同时会维护largestPoolSize变量(跟踪最大的池大小)。


同样可以对照着下面给出的流程图来阅读上面的addWorker()源码:


image


提交的任务如何被执行:ThreadPoolExecutor 的 runWorker()方法


Worker 是 ThreadPoolExecutor 内部一个私有类,其定义如下(省略了部分代码):


private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable{    /** 该Worker所在的线程,即工作线程本体 */    final Thread thread;    /** 初始化时带入的预备执行的任务,可能为null */    Runnable firstTask;    /** 记录着这个Worker完成的总任务数 */    volatile long completedTasks;
/** 构造函数,执行一些初始化操作 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
/** 具体的运行逻辑委托给外部的即ThreadPoolExecutor的runWorker()方法执行 */ public void run() { runWorker(this); }
// Lock methods // Worker通过继承AQS类自定义了一个独占锁,重写了相关方法,这里省略不给出 .......}复制代码
复制代码

回看前面的addWorker()方法,当新创建的 Worker 被成功放进 Worker 集合后,其所在线程的start()方法将被调用,从而 Worker 对象的run()方法会被调用,而run()里面是调用了runWorker(),所以让我们来重点看下这个方法:


final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // 如果该Worker被创建时的firstTask不为null,则首先直接执行该task任务        // 否则调用getTask()方法从任务缓冲队列中获取任务并执行        while (task != null || (task = getTask()) != null) {            // 获取Worker自定义的独占锁,确保该Worker在执行任务过程中不会被外部中断            w.lock();            // 如果检测到当前线程池状态已经进入到STOP,则需要进一步确认当前线程是否已经被中断            // 如果没有被中断,则需要执行中断操作。            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 万事俱备,执行任务                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        // 对于跳出while循环,即不再从任务缓冲队列中获取任务并执行的Worker,需要执行回收逻辑        // 这里也就是本文关心的ThreadPoolExecutor回收线程的地方        processWorkerExit(w, completedAbruptly);    }}复制代码
复制代码

runWorker()的执行过程是:


  1. while 循环不断地通过getTask()方法获取任务。

  2. getTask()方法从阻塞队列中取任务。

  3. 如果线程池已经进入了STOP状态,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。

  4. 执行任务。

  5. 如果getTask()结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。


可对照着下面的流程图来阅读上面给出的runWorker()源码:


image


ThreadPoolExecutor 的线程如何被回收:processWorkerExit()


由前一小节的内容已经知道processWorkerExit()方法会在runWorker()方法中被执行,可以先看一下它做了什么事情:


private void processWorkerExit(Worker w, boolean completedAbruptly) {    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted        decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 实际这里就是回收线程的主要操作了,移除线程池对该线程的引用,使其可以被JVM正常地回收 workers.remove(w); } finally { mainLock.unlock(); }
tryTerminate(); // 由于引起线程回收的可能性有很多,线程池还要判断是什么引发了这次回收, // 是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程,于是就有了下面这部分逻辑 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); }}复制代码
复制代码

由此可见,线程池中线程的回收依赖 JVM 自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。


ThreadPoolExecutor 线程池什么时候回收线程?


从前面的内容可以知道一个 Worker 在运行了runWorker()方法后,会在getTask()方法返回null,即从任务缓冲队列中获取不到任务时跳出循环,执行processWorkerExit()回收当前 Worker 线程,所以 ThreadPoolExecutor 什么时候回收线程,其实就是看getTask()什么时候返回null,那让我们看下getTask()内部发生了什么:


private Runnable getTask() {    boolean timedOut = false;
for (;;) { int c = ctl.get(); int rs = runStateOf(c);
// 检测线程池是否正在终止,若此时runState进入了STOP或者任务缓冲队列未空,则减少工作线程数量并返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
// 当允许核心线程超时或者工作线程数大于设置的核心线程数量上限时,timed被设为true // 后文的分析我们基于allowCoreThreadTimeOut=false的前提,即核心线程不会超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { // 从任务缓冲队列中获取任务 // 根据timed的值来确定调用限时获取的poll()方法还是阻塞获取的take()方法 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 成功从任务缓冲队列中获取到任务则直接返回 return r; timedOut = true; } catch (InterruptedException retry) { // 发生中断就重置timedOut,并重新一轮循环 timedOut = false; } }}复制代码
复制代码

getTask()的主要逻辑是,在一个无限制 for 循环中,从workQueue阻塞队列中取出待执行的任务,能正常获取到则将任务返回,否则持续循环获取,直至获取成功,除非 线程池状态进入 stopping 或者 workQueue 为空 ,或者 工作者线程数超过了最大池大小 maximumPoolSize ,或者 获取任务超时 ,getTask()方法才会跳出 for 循环并返回。


可对照下面的流程图来阅读getTask()的源码:


image


回到我们关心的重点,getTask()什么时候会返回null?流程图中其实已经给出来了,主要是这么两处地方:


  1. 第一处是线程池是否已经停止,对应源码


image

当 ThreadPoolExecutor 的shutdown()方法被调用,线程池进入SHUTDOWN状态,这里需要分两种情况:


  • 调用shutdown()时,全部任务已经被执行完成,此时getTask()阻塞在 workQueue.take()


shutdown()为例,其内部调用栈上会调用interruptIdleWorkers(false);,而interruptIdleWorkers()内部长这个样子:


image


就是说shutdown()会中断线程池中持有的每个空闲的 Worker 线程(getTask()方法并不持有 Worker 对象的自定义独占锁,不信你往前翻下源码🌚 ),getTask()方法就会从workQueue.take()的阻塞中返回(因为线程池的阻塞队列是支持响应中断的)并正常进入到下一轮 for 迭代,然后程序会走到第一处“线程池是否已经停止”的条件判断,满足条件并返回 null。


  • 调用shutdown()时,任务缓冲队列中还有任务未被执行,此时getTask()在正常的循环运行中


假设getTask()走到了workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),它正在从任务缓冲队列中获取任务,如果 shutdown()方法被调用,该 Worker 线程被中断,由于workQueue.poll()方法响应中断,随意它会立即返回 null,getTask()会继续进入到下一轮 for 迭代,这个时候它会重新走到第一处“线程池是否已经停止”的条件判断,这个时候条件并不满足因为workQueue并不为空,所以getTask()会正常运行下去,直到workQueue为空即剩余任务被处理完才会正常跳出循环返回 null。


这个第二种情况还存在一种非常特殊的情况需要拿出来单独讨论,就是调用shutdown()时任务缓冲队列中只剩下 2 个任务,但此时 Worker 线程还有 4 个,按照前面讨论的结果我们很容易想到,4 个 Worker 线程中会有 2 个 Worker 正常在getTask()的第一处“线程池是否已经停止”条件判断中退出返回null,那么就会留下 2 个 Worker 线程阻塞在getTask()workQueue.take(),而shutdown()仅会对每一个符合条件 Worker 发出一次中断信号,那么这 2 个阻塞在workQueue.take()的 Worker 是怎么正常退出循环并被回收的呢?这个就要看我们前面提到的负责回收 Worker 线程的processWorkerExit()方法了,其内部会调用tryTerminate()方法,而这个tryTerminate()内部会去调用interruptIdleWorkers(true)方法,当传入true时,该方法仅会对当前线程池持有的众多空闲 Worker 中的一个发起中断,也就是说前面提到的 2 个阻塞的 Worker 中有一个会被中断信号唤醒并正常退出循环,而这个被唤醒的 Worker 在被回收过程中(processWorkerExit()被调用)又会发新的中断信号给另外一个阻塞的 Worker,这就跟多米诺骨牌一样,一系列被阻塞的 Worker 线程会由于其中某一个 Worker 被中断唤醒正常回收从而逐一发起中断信号唤醒剩余的阻塞的 Worker 线程,最终做到全部的 Worker 线程都被回收。


  1. 第二处是当前工作线程数是否过多,对应源码


image

这个就好解释了,这种对应的场景是shutdown()未被调用,线程池处于RUNNING状态,workQueue中的任务已经被全部取出并执行完成。这个场景下,线程池会将 Worker 线程数降低到corePoolSize大小(假设 allowCoreThreadTimeOut=false),timedtimedOut都会变为true,而由于workQueue为空所以第二处“当前工作线程数是否过多”的条件判断会判定为true,进而退出循环返回null


至此,对于"ThreadPoolExecutor 线程池什么时候回收线程"这个问题也就解释清楚了。


后记


作者:zyx94

链接:https://juejin.cn/post/6922069411981426702


用户头像

云流

关注

还未添加个人签名 2020.09.02 加入

还未添加个人简介

评论

发布
暂无评论
同事有话说:ThreadPoolExecutor是怎么回收线程的