写点什么

【高并发】通过源码深度分析线程池中 Worker 线程的执行流程

作者:冰河
  • 2021 年 11 月 17 日
  • 本文字数:6873 字

    阅读完需:约 23 分钟

【高并发】通过源码深度分析线程池中Worker线程的执行流程

大家好,我是冰河~~


在《【高并发】通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程》一文中我们深度分析了线程池执行任务的核心流程,在 ThreadPoolExecutor 类的 addWorker(Runnable, boolean)方法中,使用 CAS 安全的更新线程的数量之后,接下来就是创建新的 Worker 线程执行任务,所以,我们先来分析下 Worker 类的源码。

Worker 类分析

Worker 类从类的结构上来看,继承了 AQS(AbstractQueuedSynchronizer 类)并实现了 Runnable 接口。本质上,Worker 类既是一个同步组件,也是一个执行任务的线程。接下来,我们看下 Worker 类的源码,如下所示。


private final class Worker extends AbstractQueuedSynchronizer implements Runnable {  private static final long serialVersionUID = 6138294804551838833L;  //执行任务的线程类  final Thread thread;  //初始化执行的任务,第一次执行的任务  Runnable firstTask;  //完成任务的计数  volatile long completedTasks;  //Worker类的构造方法,初始化任务并调用线程工厂创建执行任务的线程  Worker(Runnable firstTask) {    setState(-1);     this.firstTask = firstTask;    this.thread = getThreadFactory().newThread(this);  }  //重写Runnable接口的run()方法  public void run() {    //调用ThreadPoolExecutor类的runWorker(Worker)方法    runWorker(this);  }
//检测是否是否获取到锁 //state=0表示未获取到锁 //state=1表示已获取到锁 protected boolean isHeldExclusively() { return getState() != 0; } //使用AQS设置线程状态 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
//尝试释放锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; }
public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }}
复制代码


在 Worker 类的构造方法中,可以看出,首先将同步状态 state 设置为-1,设置为-1 是为了防止 runWorker 方法运行之前被中断。这是因为如果其他线程调用线程池的 shutdownNow()方法时,如果 Worker 类中的 state 状态的值大于 0,则会中断线程,如果 state 状态的值为-1,则不会中断线程。


Worker 类实现了 Runnable 接口,需要重写 run 方法,而 Worker 的 run 方法本质上调用的是 ThreadPoolExecutor 类的 runWorker 方法,在 runWorker 方法中,会首先调用 unlock 方法,该方法会将 state 置为 0,所以这个时候调用 shutDownNow 方法就会中断当前线程,而这个时候已经进入了 runWork 方法,就不会在还没有执行 runWorker 方法的时候就中断线程。


注意:大家需要重点理解 Worker 类的实现。


Worker 类中调用了 ThreadPoolExecutor 类的 runWorker(Worker)方法。接下来,我们一起看下 ThreadPoolExecutor 类的 runWorker(Worker)方法的实现。

runWorker(Worker)方法

首先,我们看下 RunWorker(Worker)方法的源码,如下所示。


