写点什么

ThreadPoolExecutor 源码解读(二)execute 提交任务,Worker 详解。如何执行任务?如何回收空闲线程?

用户头像
徐同学呀
关注
发布于: 2021 年 04 月 17 日

一、前言

了解了线程池基本属性的概念是远远不够的,还需要知道每一个属性在源码中的体现,比如提交任务的过程中是如何将核心线程数、工作队列、最大线程数以及拒绝策略等连起来的?工作线程是如何执行任务代码的?线程池是如何回收空闲线程的?


默认情况下,刚初始化好的线程池是没有任何存活的线程的,等到有任务提交才创建线程执行任务。如果实际使用中希望线程池初始时尽快执行任务,可以调用prestartAllCoreThreads或者prestartCoreThread方法,预先在线程池启动几个工作线程,等待任务提交并执行。


//循环启动corePoolSize个工作线程public int prestartAllCoreThreads() {    int n = 0;    while (addWorker(null, true))        ++n;    return n;}//在corePoolSize范围内启动一个工作线程//个人认为无需在外面判断workerCountOf(ctl.get()) < corePoolSize//因为addWorker里面会判断线程数是否满足corePoolSizepublic boolean prestartCoreThread() {    return workerCountOf(ctl.get()) < corePoolSize &&        addWorker(null, true);}
复制代码

二、execute()提交任务

线程池是一个生产者消费者模式,execute()提交任务是生产者,工作线程从工作队列拉取任务执行是消费者。execute()代码逻辑主要分为 4 步:


  1. 如果工作线程数小于corePoolSize,则直接启动一个新的工作线程。

  2. 如果工作线程数大于等于corePoolSize,则将任务加入workQueue阻塞队列。

  3. 阻塞队列队列装满且正在运行的线程数小于maximumPoolSize,则创建一个新的工作线程。

  4. 当正在运行的线程数大于等于maximumPoolSize,将调用拒绝策略。


public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        //1、worker数小于corePoolSize,直接创建worker线程        if (addWorker(command, true))            //创建成功,返回            return;        c = ctl.get();    }    //2、线程数大于等于corePoolSize,任务加入workQueue阻塞队列    //2.1.判断线程池状态是否为running,是并加入阻塞队列    if (isRunning(c) && workQueue.offer(command)) {        //2.2.再次检查线程池状态是否running        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            //2.2.1 线程池不在running态,将任务从阻塞队列删除,调用拒绝策略            reject(command);        //若线程数为0则启动一个空线程从workQueue中取任务        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    //不是running或者任务加入阻塞队列失败(阻塞队列满了)    //3、判断maximumPoolSize,阻塞队列已满且小于maximumPoolSize,创建新的worker    else if (!addWorker(command, false))        //4、大于等于maximumPoolSize,拒绝策略        reject(command);}
复制代码


当线程数大于等于corePoolSize,任务被加入workQueue时,对线程池的状态进行了double check,这是有必要的:


  1. 在将任务加入workQueue前判断线程池是否是RUNNING,不是则不加入workQueue

  2. 任务加入workQueue后还需要再判断一次线程池的状态是否是RUNNING,因为在这个过程中可能线程池死亡或者外部调用了shundown() 或者 shutdownNow()使得线程池正在销毁,此时需要删除刚加入工作队列的任务,并触发拒绝策略。


当任务加入workQueue后判断此时工作线程数为 0,则启动一个空工作线程消费workQueue中的任务。

三、addWorker() 创建并启动工作线程

execute()的核心在addWorker(Runnable firstTask, boolean core),它是如何创建并启动一个工作线程的呢?addWorker第二个参数为 true 时线程数与corePoolSize比较,false 时线程数与maximumPoolSize比较。


addWorker代码很长,有两个大的 for 循环,逻辑主要涉及 2 步:


  • 第一个 for 循环:自旋 CAS 操作workerCount加 1。

  • 第二个 for 循环:创建并启动新 Worker 线程。


worderCount加 1 和创建 worker 时都反复判断线程池当前的运行状态,是为了当线程池调用了shutdown()或者shutdownNow()时做出相应的措施。


