写点什么

ThreadPoolExecutor 源码解读(三)如何优雅的关闭线程池(shutdown、shutdownNow、awaitTermination)

用户头像
徐同学呀
关注
发布于: 2021 年 04 月 17 日

一、前言

学会了如何提交任务,还需要知道如何正确的关闭线程池。当关闭一个线程池时,有的工作线程还正在执行任务,有的调用者正在向线程池提交任务,并且工作队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是一个平滑过渡的过程。

二、shutdown()将 runState 流转为 SHUTDOWN

shutdown()主要做了 4 步:


  1. 检查是否有中断线程池的权限。

  2. 自旋CAS设置runStatusSHUTDOWN

  3. 中断空闲线程。

  4. 尝试终止线程池。


public void shutdown() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        //1.检查是否有Shutdown权限        checkShutdownAccess();        //2.cas设置runStatus为SHUTDOWN        advanceRunState(SHUTDOWN);        //3.中断空闲线程        interruptIdleWorkers();        //空的钩子函数        onShutdown(); // hook for ScheduledThreadPoolExecutor    } finally {        mainLock.unlock();    }    //4.尝试终止线程池    tryTerminate();}
复制代码


检查权限和设置状态都很简单,shutdown是如何只中断空闲线程?为什么要调用tryTerminate()

1、shutdown 是如何只中断空闲线程的?

从源码中可以看出在中断线程前会尝试获取Worker的锁,如果获得锁,说明当前的Worker是空闲的,可以中断,这也验证了 Worker 执行任务代码时加锁,确保除了线程池销毁导致中断外,没有其他中断的设置。


private void interruptIdleWorkers(boolean onlyOne) {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        for (Worker w : workers) {            //循环中断空闲worder            Thread t = w.thread;            //w.tryLock()尝试获取worker的锁,如果获得锁说明当前工作线程是空闲的            if (!t.isInterrupted() && w.tryLock()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                } finally {                    w.unlock();                }            }            if (onlyOne)                break;        }    } finally {        mainLock.unlock();    }}
复制代码

2、为什么要调用 tryTerminate()?

tryTerminate()不会强行终止线程池,当workerCount为 0,workerQueue为空时:


  1. 状态流转到TIDYING

  2. 然后调用钩子函数terminated()

  3. 状态从TIDYING 流转到TERMINATED

  4. 调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。


final void tryTerminate() {    for (;;) {        int c = ctl.get();        if (isRunning(c) ||            runStateAtLeast(c, TIDYING) ||            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))            return;        if (workerCountOf(c) != 0) { // Eligible to terminate            interruptIdleWorkers(ONLY_ONE);            return;        }        //1.当workQueue为空,workerCount为空时,cas流转状态为TIDYING        //2.并调用了一个空钩子函数terminated        //3.最终将状态流转为TERMINATED,并通知awaitTermination        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {                try {                    terminated();                } finally {                    ctl.set(ctlOf(TERMINATED, 0));                    termination.signalAll();                }                return;            }        } finally {            mainLock.unlock();        }        // else retry on failed CAS  自旋    }}
复制代码

3、termination.signalAll()唤醒的是什么?

awaitTermination()逻辑很简单,就是重复判断runState是否到达最终状态 TERMINATED,如果是直接返回 true,如果不是,调用termination.awaitNanos(nanos)阻塞一段时间,苏醒后再判断一次,如果runStateTERMINATED返回 true,否则返回 false。


/** * Wait condition to support awaitTermination */private final Condition termination = mainLock.newCondition();
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { ////判断当前的runstate是否大于等于TERMINATED if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; //如果不是TERMINATED,将等待nanos nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); }}
复制代码

三、shutdownNow()将 runState 流转为 STOP

shutdownNow()shutdown()相似,主要做了 5 步:


  1. 检查是否有中断线程池权限。

  2. 设置runStatusSTOP

  3. 中断所有工作线程。

  4. 清空工作队列。

  5. 尝试终止线程池。


public List<Runnable> shutdownNow() {    List<Runnable> tasks;    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        //1.检查是否有shutdown权限        checkShutdownAccess();        //2.设置runStatus为stop        advanceRunState(STOP);        //3.给worders发送终止信号        interruptWorkers();        //4.清空阻塞队列        tasks = drainQueue();    } finally {        mainLock.unlock();    }    //5.尝试终止线程池    tryTerminate();    return tasks;}
复制代码

四、shutdown()和 shutdownNow()的区别

shutdownNow()shutdown()不同之处在于:


  • shutdownNow()会终止所有工作线程,不管是空闲还是正在运行。

  • shutdownNow()会清空阻塞队列。


private void interruptWorkers() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        for (Worker w : workers)            w.interruptIfStarted();    } finally {        mainLock.unlock();    }}
复制代码


private List<Runnable> drainQueue() {    BlockingQueue<Runnable> q = workQueue;    ArrayList<Runnable> taskList = new ArrayList<Runnable>();    //删除所有元素并加入到taskList    q.drainTo(taskList);    if (!q.isEmpty()) {        //如果q此时又被加入了任务再将任务删除并加入taskList        for (Runnable r : q.toArray(new Runnable[0])) {            if (q.remove(r))                taskList.add(r);        }    }    return taskList;}
复制代码

五、关闭线程池的正确姿势

分析源码之后可知只调用shutdown()或者shutdownNow()是不够的,因为线程池并不一定立刻终止,还需要调用awaitTermination,循环检查runState是否到了最终状态TERMINATED


故正确关闭线程池的姿势如下:


threadPoolExecutor.shutdown();//threadPoolExecutor.shutdownNow();try {    boolean loop = true;    do {        loop = !threadPoolExecutor.awaitTermination(2, TimeUnit.SECONDS);    } while (loop);} catch (InterruptedException e) {    e.printStackTrace();}
复制代码

六、总结

  • shutdown()会中断空闲工作线程,不会中断正在执行任务的工作线程,也不会清空工作队列,会等待所有已提交的任务执行完,但是拒绝新提交的任务。

  • shutdownNow(),会中断所有工作线程,并清空工作队列,拒绝新提交的任务。

  • 关闭线程池,只调用shutdown()或者shutdownNow()是不够的,因为线程池并不一定立刻终止,还需要调用awaitTermination,循环检查runState是否到了最终状态TERMINATED


PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!

发布于: 2021 年 04 月 17 日阅读数: 25
用户头像

徐同学呀

关注

公众号:徐同学呀 2018.09.24 加入

专注于源码分析及Java底层架构开发领域。持续改进,坦诚合作!我是徐同学,愿与你共同进步!

评论

发布
暂无评论
ThreadPoolExecutor源码解读(三)如何优雅的关闭线程池(shutdown、shutdownNow、awaitTermination)