写点什么

☕【Java 技术指南】「技术盲区」看看线程池是如何回收和维持运作线程的核心技术体系

作者:浩宇天尚
  • 2021 年 11 月 18 日
  • 本文字数:5461 字

    阅读完需:约 18 分钟

☕【Java技术指南】「技术盲区」看看线程池是如何回收和维持运作线程的核心技术体系

线程池的难点和重点

让我们一起来看看线程池是如何回收和维持运作线程的核心技术体系。

线程池的前提和介绍

一般来讲 JDK 线程池就是 ThreadPoolExecutor,大多数会对线程池执行任务的流程有了大体了解,实际上这个流程也十分通俗易懂,就不再赘述了,我之前的文章也介绍过了相关的技术点分析和介绍说明。

讲一讲线程池是如何回收线程的?

runWorker(Worker w)
复制代码

线程执行的基本流程

  1. 工作线程启动后,就进入 runWorker(Worker w) 方法。

  2. 内部是一个 while 循环,循环判断任务是否为空,若不为空,执行任务;

  3. 若取不到任务,或发生异常,退出循环,执行**processWorkerExit(w, completedAbruptly); **在这个方法里把工作线程移除掉。

读取任务的方式

主要有两种方式:一个是 firstTask,这个是工作线程第一次跑的时候执行的任务,最多只能执行一次,后面得从 getTask 方法里取任务。


getTask 是关键,在不考虑异常的场景下,返回,就表示退出循环,结束线程。


private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        for (;;) {            int c = ctl.get();            // Check if queue empty only if necessary.            if (runStateAtLeast(c, SHUTDOWN)                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            int wc = workerCountOf(c);            // Are workers subject to culling?            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c))                    return null;                continue;            }            try {                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }
复制代码

重点关注的是 getTask 返回操作

一共有两种情况会返回


  • 第一种情况,线程池的状态已经是 STOP,TIDYING, TERMINATED,或者是 SHUTDOWN 且工作队列为空;


if(runStateAtLeast(c, SHUTDOWN)                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {                decrementWorkerCount();                return null;}
复制代码


  • 第二种情况,工作线程数已经大于最大线程数或当前工作线程已超时,且,还有其他工作线程或任务队列为空。


 if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c))                    return null;                continue;}
复制代码

线程池回收工作线程

未调用 shutdown ,RUNNING 状态下全部任务执行完成的场景。


这种场景,会将工作线程的数量减少到核心线程数大小(如果本来就没有超过,则不需要回收)。

案例场景分析
  • 比如一个线程池,核心线程数为 4,最大线程数为 8。

  • 开始是 4 个工作线程,当任务把任务队列塞满,就得将工作线程增加到 8。

  • 当后面任务执行到差不多了,线程取不到任务了,就会回收到 4 个工作线程的状态(取决于 allowCoreThreadTimeOut 的值,这里讨论默认值 false 的情况,即核心线程不会超时。如果为 true,工作线程可以全部销毁)。

  • 可以先排除上面提到的条件 1,线程池的状态已经是 STOP,TIDYING, TERMINATED,或者是 SHUTDOWN 且工作队列为空。

  • 因为线程池一直是 RUNNING,这条判断永远是 false。在这个场景中,可以当条件 1 不存在。


下面分析取不出任务时线程是怎么运行的。


  1. 从任务队列取任务有两种方式,超时等待还是可以一直阻塞下去。决定因素是 timed 变量。该变量在前面赋值,如果当前线程数大于核心线程数,变量 timed 为 true, 否则为 false(当然是在:allowCoreThreadTimeOut 为 false 的情况)。


现在讨论的是 timed 为 true 的情况。keepAliveTime 一般不设置,默认值为 0,所以基本上可以认为是不阻塞,马上返回取任务的结果,在线程超时等待唤醒之后,发现取不出任务,timeOut 变为 true,进入下一次循环。


  1. 来到 1 的判断,线程池一直 RUNNING, 不进入代码块。

  2. 来到 2 的判断,这时任务队列为空,条件成立,CAS 减少线程数,若成功,返回,否则,重复 1。


