深入了解 java 线程篇之 ThreadPoolExecutor
一. 概述
在深入了解 JAVA 线程篇初略的介绍了线程池中几大关键属性成员,以及大体的原理。但是还未对其进行全面的解读,接下来会其属性以及方法进行全方面解读。
二. 解析
1. 线程池状态
在线程池对象中,有个 ctl 是记录这线程池状态以及正在运行的线程数量。ctl 结构图如下:
高三位记录这线程池状态;
100 - RUNNING,运行状态,即 ctl 是负数;
000 - SHUTDOWN,关闭状态,不再接受新任务,但能处理已添加的任务。
001 - STOP,停止状态,不再接受新任务,不处理已添加的任务,并且会中断正在处理的任务。
010 - TIDYING,僵尸状态,所有的任务已终止,线程数为 0;
011 - TERMINATED,线程池彻底终止。
状态的转换关系图如下:
我们创建线程池时,其状态默认值为 ctl=0xE0000000。线程池只提供了两个方法,对线程池进行关闭的
关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//设置状态
advanceRunState(SHUTDOWN);
//中断工作线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试结束线程池
tryTerminate();
}
立马关闭线程池
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//设置状态
advanceRunState(STOP);
//中断工作线程
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试结束线程池
tryTerminate();
return tasks;
}
2. 提交任务
这里面核心的代码主要是 execute 方法;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//如果小于核心线程数,则创建Worker工作线程(核心线程)去执行该任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果创建不了Worker工作线程或者说是失败,则进入下面的逻辑
//判断线程池是否在运行状态,如果是,则将任务添加到队列中去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果线程池现在非运行状态,并从队列中剔除该任务,并拒绝这次任务的提交;
reject(command);
else if (workerCountOf(recheck) == 0)
//如果工作线程数为0,则创建Worker工作线程;
addWorker(null, false);
}
else if (!addWorker(command, false))//创建非核心线程
//创建Worker非核心线程失败,则拒绝这次任务的提交;
reject(command);
}
流程图如下:
接下来,我们具体了解创建工作线程的方法,该方法提供了两个参数,一个是任务对象,一个是是否是核心线程标志位;
//参数: firstTask-任务, core-是否是核心线程标志
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 主要检测线程池是否在运行状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果大于指定的线程数,则拒绝创建Worker工作线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//尝试新增工作线程数量->ctl+1
if (compareAndIncrementWorkerCount(c))
//创建成功,则打破这个死循环
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 还没启动,该工作线程就是启动状态,则抛出异常。
throw new IllegalThreadStateException();
//添加到工作线程集合中去
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动工作线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果添加失败,则从工作线程集合中剔除该工作线程对象,并尝试结束线程池
addWorkerFailed(w);
}
return workerStarted;
}
具体的流程图如下:
3. 任务执行
工作线程执行任务,是调用的 ThreadPoolExecutor 对象中的 runWorker 方法。从任务队列里拉取任务。代码如下
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//如果是关闭状态且工作线程为0的话,或者是停止状态,则不执行任务
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//检测是否超时标志位,如果为true,当从队列中获取任务时超过指定的时间,则结束该工作线程。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
流程如下:
从队列里拉取任务后,则进行任务的执行。代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
//从任务队列中拉取任务
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//如果线程池处于停止状态,中断当前线程
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
//完成任务数进行+1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
流程如下:
当异常或者从任务队列中过去任务为空时,则处理工作线程退出。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//设置最小工作线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
具体流程如下:
4. 尝试结束线程池
在上面的几个流程图中,有尝试结束线程池动作,代码如下:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
流程图如下:
5. 设置核心线程数
代码如下:
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
//尝试进行一次中断工作线程
interruptIdleWorkers();
else if (delta > 0) {
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
6. 设置最大线程数
代码如下:
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
//尝试进行一次中断工作线程
interruptIdleWorkers();
}
三. 动态调整线程池参数算法
根据上面的两个方法,设置核心线程数量以及最大线程数数量,进行调整线程池参数。
对操作系统层面的 CPU 使用率以及 JVM 堆内存的使用率进行实时监控,针对该数据进行一定的算法,有效的更改线程数量以及队列容量大小;
这一块没有找到相关的介绍,后续完善该章节的内容............
四. 总结
经过上述的解读,大体对线程池中的线程池状态,任务提交,任务执行等逻辑有了大体的认知,里面照着代码梳理了流程图,还未对整体进行解读,为什么有这么多条件限制等未进行解读,后续完善该篇........
版权声明: 本文为 InfoQ 作者【邱学喆】的原创文章。
原文链接:【http://xie.infoq.cn/article/759bb1edda40c51ab0e2e1843】。文章转载请联系作者。
邱学喆
计算机原理的深度解读,源码分析。 2018.08.26 加入
在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan
评论