private boolean addWorker(Runnable firstTask, boolean core) {    //1、这个循环主要是为了自旋 cas  workerCount+1,成功之后就创建worder    // 因为代码块没有加锁,且是有可能多个线程同时操作,所以采用了cas乐观锁的方式。    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);
// Check if queue empty only if necessary. //线程池状态 大于等于SHUTDOWN(不是running),且非(SHUTDOWN 且task为null且阻塞队列不为空) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
for (;;) { int wc = workerCountOf(c); //worker数>=CAPACITY 或者worker数>=corePoolSize or maximumPoolSize 直接返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //cas workercount+1 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
//2、创建worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //1、new Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //1.1、这里需要加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //当获取锁后再次检查 线程池状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //工作线程已经是startable if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //2、工作线程还未启动,加入workers(HashSet) workers.add(w); int s = workers.size(); if (s > largestPoolSize) //2.2、实时修改largestPoolSize = s largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动工作线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //worder线程启动失败后的一些措施 //1、从workers删除worder //2、worderCount递减 //3、尝试终止线程池 addWorkerFailed(w); } return workerStarted;}
复制代码


第一个 for 循环,workerCount自旋 CAS 加 1 的过程中判断线程数是否满足corePoolSize或者maximumPoolSizeworkerCount加 1 成功结束外围循环,也有可能在 CAS 的过程中workerCount改变导致失败则重新判断、自旋重试。


第二个 for 循环,创建Worker工作线程,并启动,启动的过程中需要加锁,因为启动完 Worker,还需要将其加入到workers中,并将workers.size()赋值给largestPoolSize;如果Worker启动失败,则需要一些措施:


  1. workers删除Worder

  2. worderCount 自旋 CAS 递减。

  3. 如果线程池正处于销毁的过程,则尝试自旋终止线程池。


private void addWorkerFailed(Worker w) {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        if (w != null)            //1、从workers删除worder            workers.remove(w);        //2、worderCount cas自旋递减        decrementWorkerCount();        //3、如果线程池正处于销毁的过程,则尝试自旋终止线程池        tryTerminate();    } finally {        mainLock.unlock();    }}
复制代码

四、Worker 类是一把锁也是一个线程

addWorker中创建的工作线程是Worker对象,它是ThreadPoolExecutor的内部类,继承于AbstractQueuedSynchronizer,相当于是一把锁,实现了Runnable接口,又是一个线程。


Worker有两个核心的成员变量threadfirstTask。因为Worker本身是一个Runnable线程,初始化时会被ThreadFactory包装创建为一个Thread赋值给threadaddWorker方法中直接调用thread.start()启动Worker线程;firstTask是提交进来的任务,Worker直接调用firstTask的 run()函数,执行任务代码。


private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable{    //Worker是不可能被序列化的,但是为了禁止javac告警而提供serialVersionUID    private static final long serialVersionUID = 6138294804551838833L;    //这个线程非常的重要 Worker就是运行在其中的    final Thread thread;    //任务线程,并不会当做线程去start而是直接调用run()    Runnable firstTask;    //数据统计,可以忽略    volatile long completedTasks;
/** * Creates with given first task and thread from ThreadFactory. */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //默认ThreadFactory,创建工作线程 this.thread = getThreadFactory().newThread(this); }}
复制代码

1、runWorker 执行任务

Worker线程启动后会自动条用其run函数:


Worker刚 start 时,直接拿Worker的成员变量firstTask来执行任务代码,当firstTask为空时,从工作队列中循环取出task执行,而工作队列为空则阻塞等待。


runWorker()在代码中提供了两个空函数beforeExecuteafterExecute,一般称之为钩子函数,用户可以自行继承ThreadPoolExecutor时,将其重写,加上一些业务逻辑。


public void run() {    runWorker(this);}final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    //这里是一个空函数,使用者可以自行继承ThreadPoolExecutor 并重写beforeExecute                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        //直接调用了run函数                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        //这也是一个空函数,使用者可以自行继承ThreadPoolExecutor 并重写afterExecute                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            //如果从阻塞队列取不到任务,将会让当前的工作线程退出            processWorkerExit(w, completedAbruptly);        }}
复制代码


runWorker()源码中显而易见,直接调用的是Runnable任务taskrun,并且try-catch捕获了异常,却只是上抛并没有处理。如果提交的Runnable任务run方法中有异常,用户没有自行捕获,Worker捕获了,则会结束循环,并会将当前worker线程销毁,影响线程池的正常使用,所以强制要求Runnable任务run方法中任务代码try-catch,防患于未然。


Worker是一把锁,在runWorker()循环执行任务时会上锁,其目的是确保Worker线程,除了线程池销毁导致中断外,没有其他中断的设置。在调用shutdown()关闭线程池时,会中断空闲的工作线程,就是通过遍历workers,判断是否能获取Worker线程的锁,而定义Worker线程是否为空闲。源码英文注释是这样描述的:


Before running any task, the lock is acquired to prevent other pool interrupts

while the task is executing, and then we ensure that unless pool is stopping,

this thread does not have its interrupt set.

2、getTask()决定 Worker 的生死命运

getTask()是从workerQueue中获取任务,涉及了如何从队列中取任务,以及决定了工作线程Worker的生死命运。


getTask()返回 null 是一件非常恐怖的事情,因为runWorker会因为getTask()返回 null 而结束循环,从而执行销毁退出Worker线程的逻辑,也就是回收Worker线程。getTest()返回 null 的情况:


  1. getTest()本身是个 for 循环,会不断判断线程池的运行状态,如果线程池正处于STOP状态以上或者处于SHUTDOWN状态且workQueue为空,会将workerCount递减为 0,并返回 null。

  2. 当工作线程数大于maximumPoolSize且工作线程大于 1 时,workerCount递减 1,并返回 null。意图是回收该 Worker 线程,使线程数保持在maximumPoolSize数量内。

  3. 具有超时效果timed = true,大于corePoolSize的工作线程都具有超时效果,而设置 allowCoreThreadTimeOut 为 true,小于corePoolSize的工作线程也具有超时效果,此时超时则返回 null。

  4. 去任务的过程中 poll 超时返回 null。


private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?
for (;;) { int c = ctl.get(); int rs = runStateOf(c);
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
// Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //大于maximumPoolSize时,将不从阻塞队列中取 //大于corePoolSize 或者 allowCoreThreadTimeOut 为true,且超时了不从阻塞队列中取 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { //通过timed判断是用poll还是take, Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}
复制代码


通过timed判断如何从队列中取任务:


  1. timed=true使用具有时间的poll,如果取不到会阻塞一段时间,超时还未取到则返回 null。

  2. time=false使用 take(),取不到任务会一直阻塞。


一般情况下allowCoreThreadTimeOut=false,只有超过 corePoolSize 的线程具有超时效果,受 keepAliveTime 的支配。如果使用者将allowCoreThreadTimeOut设置为 true,则所有的工作线程都会因为空闲超时而被回收。

3、processWorkerExit 因 runWorker 循环结束而回收 Worker

processWorkerExitrunWorker循环结束而回收Worker,主要做了如下几件事:


  1. 将该Worker线程的任务完成数统计加给全局completedTaskCount

  2. workers删除worker

  3. 如果线程池处于销毁状态,尝试继续推进销毁状态。

  4. 可能会启动一个空的 Worker 线程。


private void processWorkerExit(Worker w, boolean completedAbruptly) {    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted        decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; //从workers删除worker workers.remove(w); } finally { mainLock.unlock(); } //重试终止,如果线程池状态为running 直接返回 tryTerminate();
int c = ctl.get(); if (runStateLessThan(c, STOP)) { //stop以内,completedAbruptly=false if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //allowCoreThreadTimeOut=true,且没有正在运行的工作线程 //或者allowCoreThreadTimeOut=false,工作线程数没有达到corePoolSize //则创建一个空工作线程作为弥补。 addWorker(null, false); }}
复制代码


runWorker中局部变量completedAbruptly首先设置为 true,循环结束则设置completedAbruptly为 false,但是若是因为任务代码异常,则不会执行到,所以不会再销毁的当前Worker的时候再启动一个空Worker弥补上。


什么时候会再启动一个空Worker呢?当线程池的状态小于 STOP,也就是 RUNNING 或者 SHUTDOWN,且不是因为任务代码异常结束循环时,继续判断是否需要启动一个空Worker线程作为弥补:


  1. allowCoreThreadTimeOut=true,且没有正在运行的工作线程且workQueue不为空时,启动一个空Worker继续消费workQueue中的任务。

  2. allowCoreThreadTimeOut=false,工作线程数没有达到corePoolSize,则创建一个空工作线程作为弥补。

五、总结

  1. 可以调用prestartAllCoreThreads或者prestartCoreThread方法预先创建几个工作线程池,等待任务提交并执行。

  2. 线程池中通过创建并启动Worker线程,执行用户提交的任务。

  3. Worker继承于AbstractQueuedSynchronizer,是一把锁,实现了Runnable接口,又是一个线程。

  4. 用户提交的Runnable任务task,启动Worker后自动执行Workerrun(),并直接调用taskrun()任务代码。

  5. 用户提交的任务,需要自行try-catch,否则一旦出现异常,被Worker捕获,但不处理直接上报,使得Worker线程被回收,影响线程池的正常使用。

  6. 工作线程Worker的回收是在Worker循环从workerQueue获取任务时,超过keepAliveTime依然没有获取到任务,则返回 null,导致Worker循环中断,最后执行Worker销毁退出。

  7. 设置allowCoreThreadTimeOut为 true,所有的线程都会因为超时而被回收。

  8. Worker是一把锁,在runWorker()循环执行任务时会上锁,其目的是确保Worker线程,除了线程池销毁导致中断外,没有其他中断的设置。


PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!

发布于: 2021 年 04 月 17 日阅读数: 10
用户头像

徐同学呀

关注

公众号:徐同学呀 2018.09.24 加入

专注于源码分析及Java底层架构开发领域。持续改进,坦诚合作!我是徐同学,愿与你共同进步!

评论

发布
暂无评论
ThreadPoolExecutor源码解读(二)execute提交任务,Worker详解。如何执行任务?如何回收空闲线程?