写点什么

ThreadPoolExecutor 线程销毁源码分析

作者:new life
  • 2021 年 11 月 07 日
  • 本文字数:4398 字

    阅读完需:约 14 分钟

ThreadPoolExecutor 线程销毁源码分析

线程池在开发中使用非常频繁,在面试中也是高频面试点,最近翻看源码,有一些心得分享下:

线程池回收线程的大致流程:

1、调用线程池的 shutdown() 方法或者 shutdownNow() 方法;

2、主线程修改线程池的状态:

shutdown() -> SHUTDOWN

shutdownNow() -> STOP

3、中断线程池中所有线程;中断一些阻塞的线程,让线程正常结束执行;

4、每个 work 线程在获取任务的时候 getTask(), 校验当前线程池的状态,如果线程池状态已经被修改为非 RUNNING 状态,时机合适,work 线程退出

详情请看下面的源码注释:

1. 线程池状态:

// 线程池初始状态private static final int RUNNING    = -1 << COUNT_BITS;// 调用 shutdown() 方法后线程池状态private static final int SHUTDOWN   =  0 << COUNT_BITS;// 调用 shutdownNow() 方法后线程池状态private static final int STOP       =  1 << COUNT_BITS;// 在 TERMINATED 前的一个过渡状态,用处不多private static final int TIDYING    =  2 << COUNT_BITS;// 线程池终止状态private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

2.1 shutdown 方法