注意,有可能多条线程同时通过 2 的判断,那会不会减少后线程的数量反而比预想的核心线程数少呢?


  • 比如当前线程数已经只有 5 条了,此时有两条线程同时唤醒,通过 2 的判断,同时减少数量,那剩下的线程数反而只有 3 条,和预期不一致。

  • 实际上是不会的,为了防止这种情况,compareAndDecrementWorkerCount(c) 用的是 CAS 方法,如果 CAS 失败就 continue,进入下一轮循环,重新判断。

  • 像上述例子,其中一条线程会 CAS 失败,然后重新进入循环,发现工作线程数已经只有 4 了,timed 为 false,这条线程就不会被销毁,可以一直阻塞了(workQueue.take)。

  • 从这里也可以看出,虽然有核心线程数,但线程并没有区分是核心还是非核心,并不是先创建的就是核心,超过核心线程数后创建的就是非核心,最终保留哪些线程,完全随机。

shutdown
  • 调用 shutdown ,全部任务执行完成的场景

  • 这种场景,无论是核心线程还是非核心线程,所有工作线程都会被销毁。

  • 在调用 shutdown 之后,会向所有的空闲工作线程发送中断信号。


public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(SHUTDOWN);            interruptIdleWorkers();            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {            mainLock.unlock();        }        tryTerminate();}
复制代码


最终传入 false,调用下面这个方法。


private void interruptIdleWorkers(boolean onlyOne) {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (Worker w : workers) {                Thread t = w.thread;                if (!t.isInterrupted() && w.tryLock()) {                    try {                        t.interrupt();                    } catch (SecurityException ignore) {                    } finally {                        w.unlock();                    }                }                if (onlyOne)                    break;            }        } finally {            mainLock.unlock();        }}
复制代码


  • 可以看出,在发出中断信号前,会判断是否已经中断,以及要获得工作线程的独占锁。

  • 发出中断信号的时候,工作线程要么在 getTask 里准备获取任务,要么在执行任务,那就得等它执行完当前任务才会发出,因为工作线程在执行任务的时候,也会工作线程加锁。

  • 工作线程执行完任务,又跑到 getTask 里面去了。

  • 所以我们只要看 getTask 里面怎么应对中断异常的就可以了。


工作线程在 getTask 里,有两种可能。


final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    try {                        task.run();                        afterExecute(task, null);                    } catch (Throwable ex) {                        afterExecute(task, ex);                        throw ex;                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }
复制代码


  • 任务已全部完成,线程在阻塞等待。

  • 中断信号将其唤醒,从而进入下一轮循环。

  • 到达 1 处,符合条件,减少工作线程数量,并返回,由外层结束这条线程。

  • 这里的 decrementWorkerCount 是自旋式的,一定会减 1。

任务还没有完全执行完

调用 shutdown 之后,未执行完的任务要执行完毕,线程池才能结束。所以此时有可能线程还在工作。

分两个阶段讨论

阶段 1 任务较多,工作线程都能获得任务


分析一下收到中断信号后线程的表现。


  • 假设有线程 A,正通过 getTask 里获取任务。此时 A 被中断,在获取任务时,无论是 poll 还是 take,都会抛出中断异常。

  • 异常被捕获,重新进入下一轮循环,只要队列不为空,就可以继续取任务。


workQueue 是 BlockingQueue 类型,以常见的 LinkedBlockingQueue 和 ArrayBlockingQueue 为例,加锁时都是调用 lockInterruptibly,是响应中断的。


该方法又调用了 AQS 的 acquireInterruptibly(int arg)。


acquireInterruptibly(int arg),无论是在入口处判断中断异常,还是在 parkAndCheckInterrupt 方法阻塞,被中断唤醒并判断中断异常时,均使用了 Thread.interrupted。


这个方法会返回线程的中断状态,并把中断状态重置!也就是说,线程不再是中断状态了,这样在再次取任务时,就不会报错了。


