1
ThreadPoolExecutor 线程销毁源码分析
 作者:new life
- 2021 年 11 月 07 日
- 本文字数:4398 字 - 阅读完需:约 14 分钟 

线程池在开发中使用非常频繁,在面试中也是高频面试点,最近翻看源码,有一些心得分享下:
线程池回收线程的大致流程:
1、调用线程池的 shutdown() 方法或者 shutdownNow() 方法;
2、主线程修改线程池的状态:
shutdown() -> SHUTDOWN
shutdownNow() -> STOP
3、中断线程池中所有线程;中断一些阻塞的线程,让线程正常结束执行;
4、每个 work 线程在获取任务的时候 getTask(), 校验当前线程池的状态,如果线程池状态已经被修改为非 RUNNING 状态,时机合适,work 线程退出;
详情请看下面的源码注释:
1. 线程池状态:
// 线程池初始状态private static final int RUNNING    = -1 << COUNT_BITS;// 调用 shutdown() 方法后线程池状态private static final int SHUTDOWN   =  0 << COUNT_BITS;// 调用 shutdownNow() 方法后线程池状态private static final int STOP       =  1 << COUNT_BITS;// 在 TERMINATED 前的一个过渡状态,用处不多private static final int TIDYING    =  2 << COUNT_BITS;// 线程池终止状态private static final int TERMINATED =  3 << COUNT_BITS;复制代码
 2.1 shutdown 方法