final void runWorker(Worker w) {  Thread wt = Thread.currentThread();  Runnable task = w.firstTask;  w.firstTask = null;  //释放锁,将state设置为0,允许中断任务的执行  w.unlock();  boolean completedAbruptly = true;  try {    //如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环    while (task != null || (task = getTask()) != null) {      //如果任务不为空,则获取Worker工作线程的独占锁      w.lock();      //如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程      //大家好好理解下这个逻辑      if ((runStateAtLeast(ctl.get(), STOP) ||         (Thread.interrupted() &&          runStateAtLeast(ctl.get(), STOP))) &&        !wt.isInterrupted())        //中断线程        wt.interrupt();      try {        //执行任务前执行的逻辑        beforeExecute(wt, task);        Throwable thrown = null;        try {          //调用Runable接口的run方法执行任务          task.run();        } catch (RuntimeException x) {          thrown = x; throw x;        } catch (Error x) {          thrown = x; throw x;        } catch (Throwable x) {          thrown = x; throw new Error(x);        } finally {          //执行任务后执行的逻辑          afterExecute(task, thrown);        }      } finally {        //任务执行完成后,将其设置为空        task = null;        //完成的任务数量加1        w.completedTasks++;        //释放工作线程获得的锁        w.unlock();      }    }    completedAbruptly = false;  } finally {    //执行退出Worker线程的逻辑    processWorkerExit(w, completedAbruptly);  }}
复制代码


这里,我们拆解 runWorker(Worker)方法。


(1)获取当前线程的句柄和工作线程中的任务,并将工作线程中的任务设置为空,执行 unlock 方法释放锁,将 state 状态设置为 0,此时可以中断工作线程,代码如下所示。


Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//释放锁,将state设置为0,允许中断任务的执行w.unlock();
复制代码


(2)在 while 循环中进行判断,如果任务不为空,或者从任务队列中获取的任务不为空,则执行 while 循环,否则,调用 processWorkerExit(Worker, boolean)方法退出 Worker 工作线程。


while (task != null || (task = getTask()) != null)
复制代码


(3)如果满足 while 的循环条件,首先获取工作线程内部的独占锁,并执行一系列的逻辑判断来检测是否需要中断当前线程的执行,代码如下所示。


//如果任务不为空,则获取Worker工作线程的独占锁w.lock();//如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程//大家好好理解下这个逻辑if ((runStateAtLeast(ctl.get(), STOP) ||   (Thread.interrupted() &&    runStateAtLeast(ctl.get(), STOP))) &&  !wt.isInterrupted())  //中断线程  wt.interrupt();
复制代码


(4)调用执行任务前执行的逻辑,如下所示


//执行任务前执行的逻辑beforeExecute(wt, task);
复制代码


(5)调用 Runable 接口的 run 方法执行任务


//调用Runable接口的run方法执行任务task.run();
复制代码


(6)调用执行任务后执行的逻辑


//执行任务后执行的逻辑afterExecute(task, thrown);
复制代码


(7)将完成的任务设置为空,完成的任务数量加 1 并释放工作线程的锁。


//任务执行完成后,将其设置为空task = null;//完成的任务数量加1w.completedTasks++;//释放工作线程获得的锁w.unlock();
复制代码


(8)退出 Worker 线程的执行,如下所示


//执行退出Worker线程的逻辑processWorkerExit(w, completedAbruptly);
复制代码


从代码分析上可以看到,当从 Worker 线程中获取的任务为空时,会调用 getTask()方法从任务队列中获取任务,接下来,我们看下 getTask()方法的实现。

getTask()方法

我们先来看下 getTask()方法的源代码,如下所示。


private Runnable getTask() {  //轮询是否超时的标识  boolean timedOut = false;  //自旋for循环  for (;;) {    //获取ctl    int c = ctl.get();    //获取线程池的状态    int rs = runStateOf(c);
//检测任务队列是否在线程池停止或关闭的时候为空 //也就是说任务队列是否在线程池未正常运行时为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //减少Worker线程的数量 decrementWorkerCount(); return null; } //获取线程池中线程的数量 int wc = workerCountOf(c);
//检测当前线程池中的线程数量是否大于corePoolSize的值或者是否正在等待执行任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果线程池中的线程数量大于corePoolSize //获取大于corePoolSize或者是否正在等待执行任务并且轮询超时 //并且当前线程池中的线程数量大于1或者任务队列为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //成功减少线程池中的工作线程数量 if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { //从任务队列中获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //任务不为空直接返回任务 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}
复制代码


getTask()方法的逻辑比较简单,大家看源码就可以了,我这里就不重复描述了。


接下来,我们看下在正式调用 Runnable 的 run()方法前后,执行的 beforeExecute 方法和 afterExecute 方法。

beforeExecute(Thread, Runnable)方法

beforeExecute(Thread, Runnable)方法的源代码如下所示。


protected void beforeExecute(Thread t, Runnable r) { }
复制代码


可以看到,beforeExecute(Thread, Runnable)方法的方法体为空,我们可以创建 ThreadPoolExecutor 的子类来重写 beforeExecute(Thread, Runnable)方法,使得线程池正式执行任务之前,执行我们自己定义的业务逻辑。

afterExecute(Runnable, Throwable)方法

afterExecute(Runnable, Throwable)方法的源代码如下所示。


protected void afterExecute(Runnable r, Throwable t) { }
复制代码


可以看到,afterExecute(Runnable, Throwable)方法的方法体同样为空,我们可以创建 ThreadPoolExecutor 的子类来重写 afterExecute(Runnable, Throwable)方法,使得线程池在执行任务之后执行我们自己定义的业务逻辑。


接下来,就是退出工作线程的 processWorkerExit(Worker, boolean)方法。

processWorkerExit(Worker, boolean)方法

processWorkerExit(Worker, boolean)方法的逻辑主要是执行退出 Worker 线程,并且对一些资源进行清理,源代码如下所示。


private void processWorkerExit(Worker w, boolean completedAbruptly) {  //执行过程中出现了异常,突然中断  if (completedAbruptly)    //将工作线程的数量减1    decrementWorkerCount();  //获取全局锁  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    //累加完成的任务数量    completedTaskCount += w.completedTasks;    //将完成的任务从workers集合中移除    workers.remove(w);  } finally {    //释放锁    mainLock.unlock();  }  //尝试终止工作线程的执行  tryTerminate();  //获取ctl  int c = ctl.get();  //判断当前线程池的状态是否小于STOP(RUNNING或者SHUTDOWN)  if (runStateLessThan(c, STOP)) {    //如果没有突然中断完成    if (!completedAbruptly) {      //如果allowCoreThreadTimeOut为true,为min赋值为0,否则赋值为corePoolSize      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;      //如果min为0并且工作队列不为空      if (min == 0 && ! workQueue.isEmpty())        //min的值设置为1        min = 1;      //如果线程池中的线程数量大于min的值      if (workerCountOf(c) >= min)        //返回,不再执行程序        return;     }    //调用addWorker方法    addWorker(null, false);  }}
复制代码


接下来,我们拆解 processWorkerExit(Worker, boolean)方法。


(1)执行过程中出现了异常,突然中断执行,则将工作线程数量减 1,如下所示。


//执行过程中出现了异常,突然中断if (completedAbruptly)  //将工作线程的数量减1  decrementWorkerCount();
复制代码


(2)获取锁累加完成的任务数量,并将完成的任务从 workers 集合中移除,并释放,如下所示。


//获取全局锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {  //累加完成的任务数量  completedTaskCount += w.completedTasks;  //将完成的任务从workers集合中移除  workers.remove(w);} finally {  //释放锁  mainLock.unlock();}
复制代码


(3)尝试终止工作线程的执行


//尝试终止工作线程的执行tryTerminate();
复制代码


(4)处判断当前线程池中的线程个数是否小于核心线程数,如果是,需要新增一个线程保证有足够的线程可以执行任务队列中的任务或者提交的任务。


//获取ctlint c = ctl.get();//判断当前线程池的状态是否小于STOP(RUNNING或者SHUTDOWN)if (runStateLessThan(c, STOP)) {  //如果没有突然中断完成  if (!completedAbruptly) {    //如果allowCoreThreadTimeOut为true,为min赋值为0,否则赋值为corePoolSize    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;    //如果min为0并且工作队列不为空    if (min == 0 && ! workQueue.isEmpty())      //min的值设置为1      min = 1;    //如果线程池中的线程数量大于min的值    if (workerCountOf(c) >= min)      //返回,不再执行程序      return;   }  //调用addWorker方法  addWorker(null, false);}
复制代码


接下来,我们看下 tryTerminate()方法。

tryTerminate()方法

tryTerminate()方法的源代码如下所示。


final void tryTerminate() {  //自旋for循环  for (;;) {    //获取ctl    int c = ctl.get();    //如果线程池的状态为RUNNING    //或者状态大于TIDYING    //或者状态为SHUTDOWN并且任务队列为空    //直接返回程序,不再执行后续逻辑    if (isRunning(c) ||      runStateAtLeast(c, TIDYING) ||      (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))      return;    //如果当前线程池中的线程数量不等于0    if (workerCountOf(c) != 0) {       //中断线程的执行      interruptIdleWorkers(ONLY_ONE);      return;    }    //获取线程池的全局锁    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {      //通过CAS将线程池的状态设置为TIDYING      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {        try {          //调用terminated()方法          terminated();        } finally {          //将线程池状态设置为TERMINATED          ctl.set(ctlOf(TERMINATED, 0));          //唤醒所有因为调用线程池的awaitTermination方法而被阻塞的线程          termination.signalAll();        }        return;      }    } finally {      //释放锁      mainLock.unlock();    }  }}
复制代码


(1)获取 ctl,根据情况设置线程池状态或者中断线程的执行,并返回。


//获取ctlint c = ctl.get();//如果线程池的状态为RUNNING//或者状态大于TIDYING//或者状态为SHUTDOWN并且任务队列为空//直接返回程序,不再执行后续逻辑if (isRunning(c) ||  runStateAtLeast(c, TIDYING) ||  (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))  return;//如果当前线程池中的线程数量不等于0if (workerCountOf(c) != 0) {   //中断线程的执行  interruptIdleWorkers(ONLY_ONE);  return;}
复制代码


(2)获取全局锁,通过 CAS 设置线程池的状态,调用 terminated()方法执行逻辑,最终将线程池的状态设置为 TERMINATED,唤醒所有因为调用线程池的 awaitTermination 方法而被阻塞的线程,最终释放锁,如下所示。


//获取线程池的全局final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {  //通过CAS将线程池的状态设置为TIDYING  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {    try {      //调用terminated()方法      terminated();    } finally {      //将线程池状态设置为TERMINATED      ctl.set(ctlOf(TERMINATED, 0));      //唤醒所有因为调用线程池的awaitTermination方法而被阻塞的线程      termination.signalAll();    }    return;  }} finally {  //释放锁  mainLock.unlock();}
复制代码


接下来,看下 terminated()方法。

terminated()方法

terminated()方法的源代码如下所示。


protected void terminated() { }
复制代码


可以看到,terminated()方法的方法体为空,我们可以创建 ThreadPoolExecutor 的子类来重写 terminated()方法,值得 Worker 线程调用 tryTerminate()方法时执行我们自己定义的 terminated()方法的业务逻辑。


好了,今天就到这儿吧,我是冰河,我们下期见~~

发布于: 2021 年 11 月 17 日阅读数: 9
用户头像

冰河

关注

公众号:冰河技术 2020.05.29 加入

互联网高级技术专家,《深入理解分布式事务:原理与实战》,《海量数据处理与大数据技术实战》和《MySQL技术大全:开发、优化与运维实战》作者,mykit-data与mykit-transaction-message框架作者。【冰河技术】作者。

评论 (1 条评论)

发布
用户头像
原创不易,冰河在线求三连,不过分吧?
2021 年 11 月 17 日 15:33
回复
没有更多了
【高并发】通过源码深度分析线程池中Worker线程的执行流程