一、内容概括
本文内容主要围绕 JDK 中的 ThreadPoolExecutor 展开,首先描述了 ThreadPoolExecutor 的构造流程以及内部状态管理的机理,随后用大量篇幅深入源码探究了 ThreadPoolExecutor 线程分配、任务处理、拒绝策略、启动停止等过程,其中对 Worker 内置类进行重点分析,内容不仅包含其工作原理,更对其设计思路进行了一定分析。文章内容既包含了源码流程分析,还具有设计思路探讨和二次开发实践。
二、构造 ThreadPoolExecutor
2.1 线程池参数列表
大家可以通过如下构造方法创建线程池(其实还有其它构造器,大家可以深入源码进行查看,但最终都是调用下面的构造器创建线程池);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
复制代码
其中的构造参数的作用如下:
2.2 执行流程概述
由构造参数的作用我们可知,线程池中由几个重要的组件:核心线程池 、 空闲(非核心)线程池 和 阻塞队列。这里首先给出线程池的核心执行流程图,大家首先对其有个印象,之后分析源码就会轻松一些了。
下面对流程图中一些注释说明下:cap 表示池的容量,size 表示池中正在运行的线程数。对于阻塞队列来说,cap 表示队列容量,size 表示已经入队的任务数量。cpS<cpc 表示运行中的核心线程数小于线程池设置核心线程数的情况。
1)当核心线程池 未 “满” 时,会创建新的核心线程执行提交的任务。这里的 “满” 指的是核心线程池中的数量(size)小于容量(cap),此时会通过线程工厂新创建线程执行提交任务。
2)当核心线程池 已 “满” 时,会将提交的任务 push 进任务队列中,等待核心线程的释放。一旦核心线程释放后,将会从任务队列中 pull task 继续执行。因为使用的是阻塞队列,对于已经释放的核心线程,也会阻塞在获取任务的过程中。
3)当任务队列也满了时(这里的满是指真的满了,当然暂不考虑无界队列情况),会从空闲线程池中继续创建线程执行提交的任务。但空闲线程池中的线程是有存活时间(keepAliveTime)的,当线程执行完任务后,只能存活 keepAliveTime 时长,时间一过,线程就得被销毁。
4)当空闲线程池的线程数不断增加,直到 ThreadPoolExecutor 中的总线程数大于 maximumPoolSize 时,会拒绝执行任务,将提交的任务交给 RejectedExecutionHandler 进行后续处理。
上面所说的核心线程池和空闲线程池只是抽象出来的一个概念,后面我们将对其具体内容进行分析。
2.3 常用线程池
在进入 ThreadPoolExecutor 的源码分析前,我们先介绍下常用的线程池(其实并不常用,只是 JDK 自带了)。这些线程池可由 Executors 这个工具类(或叫线程池工厂)来创建。
2.3.1 FixedThreadPool
固定线程数线程池的创建方式如下:其中核心线程数与最大线程数固定且相等,采用以链表为底层结构的无界阻塞队列。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
复制代码
特点:
核心线程数与最大线程数相等,因此不会创建空闲线程。keepAliveTime 设置与否无关紧要。
采用无界队列,任务会被无限添加,直至内存溢出(OOM)。
由于无界队列不可能被占满,任务在执行前不可能被拒绝(前提是线程池一直处于运行状态)。
应用场景:
2.3.2 SingleThreadExecutor
单线程线程池的创建方式如下:其中核心线程数与最大线程数都为 1,采用以链表为底层结构的无界阻塞队列。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
复制代码
特点
应用场景
适用单线程的场景。
适用于对提交任务的处理有顺序性要求的场景。
2.3.3 CachedThreadPool
缓冲线程池的创建方式如下:其中核心线程数为 0,最大线程数为 Integer.MAX_VALUE(可以理解为无穷大)。采用同步阻塞队列。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
复制代码
特点:
核心线程数为 0,则初始就创建空闲线程,并且空闲线程的只能等待任务 60s,60s 内没有提交任务,空闲线程将被销毁。
最大线程数为无穷大,这样会造成巨量线程同时运行,CPU 负载过高,导致应用崩溃。
采用同步阻塞队列,即队列不存储任务。提交一个消费一个。由于最大线程数为无穷大,因此,只要提交任务就一定会被消费(应用未崩溃前)。
应用场景:
适用于耗时短、异步的小程序。
适用于负载较轻的服务器。
三、线程池状态以及活跃线程数
ThreadPoolExecutor 中有两个非常重要的参数:线程池状态 (rs) 以及 活跃线程数(wc)。前者用于标识当前线程池的状态,并根据状态量来控制线程池应该做什么;后者用于标识活跃线程数,根据数量控制应该在核心线程池还是空闲线程池创建线程。
ThreadPoolExecutor 用一个 Integer 变量(ctl)来设置这两个参数。我们知道,在不同操作系统下,Java 中的 Integer 变量都是 32 位,ThreadPoolExecutor 使用前 3 位(31~29)表示线程池状态,用后 29 位(28~0)表示活跃线程数。
这样设置的目的是什么呢?
我们知道,在并发场景中同时维护两个变量的代价是非常大的,往往需要进行加锁来保证两个变量的变化是原子性的。而将两个参数用一个变量维护,便只需一条语句就能保证两个变量的原子性。这种方式大大降低了使用过程中的并发问题。
有了上面的概念,我们从源码层面看看 ThreadPoolExecutor 的几种状态,以及 ThreadPoolExecutor 如何同时操作状态和活跃线程数这两个参数的。
ThreadPoolExecutor 关于状态初始化的源码如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
ThreadPoolExecutor 使用原子 Integer 定义了 ctl 变量。ctl 在一个 int 中包装了活跃线程数和线程池运行时状态两个变量。为了达到这样的目的,ThreadPoolExecutor 的线程数被限制在 2^29-1(大约 500 million)个,而不是 2^31-1(2 billion)个,因为前 3 位被用于标识 ThreadPoolExecutor 的状态。如果未来 ThreadPoolExecutor 中的线程数不够用了,可以把 ctl 设置为原子 long 类型,再调整下相应的掩码就行了。
COUNT_BITS 概念上用于表示状态位与线程数位的分界值,实际用于状态变量等移位操作。此处为 Integer.sixze-3=32-3=29。
CAPACITY 表示 ThreadPoolExecutor 的最大容量。由下图可以看出,经过移位操作后,一个 int 值的后 29 位达到最大值:全为 1。这 29 位表示活跃线程数,全为 1 时表明达到 ThreadPoolExecutor 能容纳的最大线程数。前 3 位为 0,表示该变量只与活跃线程数相关,与状态无关。这也是为了便于后续的位操作。
RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 表示 ThreadPoolExecutor 的 5 个状态。这 5 个状态对应的可执行操作如下:
RUNNING:可接收新任务,可持续处理阻塞队列中的任务。
SHUTDOWN:不可接收新任务,可继续处理阻塞队列中的任务。
STOP:不可接收新任务,中断阻塞队列中所有任务。
TIDYING:所有任务直接终止,所有线程清空。
TERMINATED:线程池关闭。
这 5 个状态的计算过程如下图所示,经过移位计算后,数值的后 29 位全为 0,前 3 位分别代表不同的状态。
经过以上的变量定义后,ThreadPoolExecutor 将状态与线程数分离,分别设置再一个 int 值的不同连续位上,这也为下面的操作带来了极大的便利。
接下来我们来看看 ThreadPoolExecutor 是如何获取状态和线程数的。
3.1 runStateOf(c)方法
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
复制代码
runStateOf() 方法是用于获取线程池状态的方法。其中形参 c 一般是 ctl 变量,包含了状态和线程数,runStateOf()移位计算的过程如下图所示。
CAPACITY 取反后高三位置 1,低 29 位置 0。取反后的值与 ctl 进行 ‘与’ 操作。由于任何值 ‘与’ 1 等于原值,‘与’ 0 等于 0。因此 ‘与’ 操作过后,ctl 的高 3 位保留原值,低 29 位置 0。这样就将状态值从 ctl 中分离出来。
3.2 workerCountOf(c)方法
private static int workerCountOf(int c) {
return c & CAPACITY;
}
复制代码
workerCountOf(c) 方法的分析思路与上述类似,就是把后 29 位从 ctl 中分离出来,获得活跃线程数。如下图所示,这里就不再赘述。
3.3 ctlOf(rs, wc)方法
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
复制代码
ctlOf(rs, wc)通过状态值和线程数值计算出 ctl 值。rs 是 runState 的缩写,wc 是 workerCount 的缩写。rs 的后 29 位为 0,wc 的前三位为 0,两者通过 ‘或’ 操作计算出来的最终值同时保留了 rs 的前 3 位和 wc 的后 29 位,即 ctl 值。
ThreadPoolExecutor 中还有一些其它操作 ctl 的方法,分析思路与上面都大同小异,大家有兴趣可以自己看看。
本小结最后再来看看 ThreadPoolExecutor 状态转换的途径,也可以理解为生命周期。
四、execute()执行流程
4.1 execute 方法
execute() 源码如下所示:
public void execute(Runnable command) {
// 如果待执行的任务为null,直接返回空指针异常。如果任务都没有,下面的步骤都没有执行的必要啦。
if (command == null) throw new NullPointerException();
// 获取 ctl 的值,ctl = (runState + workerCount)
int c = ctl.get();
// 如果 workerCount(工作线程数) < 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 执行 addWorker 方法。addWorker()方法会在下面进行详细分析,这里可以简单理解为添加工作线程处理任务。这里的true表示:在小于核心线程数时添加worker线程,即添加核心线程。
if (addWorker(command, true))
// 添加成功则直接返回
return;
// 添加失败,重新获取 ctl 的值,防止在添加worker时状态改变
c = ctl.get();
}
// 运行到这里表示核心线程数已满,因此下面addWorker中第二个参数为false。判断线程池是否是运行状态,如果是则尝试将任务添加至 任务队列 中
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取 ctl 的值,进行 double-check
int recheck = ctl.get();
// 如果线程池为非运行状态,则尝试从任务队列中移除任务
if (! isRunning(recheck) && remove(command))
// 移除成功后执行拒绝策略
reject(command);
// 如果线程池为运行状态、或移除任务失败
else if (workerCountOf(recheck) == 0)
// 执行 addWorker 方法,此时添加的是非核心线程(空闲线程,有存活时间)
addWorker(null, false);
}
// 如果线程池是非运行状态,或者 任务队列 添加任务失败,再次尝试 addWorker() 方法
else if (!addWorker(command, false))
// addWorker() 失败,执行拒绝策略
reject(command);
}
复制代码
源码分析直接看注释就行了,每一行都有,灰常灰常的详细了。
从源码中可以看到,execute() 方法主要封装了 ThreadPoolExecutor 创建线程的判断逻辑,核心线程和空闲线程的创建时机,拒绝策略的执行时机都在该方法进行判断。这里通过下面的流程图对上述源码进行总结下。
通过创建线程去执行提交的任务逻辑封装在 addWorker() 方法中。下一小节我们将来分析执行提交任务的具体逻辑。execute() 方法中还有几个方法这里说明下。
3.1.1 workerCountOf()
从 ctl 中获取活跃线程数,在第二小节已经介绍过了。
3.1.2 isRunning()
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
复制代码
依据 ctl 的值判断 ThreadPoolExecutor 是否运行状态。源码中直接判断 ctl < SHUTDOWN 是否成立,这是因为运行状态下的 ctl 最高位为 1,肯定是负数;而其它状态最高位为 0,肯定是正数。因此判断 ctl 的大小即可判断是否为运行态。
3.1.3 reject()
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
复制代码
直接调用初始化时的 RejectedExecutionHandler 接口的 rejectedExecution() 方法。这也是典型的策略模式的使用,真正的拒绝操作被封装在实现了 RejectedExecutionHandler 接口的实现类中。这里就不进行展开。
4.2 addWorker 方法
addWorker()源码分析如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 死循环执行逻辑。确保多线程环境下在预期条件下退出循环。
for (;;) {
// 获取 ctl 值并从中提取线程池 运行状态
int c = ctl.get();
int rs = runStateOf(c);
// 如果 rs > SHUTDOWN,此时不允许接收新任务,也不允许执行工作队列中的任务,直接返回fasle。
// 如果 rs == SHUTDOWN,任务为null,并且工作队列不为空,此时走下面的 '执行工作队列中任务' 的逻辑。
// 这里设置 firstTask == null 是因为:线程池在SHUTDOWN状态下,不允许添加新任务,只允许执行工作队列中剩余的任务。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取活跃线程数
int wc = workerCountOf(c);
// 如果活跃线程数 >= 容量,不允许添加新任务
// 如果 core 为 true,表示创建核心线程,如果 活跃线程数 > 核心线程数,则不允许创建线程
// 如果 core 为 false,表示创建空闲线程,如果 活跃线程数 > 最大线程数,则不允许创建线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加核心线程数,增加成功直接中断最外层死循环,开始创建worker线程
// 增加失败则持续执行循环内逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
// 获取 ctl 值,判断运行状态是否改变
c = ctl.get();
// 如果运行状态已经改变,则从重新执行外层死循环
// 如果运行状态未改变,继续执行内层死循环
if (runStateOf(c) != rs)
continue retry;
}
}
// 用于记录worker线程的状态
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// new 一个新的worker线程,每一个Worker内持有真正执行任务的线程。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁,保证workerAdded状态更改的原子性
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取线程池状态
int rs = runStateOf(ctl.get());
// 如果为运行状态,则创建worker线程
// 如果为 SHUTDOWN 状态,并且 firstTask == null,此时将创建线程执行 任务队列 中的任务。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程在未启动前就已经运行,抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
// 本地缓存worker线程
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// worker线程添加成功,更改为 true 状态
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 更改状态成功后启动worker线程
if (workerAdded) {
// 启动worker线程
t.start();
// 更改启动状态
workerStarted = true;
}
}
} finally {
// 如果工作线程状态未改变,则处理失败逻辑
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
addWorker() 通过内外两层死循环判断 ThreadPoolExecutor 运行状态并通过 CAS 成功更新活跃线程数。这是为了保证线程池中的多个线程在并发环境下都能够按照预期的条件退出循环。
随后方法会 new 一个 Worker 并启动 Worker 内置的工作线程。这里通过 workerAdded 和 workerStarted 两个状态判断 Worker 是否被成功缓存与启动。
修改 workerAdded 过程会使用 ThreadPoolExecutor 的 mainlock 上锁保证原子性,防止多线程并发环境下, 向 workers 中添加数据以及获取 workers 数量这两个过程出现预期之外的情况。
addWorker() 启动 worker 线程的步骤是先 new 一个 Worker 对象,然后从中获取工作线程,再 start,因此真正的线程启动过程还是在 Worker 对象中。
这里通过一张流程图对 addWorker 总结下:
addWorker 还有几个方法也在这里分析下:
4.2.1 runStateOf()
从 ctl 中获取 ThreadPoolExecutor 状态,详细分析看第二章。
4.2.2 workerCountOf()
从 ctl 中获取 ThreadPoolExecutor 活跃线程数,详细分析看第二章。
4.2.3 compareAndIncrementWorkerCount()
int c = ctl.get();
if (compareAndIncrementWorkerCount(c)) {...}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
复制代码
通过 CAS 的方式令 ctl 中活跃线程数+1。这里为什么只要让 ctl 的值+1 就能更改线程数了呢?因为 ctl 线程数的值存储在后 29 位中,在不溢出的情况下,+1 只会影响后 29 位的数值,只会令线程数+1。而不影响线程池状态。
4.2.4 addWorkerFailed()
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 移除worker
workers.remove(w);
// 活跃线程数-1
decrementWorkerCount();
// 尝试停止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
复制代码
该方法是在工作线程启动失败后执行的方法。什么情况下会出现这种问题呢?在成功增加活跃线程数后并成功 new Worker 后,线程池状态改变为 > SHUTDOWN,既不可接受新任务,又不能执行任务队列剩余的任务,此时线程池应该直接停止。
该方法就是在这种情况下:
执行完 tryTerminate() 方法后,线程池将会进入到 TERMINATED 状态。
4.2.5 tryTerminate()
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果当前线程池状态为以下之一,无法直接进入 TERMINATED 状态,直接返回false,表示尝试失败
if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果活跃线程数不为0,中断所有的worker线程,这个会在下面详细讲解,这里会关系到 Worker 虽然继承了AQS,但是并未使用里面的CLH的原因。
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// 加上全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 首先通过 CAS 将 ctl 改变成 (rs=TIDYING, wc=0),因为经过上面的判断保证了当先线程池能够达到这个状态。
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 钩子函数,用户可以通过继承 ThreadPoolExecutor 实现自定义的方法。
terminated();
} finally {
// 将 ctl 改变成 (rs=TERMINATED, wc=0),此时线程池将关闭。
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒其它线程,唤醒其实也没用了,其它线程唤醒后经过判断得知线程池 TERMINATED 后也会退出。
termination.signalAll();
}
return;
}
} finally {
// 释放全局锁
mainLock.unlock();
}
}
}
复制代码
五、Worker 内置类分析
5.1 Worker 对象分析
Worker 对象的源码分析:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 工作线程
final Thread thread;
// 提交的待执行任务
Runnable firstTask;
// 已经完成的任务量
volatile long completedTasks;
Worker(Runnable firstTask) {
// 初始化状态
setState(-1);
this.firstTask = firstTask;
// 通过线程工厂创建线程
this.thread = getThreadFactory().newThread(this);
}
// 执行提交任务的方法,具体执行逻辑封装在 runWorker() 中,当addWorker() 中t.start()后,将执行该方法
public void run() {
runWorker(this);
}
// 实现AQS中的一些方法
protected boolean isHeldExclusively() { ... }
protected boolean tryAcquire(int unused) { ... }
protected boolean tryRelease(int unused) { ... }
public void lock() { ... }
public boolean tryLock() { ... }
public void unlock() { ... }
public boolean isLocked() { ... }
// 中断持有的线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try { t.interrupt(); }
catch (SecurityException ignore) {}
}
}
}
复制代码
从上面源码可以看出:Worker 实现了 Runnable 接口,说明 Worker 是一个任务;Worker 又继承了 AQS,说明 Worker 同时具有锁的性质,但 Worker 并没有像 ReentrantLock 等锁工具使用了 CLH 的功能,因为线程池中并不存在多个线程访问同一个 Worker 的场景,这里只是使用了 AQS 中状态维护的功能,这个具体会在下面进行详细说明。
每个 Worker 对象会持有一个工作线程 thread,在 Worker 初始化时,通过线程工厂创建该工作线程并将自己作为任务传入工作线程当中。因此,线程池中任务的运行其实并不是直接执行提交任务的 run()方法,而是执行 Worker 中的 run()方法,在该方法中再执行提交任务的 run()方法。
Worker 中的 run() 方法是委托给 ThreadPoolExecutor 中的 runWorker() 执行具体逻辑。
这里用一张图总结下:
Worker 本身是一个任务,并且持有用户提交的任务和工作线程。
工作线程持有的任务是 this 本身,因此调用工作线程的 start()方法其实是执行 this 本身的 run()方法。
this 本身的 run()委托全局的 runWorker()方法执行具体逻辑。
runWorker()方法中执行用户提交任务的 run()方法,执行用户具体逻辑。
5.2 runWorker 方法
runWorker() 源码如下所示:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 拷贝提交的任务,并将 Worker 中的 firstTask 置为 null,便于下一次重新赋值。
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 执行完持有任务后,通过 getTask() 不断从任务队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
try {
// ThreadPoolExecutor 的钩子函数,用户可以实现 ThreadPoolExecutor,并重写 beforeExecute() 方法,从而在任务执行前 完成用户定制的操作逻辑。
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行提交任务的 run() 方法
task.run();
} catch (RuntimeException x) {
...
} finally {
// ThreadPoolExecutor 的钩子函数,同 beforeExecute,只不过在任务执行完后执行。
afterExecute(task, thrown);
}
} finally {
// 便于任务回收
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 执行到这里表示任务队列中没了任务,或者线程池关闭了,此时需要将worker从缓存冲清除
processWorkerExit(w, completedAbruptly);
}
}
复制代码
runWorker() 是真正执行提交任务的方法,但其并没有通过 Thread.start()方法执行任务,而是直接执行任务的 run()方法。
runWorker() 会从任务队列中不断获取任务并执行。
runWorker() 提供了两个钩子函数,如果 jdk 的 ThreadPoolExecutor 无法满足开发人员的需求,开发人员可以继承 ThreadPoolExecutor 并重写 beforeExecute()和 afterExecute()方法定制任务执行前需要执行的逻辑。比如设置一些监控指标或者打印日志等。
5.2.1 getTask()
private Runnable getTask() {
boolean timedOut = false;
// 死循环保证一定获取到任务
for (;;) {
...
try {
// 从任务队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
5.2.2 processWorkerExit()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
...
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从缓存中移除worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试停止线程池
tryTerminate();
...
}
复制代码
六、shutdown()执行流程
线程池拥有两个主动关闭的方法;
shutdown():关闭线程池中所有空闲 Worker 线程,改变线程池状态为 SHUTDOWN;
shutdownNow():关闭线程池中所有 Worker 线程,改变线程池状态为 STOP,并返回所有正在等待处理的任务列表。
这里为什么要将 Worker 线程区分为空闲和非空闲呢?
由上面的 runWorker() 方法,我们知道 Worker 线程在理想情况下会在 while 循环中不断从任务队列中获取任务并执行,此时的 Worker 线程就是非空闲的;没有在执行任务的 worker 线程则是空闲的。因为线程池的 SHUTDOWN 状态不允许接收新任务,只允许执行任务队列中剩余的任务,因此需要中断所有空闲的 Worker 线程,非空闲线程则持续执行任务队列的任务,直至队列为空。而线程池的 STOP 状态既不允许接受新任务,也不允许执行剩余的任务,因此需要关闭所有 Worker 线程,包括正在运行的。
6.1 shutdown()
shutdown() 源码如下:
public void shutdown() {
// 上全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 校验是否有关闭线程池的权限,这里主要通过 SecurityManager 校验当前线程与每个 Worker 线程的 “modifyThread” 权限
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 关闭所有空闲线程
interruptIdleWorkers();
// 钩子函数,用户可以继承 ThreadPoolExecutor 并实现自定义钩子,ScheduledThreadPoolExecutor便实现了自己的钩子函数
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试关闭线程池
tryTerminate();
}
复制代码
shutdown() 将 ThreadPoolExecutor 的关闭步骤封装在几个方法中,并且通过全局锁保证只有一个线程能主动关闭 ThreadPoolExecutor。ThreadPoolExecutor 同样提供了一个钩子函数 onShutdown() 让开发人员定制化关闭过程。比如 ScheduledThreadPoolExecutor 就会在关闭时对任务队列进行清理。
下面对其中的方法进行分析。
checkShutdownAccess()
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
// 校验当前线程的权限,其中 shutdownPerm 就是一个具有 modifyThread 参数的 RuntimePermission 对象。
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 校验所有worker线程是否具有 modifyThread 权限
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
复制代码
advanceRunState()
// targetState = SHUTDOWN
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// 判断当前线程池状态 >= SHUTDOWN是否成立,如果不成立的话,通过CAS进行修改
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
复制代码
该方法中判断线当前程池状态 >= SHUTDOWN 是否成立其实也是用到了之前线程池状态定义的技巧。对于非运行状态的其它状态都为正数,且高三位都不同,TERMINATED(011) > TIDYING(010) > STOP(001) > SHUTDOWN(000)而高三位的大小取决了整个数的大小。因此对于不同状态,无论活跃线程数是多少,线程池的状态始终决定着 ctl 值的大小。即 TERMINATED 状态下的 ctl 值 > TIDYING 状态下的 ctl 值恒成立。
interruptIdleWorkers()
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 判断worker线程是否已经被标记中断了,如果没有,则尝试获取worker线程的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果 onlyOne 为true的话最多中断一个线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
复制代码
刚方法会尝试获取 Worker 的锁,只有获取成功的情况下才会中断线程。这里也与前面说的 Worker 虽然继承了 AQS 但却没使用 CLH 有关,后面会进行分析。
tryTerminate() 方法已经在前面分析过了,这里不过多叙述。
6.2 shutdownNow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 校验关闭线程池权限
checkShutdownAccess();
// 修改线程池状态为STOP
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 获取队列中所有正在等待处理的任务列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试关闭线程池
tryTerminate();
// 返回任务列表
return tasks;
}
复制代码
该方法与 shutdown() 比较相似,都将核心步骤封装在了几个方法中,其中 checkShutdownAccess() 和 advanceRunState() 相同。下面对不同的方法进行说明
interruptWorkers()
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历所有的Worker,只要Worker启动了就将其中断
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
// state >= 0表示worker已经启动,Worker启动并且持有线程不为null并且持有线程未被标记中断,则中断该线程
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
复制代码
该方法并没有尝试去获取 Worker 的锁,而是直接中断线程。因为 STOP 状态下的线程池不允许处理任务队列中正在等待的任务。
drainQueue()
// 将任务队列中的任务添加进列表中返回,通常情况下使用 drainTo() 就行了,但如果队列是延迟队列或是其他无法通过drainTo()方法转移任务时,再通过循环遍历进行转移
private List<Runnable> drainQueue() {
...
}
复制代码
七、Worker 继承 AQS 的原因
首先说结论——Worker 继承 AQS 是为了使用其中状态管理的功能,并没有像 ReentrantLock 使用 AQS 中 CLH 的性质。
我们先来看看 Worker 中与 AQS 相关的方法:
// 参数为unused,从命名也可以知道该参数未被使用
protected boolean tryAcquire(int unused) {
// 通过CAS改变将状态由0改变为1
if (compareAndSetState(0, 1)) {
// 设置当前线程独占
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 该方法只在 runWorker() 中被使用
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
复制代码
Worker 中的 tryAcquire 只是将状态改为 1,而参数未被使用,因此我们可以断定,Worker 中的状态可能取值为(0, 1)。这里没有考虑初始化状态-1 是避免出现混淆。
再看 lock() 方法,lock() 方法被调用的唯一位置就是在 runWorker() 中启动 worker 线程前。而 runWorker() 是通过 Worker 中的 run() 调用的。Worker 作为任务只被传递给本身持有的工作线程中,因此 Worker 中的 run() 方法只能被本身持有的工作线程通过 start() 调用,因此 runWorker() 只会被 Worker 本身持有的工作线程所调用,lock() 方法也只会被单线程调用,不存在多个线程竞争同一把锁的情况,也就不存在多线程环境下,只有一个线程能获得锁导致其他等待线程被添加进 CLH 队列的情况。所以 Worker 并没没有使用 CLH 的功能。
这也就很好说明了 tryAcquire() 方法并没有使用传递的参数,因为 Worker 只存在两种状态,要么被上锁(非空闲,state=1),要么未被上锁(空闲,state=0)。无需通过传递参数设置其他的状态。
final void runWorker(Worker w) {
...
try {
while (task != null || (task = getTask()) != null) {
// 唯一被调用的地方
w.lock();
...
}
}
}
复制代码
以上分析说明了 Worker 没有使用 AQS 的 CLH 功能。那么 Worker 是如何使用状态管理的功能的呢?
在关闭线程池的 shutdown() 方法中,有一个步骤是中断所有的空闲 Worker 线程。而在中断所有 Worker 线程前会判断 Worker 线程是否能被获取到锁,通过 tryLock() -> tryAcquire() 判断 Worker 的状态是否为 0,只有能够获取到锁的 Worker 才会被中断,而能被获取到锁的 Worker 即为空闲 Worker(state=0)。而不能被获取到锁的 Worker 表名已经执行过 lock() 方法了,此时 Worker 在 While 循环不断获取阻塞队列的任务执行,在 shutdown()方法中不能被中断。
private void interruptIdleWorkers(boolean onlyOne) {
...
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { ... }
}
}
}
复制代码
因此 Worker 的状态管理其实是通过 state 的值(0 或 1)判断 Worker 是否为空闲的,如果是空闲的,则可以在线程池关闭时被中断掉,否则得一直在 while 循环中获取阻塞队列中的任务并执行,直至队列中任务为空后才被释放。如下图所示:
八、拒绝策略
本章只讨论 ThreadPoolExecutor 内置的四个拒绝策略 handler。
8.1 CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果线程池未被关闭,直接在当前线程中执行任务
if (!e.isShutdown()) {
r.run();
}
}
}
复制代码
直接在调用线程中执行被拒绝的任务。只要线程池为 RUNNING 状态,任务仍被执行。如果为非 RUNNING 状态,任务将直接被忽略,这也符合线程池状态的行为。
8.2 AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 抛出拒绝异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
复制代码
任务被拒绝后直接抛出拒绝异常。
8.3 DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
// 空方法,什么都不执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
复制代码
抛弃该任务。拒绝方法为空,表示什么都不执行,等同于将任务抛弃。
8.4 DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 从阻塞队列中获取(移除)队头的任务,
e.getQueue().poll();
// 再次尝试execute当前任务
e.execute(r);
}
}
}
复制代码
移除阻塞队列中最早进入队列中(队头)的任务,然后再次尝试执行 execute()方法,将当前任务入队。这是典型的喜新厌旧的策略。
九、ThreadPoolExecutor 二次开发实践
介绍完了 ThreadPoolExecutor 的核心原理,我们来看看 vivo 自研的 NexTask 并发框架是如何玩转线程池并提升业务人员的开发速度和代码执行速度。
NexTask 对业务常用模式、算法、场景进行抽象化,以组件的形式落地。它提供了一个快速、轻量级、简单易用并且屏蔽了底层技术细节的方式,能够让开发人员快速编写并发程序,更大程度上为开发赋能。
首先给出 NexTask 架构图,然后我们针对架构图中使用到了 ThreadPoolExecutor 的地方进行详细分析。
// Executor部分代码:
public class Executor {
...
private static DefaultTaskProcessFactory taskProcessFactory =
new DefaultTaskProcessFactory();
// 对外提供的API,用户快速创建任务处理器
public static TaskProcess getCommonTaskProcess(String name) {
return TaskProcessManager.getTaskProcess(name, taskProcessFactory);
}
public static TaskProcess getTransactionalTaskProcess(String name) {
return TaskProcessManager.getTaskProcessTransactional(name, taskProcessFactory);
}
...
}
复制代码
Executor 是对外提供的接口,开发人员可以使用它具备的简单易用的 API,快速通过任务管理器 TaskProcessManager 创建任务处理器 TaskProcess。
// TaskProcessManager 部分代码:
public class TaskProcessManager {
// 缓存map,<业务名称, 针对该业务的任务处理器>
private static Map<String, TaskProcess> taskProcessContainer =
new ConcurrentHashMap<String, TaskProcess>();
...
}
复制代码
TaskProcessManager 持有一个 ConcurrentHashMap 本地缓存有所的任务处理器,每个任务处理器与特定的业务名称一一映射。在获取任务处理器时,通过具体的业务名称从缓存中获取,不仅能够保证各个业务间的任务处理相互隔离,同时能够防止多次创建、销毁线程池造成的资源损耗。
// TaskProcess 部分代码:
public class TaskProcess {
// 线程池
private ExecutorService executor;
// 线程池初始化
private void createThreadPool() {
executor = new ThreadPoolExecutor(coreSize, poolSize, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2048), new DefaultThreadFactory(domain),
new ThreadPoolExecutor.AbortPolicy());
}
// 多线程提交任务进行处理
public <T> List<T> executeTask(List<TaskAction<T>> tasks) {
int size = tasks.size();
// 创建一个与任务数相同的 CountDownLatch,保证所有任务全部处理完后一起返回结果
final CountDownLatch latch = new CountDownLatch(size);
// 返回结果初始化
List<Future<T>> futures = new ArrayList<Future<T>>(size);
List<T> resultList = new ArrayList<T>(size);
// 遍历所有任务,提交到线程池
for (final TaskAction<T> runnable : tasks) {
Future<T> future = executor.submit(new Callable<T>() {
@Override
public T call() throws Exception {
// 处理具体的任务逻辑
try { return runnable.doInAction(); }
// 处理完成后,CountDownLatch - 1
finally { latch.countDown(); }
}
});
futures.add(future);
}
try {
// 等待所有任务处理完成
latch.await(50, TimeUnit.SECONDS);
} catch (Exception e) {
log.info("Executing Task is interrupt.");
}
// 封装结果并返回
for (Future<T> future : futures) {
try {
T result = future.get();// wait
if (result != null) {
resultList.add(result);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return resultList;
}
...
}
复制代码
每个 TaskProcess 都持有一个线程池,由线程池的初始化过程可以看到,TaskProcess 采用的是有界阻塞队列,队列中最多存放 2048 个任务,一旦超过这个数量后,将会直接拒绝接收任务并抛出拒绝处理异常。
TaskProcess 会遍历用户提交的任务列表,并通过 submit() 方法将其提交至线程池处理,submit() 底层其实还是调用的 ThreadPoolExecutor#execute() 方法,只不过会在调用前将任务封装成 RunnableFuture,这里就是 FutureTask 框架的内容了,就不进行展开。
TaskProcess 会在每次处理任务时,创建一个 CountDownLatch,并在任务结束后执行 CountDownLatch.countDown(),这样就能保证所有任务在执行完成阻塞当前线程,直至所有任务处理完后统一获取结果并返回。
十、总结
JDK 虽然为开发人员提供了 Executors 工具类以及内置的多种线程池,但那些线程池的使用非常局限,无法满足日益复杂的业务场景。阿里官方的编程规约中也推荐开发人员不要直接使用 JDK 自带的线程池,而是根据自身业务场景通过 ThreadPoolExecutor 进行创建线程池。因此,了解 ThreadPoolExecutor 内部原理对日常开发中熟练使用线程池也是至关重要的。
本文主要是对 ThreadPoolExecutor 内部核心原理进行探究,介绍了其构造方法及其各个构造参数的详细意义,以及线程池核心 ctl 参数的转化方法。随后花了大量篇幅深入 ThreadPoolExecutor 源码介绍线程池的启动与关闭流程、核心内置类 Worker 等。ThreadPoolExecutor 还有其他方法本文暂未介绍,读者可以在读完本文的基础上自行阅读其他源码,相信会有一定帮助。
作者:vivo 互联网服务器团队-Xu Weiteng
评论