public void shutdown() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {      // 校验调用 shutdown() 线程是否可以操作此方法,校验被终止任务的线程(works)是否可以被终止      checkShutdownAccess();      // CAS 修改线程池运行状态为:SHUTDOWN       advanceRunState(SHUTDOWN);      // 中断所有工作线程      interruptIdleWorkers();      // hook for ScheduledThreadPoolExecutor      onShutdown();     } finally {      mainLock.unlock();    }  	// 尝试中断线程池    tryTerminate();}
复制代码

2.2 shutdownNow

public List<Runnable> shutdownNow() {    List<Runnable> tasks;    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {     // 校验调用 shutdownNow() 线程是否可以操作此方法,校验被终止任务的线程(works)是否可以被终止      checkShutdownAccess();     // CAS 修改线程池运行状态为:STOP       advanceRunState(STOP);      // 中断所有工作线程      interruptWorkers();      // 移除队列中的任务,并返回被移除的任务      tasks = drainQueue();    } finally {      mainLock.unlock();    }    tryTerminate();    return tasks;}
复制代码


3. interruptIdleWorkers

private void interruptIdleWorkers(boolean onlyOne) {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    // 遍历 works 工作线程集合,中断每个阻塞获取任务的线程,让这些线程执行完当前逻辑,退出 run() 方法    for (Worker w : workers) {      Thread t = w.thread;      if (!t.isInterrupted() && w.tryLock()) {        try {          t.interrupt();        } catch (SecurityException ignore) {        } finally {          w.unlock();        }      }      if (onlyOne)        break;    }  } finally {    mainLock.unlock();  }}
复制代码


4. tryTerminate()

 final void tryTerminate() {   for (;;) {     int c = ctl.get();     // 线程池 RUNNING 退出     // 线程池 TIDYING 退出     // 线程池 SHUTDOWN 并且 队列不为空(还有没处理完的任务)退出     if (isRunning(c) ||         runStateAtLeast(c, TIDYING) ||         (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))       return;     // 还有活跃的线程,中断一个     if (workerCountOf(c) != 0) { // Eligible to terminate       interruptIdleWorkers(ONLY_ONE);       return;     }
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
复制代码


5.Work 类:工作线程

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{   private static final long serialVersionUID = 6138294804551838833L;
/** 线程池里的工作线程,该线程在创建 Work 对象的时候传入 */ final Thread thread; /** 工作线程创建好后,执行的第一个任务 */ Runnable firstTask; /** 工作线程执行任务数量 */ volatile long completedTasks;
/** * 创建一个Work线程,指定第一个被执行的任务 */ Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
/** work 线程启动后,执行run方法,直到线程退出 */ public void run() { runWorker(this); }}
复制代码


6. runWorker

final void runWorker(Worker w) {  Thread wt = Thread.currentThread();  // task 默认为创建work时候指定的第一个任务  Runnable task = w.firstTask;  w.firstTask = null;  w.unlock(); // allow interrupts  boolean completedAbruptly = true;  try {        /**    * 1、第一个任务不为 null,先执行第一个任务,往后循环的从任务队列里取任务执行    * 2、只有 getTask() 返回null的时候,才会退出循环    */    while (task != null || (task = getTask()) != null) {      w.lock();      /** double check      * 防止线程池已经被设置成 STOP 状态后,部分线程响应中断失败,      * 这个地方再重新校验并设置下当前线程的中断状态      */      if ((runStateAtLeast(ctl.get(), STOP) ||           (Thread.interrupted() &&            runStateAtLeast(ctl.get(), STOP))) &&          !wt.isInterrupted())        wt.interrupt();      try {        // 默认没有实现,可以在子类中扩展        beforeExecute(wt, task);        Throwable thrown = null;        // 执行任务        task.run();        // 省略部分 try-catch 代码      } finally {        task = null;        // 当前线程执行任务数累计        w.completedTasks++;        w.unlock();      }    }    // 当 getTask() 为null的时候可以走到这一步    completedAbruptly = false;  } finally {    // getTask() 为 null,任务被执行完了,线程退出    processWorkerExit(w, completedAbruptly);  }
复制代码


7. getTask

private Runnable getTask() {  // 记录上次从任务队列获取任务是否超时  boolean timedOut = false;   // 循环遍历,直到获取到任务或者被中断  for (;;) {    // 线程池线程数量    int c = ctl.get();    // 线程池状态    int rs = runStateOf(c);		    // 如果线程池状态:RUNNING(-1)|SHUTDOWN(0)|STOP(1)|TIDYING(2)|TERMINATED(3)    // 两种情况:    // 1、如果是 STOP 以后状态(不用管任务队列里是否有任务未执行完),线程总数减1,立即返回 null;    // 2、在上面的情况之后,如果是 SHUTDOWN 以后的状态 && 任务队列为空(因为要等任务执行完),线程总数减1,并立即返回 null;    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {      decrementWorkerCount();      return null;    }    int wc = workerCountOf(c);    /**    * 1、allowCoreThreadTimeOut:是否让核心线程空闲回收,默认false    * 2、当前线程池中线程数是否大于核心线程数    */    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;		    /**     * 在线程池中'还有线程'或者'任务队列没有任务'的情况下:    * 1、如果线程池中的线程数,大于设置的最大线程数,必须停止当前线程;    * 2、在当前线程获取任务超时(说明队列里没有任务可以被执行):    *   (1)而核心线程允许超时回收,则回收当前线程(因为核心线程和线程池中其他线程没有特殊的标记,    *      回收了就回收了,如果下次线程池中线程不足,再新建一个线程扮演核心线程的角色即可);    *   (2) 当前线程总数超过了核心线程数,而现在又没有任务需要执行,自然回收当前线程;    */    if ((wc > maximumPoolSize || (timed && timedOut))        && (wc > 1 || workQueue.isEmpty())) {      if (compareAndDecrementWorkerCount(c))        return null;      continue;    }    try {      /**       * 1、如果允许核心线程空闲退出 或者 当前线程数大于核心线程数的情况下:      * 		poll() 设置获取任务的阻塞时间,超时后,返回 null,当循环执行到上面的两处校验处,返回 null 退出;      * 2、take()阻塞式获取任务,知道任务队列有可被执行任务为止;      */      Runnable r = timed ?        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :      workQueue.take();      if (r != null)        return r;      // 超时后仍未获取到任务      timedOut = true;    } catch (InterruptedException retry) {      timedOut = false;    }  }}
复制代码


8. processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted    decrementWorkerCount();  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    // 统计每个线程完成的任务数    completedTaskCount += w.completedTasks;    // 从线程池(set)中移除当前线程    workers.remove(w);  } finally {    mainLock.unlock();  }	// 修改线程池中断状态  tryTerminate();
int c = ctl.get(); /** * 如果当前线程池的状态不是 STOP,说明当前状态是 RUNNING 或者 SHUTDOWN, * 这个时候要保证线程池内有必要的线程去执行队列的任务 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果当前线程数为0,但是任务队列里又有任务需要被执行,必须调整线程数 if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 给线程池新增一个线程,保证任务被执行完 addWorker(null, false); }}
复制代码


用户头像

new life

关注

还未添加个人签名 2019.03.04 加入

还未添加个人简介

评论

发布
暂无评论
ThreadPoolExecutor 线程销毁源码分析