因此,这对于正在准备取任务的线程,只是相当于浪费了一次循环,这可能是线程中断带来的副作用吧,当然,对整体的运行不影响。

任务刚好要执行完了

这时任务已经快取完了,比如有 4 条工作线程,只剩下 2 个任务,那就可能出现 2 条线程获得任务,2 条线程阻塞。


因为在获取任务前的判断,没有加锁,那么会不会出现,所有线程都通过了前面的校验,来到 workQueue 获取任务的地方,刚好任务队列已经空了,线程全部阻塞了呢?因为 shutdown 已经执行完毕,无法再向线程发出中断信号,从而线程一直在阻塞,无法被回收。


假设有 A,B,C,D 四条工作线程,同时通过了条件 1 和条件 2 的判断,来到取任务的地方。那么,工作队列至少还有一个任务,至少会有一条线程能取到任务。


假设 A,B 获得了任务,C,D 阻塞。


A, B 接下来的步骤是:


  • 任务执行完成后,再次 getTask,此时符合条件 1,返回,线程准备被回收。

  • processWorkerExit(Worker w, boolean completedAbruptly) 将线程回收。


回收就只是把线程干掉这么简单吗?来看看 processWorkerExit(Worker w, boolean completedAbruptly) 的方法。


可以看到,在里面除了 workers.remove(w) 移除线,还调用了 tryTerminate。


第一个判断条件没有一个子条件符合,跳过。第二个条件,工作线程还存在,那么随机中断一条空闲线程。


那么问题就来了,中断一条空闲线程,也没说是一定中断正在阻塞的线程啊。如果 A, B 同时退出,有没有可能出现 A 中断 B, B 中断 A,AB 互相中断,从而没有线程去中断唤醒阻塞的线程呢?


假设 A 能走到这里,说明 A 已经从工作线程的集合 workers 里面移除了(processWorkerExit(Worker w, boolean completedAbruptly) 在 tryTerminate()之前,已经将其移除)。那么 A 中断 B,B 来到这里中断,就不会在 workers 里面找到 A 了。


也就是说,退出的线程不能互相中断,我从集合中退出后,中断了你,你不能中断我,因为我已经退出集合,你只能中断别人。那么,即使有 N 个线程同时退出,至少在最后,也会有一条线程,会中断剩余的阻塞线程。


==阻塞的 C,D 中的任意一条被中断唤醒后,又会重复 step1 的动作,周而复始,直到所有阻塞线程都被中断,唤醒。


这也是为什么在 tryTerminate 里面,传入 false,只需要中断任意一条空闲线程的原因。

总结

ThreadPoolExecutor 回收工作线程,一条线程 getTask 返回,就会被回收。

两种场景。

  • 未调用 shutdown ,RUNNING 状态下全部任务执行完成的场景

  • 线程数量大于 corePoolSize,线程超时阻塞,超时唤醒后 CAS 减少工作线程数,如果 CAS 成功,返回,线程回收。

  • 否则进入下一次循环。当工作者线程数量小于等于 corePoolSize,就可以一直阻塞了。

  • 调用 shutdown ,全部任务执行完成的场景

  • shutdown 会向所有线程发出中断信号,这时有两种可能。

所有线程都在阻塞

中断唤醒,进入循环,都符合第一个 if 判断条件,都返回,所有线程回收。

任务还没有完全执行完

至少会有一条线程被回收。在 processWorkerExit(Worker w, boolean completedAbruptly)方法里会调用 tryTerminate,向任意空闲线程发出中断信号。所有被阻塞的线程,最终都会被一个个唤醒,回收。

发布于: 2021 年 11 月 18 日阅读数: 22
用户头像

浩宇天尚

关注

🏆 InfoQ写作平台-签约作者 🏆 2020.03.25 加入

【个人简介】酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“ 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、APM专题及微服务/分布式体系等

评论

发布
暂无评论
☕【Java技术指南】「技术盲区」看看线程池是如何回收和维持运作线程的核心技术体系