public void shutdown() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {      // 校验调用 shutdown() 线程是否可以操作此方法,校验被终止任务的线程(works)是否可以被终止      checkShutdownAccess();      // CAS 修改线程池运行状态为:SHUTDOWN       advanceRunState(SHUTDOWN);      // 中断所有工作线程      interruptIdleWorkers();      // hook for ScheduledThreadPoolExecutor      onShutdown();     } finally {      mainLock.unlock();    }  	// 尝试中断线程池    tryTerminate();}复制代码
 2.2 shutdownNow
public List<Runnable> shutdownNow() {    List<Runnable> tasks;    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {     // 校验调用 shutdownNow() 线程是否可以操作此方法,校验被终止任务的线程(works)是否可以被终止      checkShutdownAccess();     // CAS 修改线程池运行状态为:STOP       advanceRunState(STOP);      // 中断所有工作线程      interruptWorkers();      // 移除队列中的任务,并返回被移除的任务      tasks = drainQueue();    } finally {      mainLock.unlock();    }    tryTerminate();    return tasks;}复制代码
 3. interruptIdleWorkers
private void interruptIdleWorkers(boolean onlyOne) {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    // 遍历 works 工作线程集合,中断每个阻塞获取任务的线程,让这些线程执行完当前逻辑,退出 run() 方法    for (Worker w : workers) {      Thread t = w.thread;      if (!t.isInterrupted() && w.tryLock()) {        try {          t.interrupt();        } catch (SecurityException ignore) {        } finally {          w.unlock();        }      }      if (onlyOne)        break;    }  } finally {    mainLock.unlock();  }}复制代码
 4. tryTerminate()
 final void tryTerminate() {   for (;;) {     int c = ctl.get();     // 线程池 RUNNING 退出     // 线程池 TIDYING 退出     // 线程池 SHUTDOWN 并且 队列不为空(还有没处理完的任务)退出     if (isRunning(c) ||         runStateAtLeast(c, TIDYING) ||         (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))       return;     // 还有活跃的线程,中断一个     if (workerCountOf(c) != 0) { // Eligible to terminate       interruptIdleWorkers(ONLY_ONE);       return;     }
     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {       if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {         try {           terminated();         } finally {           ctl.set(ctlOf(TERMINATED, 0));           termination.signalAll();         }         return;       }     } finally {       mainLock.unlock();     }     // else retry on failed CAS   } }复制代码
 5.Work 类:工作线程
 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{   private static final long serialVersionUID = 6138294804551838833L;
   /** 线程池里的工作线程,该线程在创建 Work 对象的时候传入 */   final Thread thread;   /** 工作线程创建好后,执行的第一个任务 */   Runnable firstTask;   /** 工作线程执行任务数量 */   volatile long completedTasks;
   /**    * 创建一个Work线程,指定第一个被执行的任务    */   Worker(Runnable firstTask) {     setState(-1);       this.firstTask = firstTask;     this.thread = getThreadFactory().newThread(this);   }
   /** work 线程启动后,执行run方法,直到线程退出  */   public void run() {     runWorker(this);   }}复制代码
 6. runWorker
final void runWorker(Worker w) {  Thread wt = Thread.currentThread();  // task 默认为创建work时候指定的第一个任务  Runnable task = w.firstTask;  w.firstTask = null;  w.unlock(); // allow interrupts  boolean completedAbruptly = true;  try {        /**    * 1、第一个任务不为 null,先执行第一个任务,往后循环的从任务队列里取任务执行    * 2、只有 getTask() 返回null的时候,才会退出循环    */    while (task != null || (task = getTask()) != null) {      w.lock();      /** double check      * 防止线程池已经被设置成 STOP 状态后,部分线程响应中断失败,      * 这个地方再重新校验并设置下当前线程的中断状态      */      if ((runStateAtLeast(ctl.get(), STOP) ||           (Thread.interrupted() &&            runStateAtLeast(ctl.get(), STOP))) &&          !wt.isInterrupted())        wt.interrupt();      try {        // 默认没有实现,可以在子类中扩展        beforeExecute(wt, task);        Throwable thrown = null;        // 执行任务        task.run();        // 省略部分 try-catch 代码      } finally {        task = null;        // 当前线程执行任务数累计        w.completedTasks++;        w.unlock();      }    }    // 当 getTask() 为null的时候可以走到这一步    completedAbruptly = false;  } finally {    // getTask() 为 null,任务被执行完了,线程退出    processWorkerExit(w, completedAbruptly);  }复制代码
 7. getTask
private Runnable getTask() {  // 记录上次从任务队列获取任务是否超时  boolean timedOut = false;   // 循环遍历,直到获取到任务或者被中断  for (;;) {    // 线程池线程数量    int c = ctl.get();    // 线程池状态    int rs = runStateOf(c);		    // 如果线程池状态:RUNNING(-1)|SHUTDOWN(0)|STOP(1)|TIDYING(2)|TERMINATED(3)    // 两种情况:    // 1、如果是 STOP 以后状态(不用管任务队列里是否有任务未执行完),线程总数减1,立即返回 null;    // 2、在上面的情况之后,如果是 SHUTDOWN 以后的状态 && 任务队列为空(因为要等任务执行完),线程总数减1,并立即返回 null;    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {      decrementWorkerCount();      return null;    }    int wc = workerCountOf(c);    /**    * 1、allowCoreThreadTimeOut:是否让核心线程空闲回收,默认false    * 2、当前线程池中线程数是否大于核心线程数    */    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;		    /**     * 在线程池中'还有线程'或者'任务队列没有任务'的情况下:    * 1、如果线程池中的线程数,大于设置的最大线程数,必须停止当前线程;    * 2、在当前线程获取任务超时(说明队列里没有任务可以被执行):    *   (1)而核心线程允许超时回收,则回收当前线程(因为核心线程和线程池中其他线程没有特殊的标记,    *      回收了就回收了,如果下次线程池中线程不足,再新建一个线程扮演核心线程的角色即可);    *   (2) 当前线程总数超过了核心线程数,而现在又没有任务需要执行,自然回收当前线程;    */    if ((wc > maximumPoolSize || (timed && timedOut))        && (wc > 1 || workQueue.isEmpty())) {      if (compareAndDecrementWorkerCount(c))        return null;      continue;    }    try {      /**       * 1、如果允许核心线程空闲退出 或者 当前线程数大于核心线程数的情况下:      * 		poll() 设置获取任务的阻塞时间,超时后,返回 null,当循环执行到上面的两处校验处,返回 null 退出;      * 2、take()阻塞式获取任务,知道任务队列有可被执行任务为止;      */      Runnable r = timed ?        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :      workQueue.take();      if (r != null)        return r;      // 超时后仍未获取到任务      timedOut = true;    } catch (InterruptedException retry) {      timedOut = false;    }  }}复制代码
 8. processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted    decrementWorkerCount();  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    // 统计每个线程完成的任务数    completedTaskCount += w.completedTasks;    // 从线程池(set)中移除当前线程    workers.remove(w);  } finally {    mainLock.unlock();  }	// 修改线程池中断状态  tryTerminate();
  int c = ctl.get();  /**  * 如果当前线程池的状态不是 STOP,说明当前状态是 RUNNING 或者 SHUTDOWN,  * 这个时候要保证线程池内有必要的线程去执行队列的任务  */  if (runStateLessThan(c, STOP)) {    if (!completedAbruptly) {      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;      // 如果当前线程数为0,但是任务队列里又有任务需要被执行,必须调整线程数      if (min == 0 && ! workQueue.isEmpty())        min = 1;      if (workerCountOf(c) >= min)        return; // replacement not needed    }    // 给线程池新增一个线程,保证任务被执行完    addWorker(null, false);  }}复制代码
 划线
评论
复制
发布于: 2021 年 11 月 07 日阅读数: 13

new life
关注
还未添加个人签名 2019.03.04 加入
还未添加个人简介











 
    
评论