写点什么

深入了解 java 线程篇之 ThreadPoolExecutor

用户头像
邱学喆
关注
发布于: 4 小时前
深入了解java线程篇之ThreadPoolExecutor

一. 概述

深入了解 JAVA 线程篇初略的介绍了线程池中几大关键属性成员,以及大体的原理。但是还未对其进行全面的解读,接下来会其属性以及方法进行全方面解读。

二. 解析

1. 线程池状态

在线程池对象中,有个 ctl 是记录这线程池状态以及正在运行的线程数量。ctl 结构图如下:

高三位记录这线程池状态;

  • 100 - RUNNING,运行状态,即 ctl 是负数;

    000 - SHUTDOWN,关闭状态,不再接受新任务,但能处理已添加的任务。

    001 - STOP,停止状态,不再接受新任务,不处理已添加的任务,并且会中断正在处理的任务。

    010 - TIDYING,僵尸状态,所有的任务已终止,线程数为 0;

    011 - TERMINATED,线程池彻底终止。

状态的转换关系图如下:

我们创建线程池时,其状态默认值为 ctl=0xE0000000。线程池只提供了两个方法,对线程池进行关闭的

  • 关闭线程池

public void shutdown() {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    checkShutdownAccess();    //设置状态    advanceRunState(SHUTDOWN);            //中断工作线程    interruptIdleWorkers();    onShutdown(); // hook for ScheduledThreadPoolExecutor  } finally {    mainLock.unlock();  }   //尝试结束线程池  tryTerminate();}
复制代码
  • 立马关闭线程池

public List<Runnable> shutdownNow() {     List<Runnable> tasks;  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    checkShutdownAccess();    //设置状态    advanceRunState(STOP);     //中断工作线程    interruptWorkers();    tasks = drainQueue();  } finally {    mainLock.unlock();  }  //尝试结束线程池  tryTerminate();  return tasks;}
复制代码

2. 提交任务

这里面核心的代码主要是 execute 方法;

public void execute(Runnable command) {  if (command == null)               throw new NullPointerException();    int c = ctl.get();  if (workerCountOf(c) < corePoolSize) {    //如果小于核心线程数,则创建Worker工作线程(核心线程)去执行该任务    if (addWorker(command, true))      return;    c = ctl.get();  }  //如果创建不了Worker工作线程或者说是失败,则进入下面的逻辑  //判断线程池是否在运行状态,如果是,则将任务添加到队列中去  if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();    if (! isRunning(recheck) && remove(command))      //如果线程池现在非运行状态,并从队列中剔除该任务,并拒绝这次任务的提交;      reject(command);    else if (workerCountOf(recheck) == 0)      //如果工作线程数为0,则创建Worker工作线程;      addWorker(null, false);  }  else if (!addWorker(command, false))//创建非核心线程    //创建Worker非核心线程失败,则拒绝这次任务的提交;    reject(command);}
复制代码

流程图如下:

接下来,我们具体了解创建工作线程的方法,该方法提供了两个参数,一个是任务对象,一个是是否是核心线程标志位;

//参数: firstTask-任务, core-是否是核心线程标志private boolean addWorker(Runnable firstTask, boolean core) {  retry:  for (;;) {    int c = ctl.get();    int rs = runStateOf(c);        // 主要检测线程池是否在运行状态    if (rs >= SHUTDOWN &&        ! (rs == SHUTDOWN &&           firstTask == null &&           ! workQueue.isEmpty()))      return false;        for (;;) {      int wc = workerCountOf(c);      //如果大于指定的线程数,则拒绝创建Worker工作线程      if (wc >= CAPACITY ||          wc >= (core ? corePoolSize : maximumPoolSize))        return false;      //尝试新增工作线程数量->ctl+1      if (compareAndIncrementWorkerCount(c))        //创建成功,则打破这个死循环        break retry;            c = ctl.get();  // Re-read ctl      if (runStateOf(c) != rs)        continue retry;      // else CAS failed due to workerCount change; retry inner loop    }  }  boolean workerStarted = false;  boolean workerAdded = false;  Worker w = null;  try {    //创建工作线程    w = new Worker(firstTask);    final Thread t = w.thread;    if (t != null) {      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {        int rs = runStateOf(ctl.get());        if (rs < SHUTDOWN ||            (rs == SHUTDOWN && firstTask == null)) {          if (t.isAlive()) // 还没启动,该工作线程就是启动状态,则抛出异常。            throw new IllegalThreadStateException();          //添加到工作线程集合中去          workers.add(w);          int s = workers.size();          if (s > largestPoolSize)            largestPoolSize = s;          workerAdded = true;        }      } finally {        mainLock.unlock();      }      if (workerAdded) {        //启动工作线程        t.start();        workerStarted = true;      }    }  } finally {    if (! workerStarted)      //如果添加失败,则从工作线程集合中剔除该工作线程对象,并尝试结束线程池      addWorkerFailed(w);  }  return workerStarted;}
复制代码

具体的流程图如下:

3. 任务执行

工作线程执行任务,是调用的 ThreadPoolExecutor 对象中的 runWorker 方法。从任务队列里拉取任务。代码如下

private Runnable getTask() {  boolean timedOut = false; // Did the last poll() time out?    for (;;) {    int c = ctl.get();    int rs = runStateOf(c);        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {      //如果是关闭状态且工作线程为0的话,或者是停止状态,则不执行任务      decrementWorkerCount();      return null;    }        int wc = workerCountOf(c);   	//检测是否超时标志位,如果为true,当从队列中获取任务时超过指定的时间,则结束该工作线程。    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        if ((wc > maximumPoolSize || (timed && timedOut))        && (wc > 1 || workQueue.isEmpty())) {      if (compareAndDecrementWorkerCount(c))        return null;      continue;    }        try {      Runnable r = timed ?        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :      workQueue.take();      if (r != null)        return r;      timedOut = true;    } catch (InterruptedException retry) {      timedOut = false;    }  }}
复制代码

流程如下:

从队列里拉取任务后,则进行任务的执行。代码如下:

final void runWorker(Worker w) {  Thread wt = Thread.currentThread();  Runnable task = w.firstTask;  w.firstTask = null;  w.unlock(); // allow interrupts  boolean completedAbruptly = true;  try {    while (task != null || (task = getTask()) != null) {      //从任务队列中拉取任务      w.lock();      if ((runStateAtLeast(ctl.get(), STOP) ||           (Thread.interrupted() &&            runStateAtLeast(ctl.get(), STOP))) &&                             !wt.isInterrupted())        //如果线程池处于停止状态,中断当前线程        wt.interrupt();      try {        beforeExecute(wt, task);        Throwable thrown = null;        try {          //任务执行          task.run();        } catch (RuntimeException x) {          thrown = x; throw x;        } catch (Error x) {          thrown = x; throw x;        } catch (Throwable x) {          thrown = x; throw new Error(x);        } finally {          afterExecute(task, thrown);        }      } finally {        task = null;        //完成任务数进行+1        w.completedTasks++;        w.unlock();      }    }    completedAbruptly = false;  } finally {    processWorkerExit(w, completedAbruptly);  }}
复制代码

流程如下:

当异常或者从任务队列中过去任务为空时,则处理工作线程退出。

private void processWorkerExit(Worker w, boolean completedAbruptly) {  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted    decrementWorkerCount();  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    completedTaskCount += w.completedTasks;    workers.remove(w);  } finally {    mainLock.unlock();  }    tryTerminate();    int c = ctl.get();  if (runStateLessThan(c, STOP)) {    if (!completedAbruptly) {      //设置最小工作线程数      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;      if (min == 0 && ! workQueue.isEmpty())        min = 1;      if (workerCountOf(c) >= min)        return; // replacement not needed    }    addWorker(null, false);  }}
复制代码

具体流程如下:

4. 尝试结束线程池

在上面的几个流程图中,有尝试结束线程池动作,代码如下:

final void tryTerminate() {  for (;;) {    int c = ctl.get();    if (isRunning(c) ||        runStateAtLeast(c, TIDYING) ||        (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))      return;    if (workerCountOf(c) != 0) {      interruptIdleWorkers(ONLY_ONE);      return;    }    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {        try {          terminated();        } finally {          ctl.set(ctlOf(TERMINATED, 0));          termination.signalAll();        }        return;      }    } finally {      mainLock.unlock();    }     }}
复制代码

流程图如下:

5. 设置核心线程数

代码如下:

public void setCorePoolSize(int corePoolSize) {  if (corePoolSize < 0)    throw new IllegalArgumentException();  int delta = corePoolSize - this.corePoolSize;  this.corePoolSize = corePoolSize;  if (workerCountOf(ctl.get()) > corePoolSize)    //尝试进行一次中断工作线程    interruptIdleWorkers();         else if (delta > 0) {    int k = Math.min(delta, workQueue.size());    while (k-- > 0 && addWorker(null, true)) {      if (workQueue.isEmpty())        break;    }  }}
复制代码

6. 设置最大线程数

代码如下:

public void setMaximumPoolSize(int maximumPoolSize) {  if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)    throw new IllegalArgumentException();  this.maximumPoolSize = maximumPoolSize;  if (workerCountOf(ctl.get()) > maximumPoolSize)    //尝试进行一次中断工作线程    interruptIdleWorkers();}
复制代码

三. 动态调整线程池参数算法

根据上面的两个方法,设置核心线程数量以及最大线程数数量,进行调整线程池参数。

对操作系统层面的 CPU 使用率以及 JVM 堆内存的使用率进行实时监控,针对该数据进行一定的算法,有效的更改线程数量以及队列容量大小;

这一块没有找到相关的介绍,后续完善该章节的内容............

四. 总结

经过上述的解读,大体对线程池中的线程池状态,任务提交,任务执行等逻辑有了大体的认知,里面照着代码梳理了流程图,还未对整体进行解读,为什么有这么多条件限制等未进行解读,后续完善该篇........


发布于: 4 小时前阅读数: 2
用户头像

邱学喆

关注

计算机原理的深度解读,源码分析。 2018.08.26 加入

在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan

评论

发布
暂无评论
深入了解java线程篇之ThreadPoolExecutor