Java 并发系列终结篇:彻底搞懂 Java 线程池的工作原理 (1)
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...省略校验相关代码
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
// ...
}
这个构造方法中有 7 个参数之多,我们逐个来看每个参数所代表的含义:
corePoolSize 表示线程池的核心线程数。当有任务提交到线程池时,如果线程池中的线程数小于 corePoolSize,那么则直接创建新的线程来执行任务。
workQueue 任务队列,它是一个阻塞队列,用于存储来不及执行的任务的队列。当有任务提交到线程池的时候,如果线程池中的线程数大于等于 corePoolSize,那么这个任务则会先被放到这个队列中,等待执行。
maximumPoolSize 表示线程池支持的最大线程数量。当一个任务提交到线程池时,线程池中的线程数大于 corePoolSize,并且 workQueue 已满,那么则会创建新的线程执行任务,但是线程数要小于等于 maximumPoolSize。
keepAliveTime 非核心线程空闲时保持存活的时间。非核心线程即 workQueue 满了之后,再提交任务时创建的线程,因为这些线程不是核心线程,所以它空闲时间超过 keepAliveTime 后则会被回收。
unit 非核心线程空闲时保持存活的时间的单位
threadFactory 创建线程的工厂,可以在这里统一处理创建线程的属性
handler 拒绝策略,当线程池中的线程达到 maximumPoolSize 线程数后且 workQueue 已满的情况下,再向线程池提交任务则执行对应的拒绝策略
2.线程池工作流程
=========
线程池提交任务是从 execute 方法开始的,我们可以从 execute 方法来分析线程池的工作流程。
(1)当 execute 方法提交一个任务时,如果线程池中线程数小于 corePoolSize,那么不管线程池中是否有空闲的线程,都会创建一个新的线程来执行任务。
?
(2)当 execute 方法提交一个任务时,线程池中的线程数已经达到了 corePoolSize,且此时没有空闲的线程,那么则会将任务存储到 workQueue 中。
(3)如果 execute 提交任务时线程池中的线程数已经到达了 corePoolSize,并且 workQueue 已满,那么则会创建新的线程来执行任务,但总线程数应该小于 maximumPoolSize。?
(4)如果线程池中的线程执行完了当前的任务,则会尝试从 workQueue 中取出第一个任务来执行。如果 workQueue 为空则会阻塞线程。
(5)如果 execute 提交任务时,线程池中的线程数达到了 maximumPoolSize,且 workQueue 已满,此时会执行拒绝策略来拒绝接受任务。
(6)如果线程池中的线程数超过了 corePoolSize,那么空闲时间超过 keepAliveTime 的线程会被销毁,但程池中线程个数会保持为 corePoolSize。
(7)如果线程池存在空闲的线程,并且设置了 allowCoreThreadTimeOut 为 true。那么空闲时间超过 keepAliveTime 的线程都会被销毁。
3.线程池的拒绝策略
如果线程池中的线程数达到了 maximumPoolSize,并且 workQueue 队列存储满的情况下,线程池会执行对应的拒绝策略。在 JDK 中提供了 RejectedExecutionHandler 接口来执行拒绝操作。实现 RejectedExecutionHandler 的类有四个,对应了四种拒绝策略。分别如下:
DiscardPolicy 当提交任务到线程池中被拒绝时,线程池会丢弃这个被拒绝的任务
DiscardOldestPolicy 当提交任务到线程池中被拒绝时,线程池会丢弃等待队列中最老的任务。
CallerRunsPolicy 当提交任务到线程池中被拒绝时,会在线程池当前正在运行的 Thread 线程中处理被拒绝额任务。即哪个线程提交的任务哪个线程去执行。
AbortPolicy 当提交任务到线程池中被拒绝时,直接抛出 RejectedExecutionException 异常。
三、线程池源码分析
=========
从上一章对线程池的工作流程解读来看,线程池的原理似乎并没有很难。但是开篇时我说过想要读懂线程池的源码并不难,主要原因是线程池内部运用到了大量并发相关知识,另外还与线程池中用到的位运算有关。
1.线程池中的位运算(了解内容)
================
在向线程池提交任务时有两个比较重要的参数会决定任务的去向,这两个参数分别是线程池的状态和线程池中的线程数。在 ThreadPoolExecutor 内部使用了一个 AtomicInteger 类型的整数 ctl 来表示这两个参数,代码如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
// Integer.SIZE = 32.所以 COUNT_BITS= 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00001111 11111111 11111111 11111111 这个值可以表示线程池的最大线程容量
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 将-1 左移 29 位得到 RUNNING 状态的值
private static final int RUNNING = -1 << COUNT_BITS;
// 线程池运行状态和线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
// ...
}
因为涉及多线程的操作,这里为了保证原子性,ctl 参数使用了 AtomicInteger 类型,并且通过 ctlOf 方法来计算出了 ctl 的初始值。如果你不了解位运算大概很难理解上述代码的用意。
我们知道,int 类型在 Java 中占用 4byte 的内存,一个 byte 占用 8bit,所以 Java 中的 int 类型共占用 32bit。对于这个 32bit,我们可以进行高低位的拆分。做 Android 开发的同学应该都了解 View 测量流程中的 MeasureSpec 参数,这个参数将 32bit 的 int 拆分成了高 2 位和低 30 位,分别表示 View 的测量模式和测量值。而这里的 ctl 与 MeasureSpec 类似,ctl 将 32 位的 int 拆分成了高 3 位和低 29 位,分别表示线程池的运行状态和线程池中的线程个数。
下面我们通过位运算来验证一下 ctl 是如何工作的,当然,如果你不理解这个位运算的过程对理解线程池的源码影响并不大,所以对以下验证内容不感兴趣的同学可以直接略过。
可以看到上述代码中 RUNNING 的值为-1 左移 29 位,我们知道在计算机中**负数是以其绝对值的补码来表示的,而补码是由反码加 1 得到。**因此-1 在计算机中存储形式为 1 的反码+1
1 的原码:00000000 00000000 00000000 00000001
1 的反码:11111111 11111111 11111111 11111110
-1 存储: 11111111 11111111 11111111 11111111
接下来对-1 左移 29 位可以得到 RUNNING 的值为:
// 高三位表示线程状态,即高三位为 111 表示 RUNNING
11100000 00000000 00000000 00000000
而 AtomicInteger 初始线程数量是 0,因此 ctlOf 方法中的“|”运算如下:
RUNNING: 11100000 00000000 00000000 00000000
|
线程数为 0: 00000000 00000000 00000000 00000000
得到 ctl: 11100000 00000000 00000000 00000000
通过 RUNNING|0(线程数)即可得到 ctl 的初始值。同时还可以通过以下方法将 ctl 拆解成运行状态和线程数:
// 00001111 11111111 11111111 11111111
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 获取线程池运行状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 获取线程池中的线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }
假设此时线程池为 RUNNING 状态,且线程数为 0,验证一下 runStateOf 是如何得到线程池的运行状态的:
COUNT_MASK: 00001111 11111111 11111111 11111111
~COUNT_MASK: 11110000 00000000 00000000 00000000
&
ctl: 11100000 00000000 00000000 00000000
RUNNING: 11100000 00000000 00000000 00000000
复制代码
如果不理解上边的验证流程没有关系,只要知道通过 runStateOf 方法可以得到线程池的运行状态,通过 workerCountOf 可以得到线程池中的线程数即可。
接下来我们进入线程池的源码的源码分析环节。
2.ThreadPoolExecutor 的 execute
============================
向线程池提交任务的方法是 execute 方法,execute 方法是 ThreadPoolExecutor 的核心方法,以此方法为入口来进行剖析,execute 方法的代码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取 ctl 的值
int c = ctl.get();
// 1.线程数小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 线程池中线程数小于核心线程数,则尝试创建核心线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.到此处说明线程池中线程数大于核心线程数或者创建线程失败
if (isRunning(c) && workQueue.offer(command)) {
// 如果线程是运行状态并且可以使用 offer 将任务加入阻塞队列未满,offer 是非阻塞操作。
int recheck = ctl.get();
// 重新检查线程池状态,因为上次检测后线程池状态可能发生改变,如果非运行状态就移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果是运行状态,并且线程数是 0,则创建线程
else if (workerCountOf(recheck) == 0)
// 线程数是 0,则创建非核心线程,且不指定首次执行任务,这里的第二个参数其实没有实际意义
addWorker(null, false);
}
// 3.阻塞队列已满,创建非核心线程执行任务
else if (!addWorker(command, false))
// 如果失败,则执行拒绝策略
reject(command);
}
execute 方法中的逻辑可以分为三部分:
1.如果线程池中的线程数小于核心线程,则直接调用 addWorker 方法创建新线程来执行任务。
2.如果线程池中的线程数大于核心线程数,则将任务添加到阻塞队列中,接着再次检验线程池的运行状态,因为上次检测过之后线程池状态有可能发生了变化,如果线程池关闭了,那么移除任务,执行拒绝策略。如果线程依然是运行状态,但是线程池中没有线程,那么就调用 addWorker 方法创建线程,注意此时传入任务参数是 null,即不指定执行任务,因为任务已经加入了阻塞队列。创建完线程后从阻塞队列中取出任务执行。
3.如果第 2 步将任务添加到阻塞队列失败了,说明阻塞队列任务已满,那么则会执行第三步,即创建非核心线程来执行任务,如果非核心线程创建失败那么就执行拒绝策略。
可以看到,代码的执行逻辑和我们在第二章中分析的线程池的工作流程是一样的。
接下来看下 execute 方法中创建线程的方法 addWoker,addWoker 方法承担了核心线程和非核心线程的创建,通过一个 boolean 参数 core 来区分是创建核心线程还是非核心线程。先来看 addWorker 方法前半部分的代码:
// 返回值表示是否成功创建了线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 这里做了一个 retry 标记,相当于 goto.
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// 根据 core 来确定创建最大线程数,超过最大值则创建线程失败,注意这里的最大值可能有 s 三个 corePoolSize、maximumPoolSize 和线程池线程的最大容量
if (workerCountOf(c)
= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 通过 CAS 来将线程数+1,如果成功则跳出循环,执行下边逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池的状态发生了改变,退回 retry 重新执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
// ...省略后半部分
return workerStarted;
}
这部分代码会通过是否创建核心线程来确定线程池中线程数的值,如果是创建核心线程,那么最大值不能超过 corePoolSize,如果是创建非核心线程那么线程数不能超过 maximumPoolSize,另外无论是创建核心线程还是非核心线程,最大线程数都不能超过线程池允许的最大线程数 COUNT_MASK(有可能设置的 maximumPoolSize 大于 COUNT_MASK)。如果线程数大于最大值就返回 false,创建线程失败。
接下来通过 CAS 将线程数加 1,如果成功那么就 break retry 结束无限循环,如果 CAS 失败了则就 continue retry 从新开始 for 循环,注意这里的 retry 不是 Java 的关键字,是一个可以任意命名的字符。
接下来,如果能继续向下执行则开始执行创建线程并执行任务的工作了,看下 addWorker 方法的后半部分代码:
private boolean addWorker(Runnable firstTask, boolean core) {
// ...省略前半部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 实例化一
个 Worker,内部封装了线程
w = new Worker(firstTask);
// 取出新建的线程
final Thread t = w.thread;
if (t != null) {
// 这里使用 ReentranLock 加锁保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// 拿到锁湖重新检查线程池状态,只有处于 RUNNING 状态或者处于 SHUTDOWN 并且 firstTask==null 时候才会创建线程
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 线程不是处于 NEW 状态,说明线程已经启动,抛出异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将线程加入线程队列,这里的 worker 是一个 HashSet
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 开启线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这部分逻辑其实比较容易理解,就是创建 Worker 并开启线程执行任务的过程,Worker 是对线程的封装,创建的 worker 会被添加到 ThreadPoolExecutor 中的 HashSet 中。也就是线程池中的线程都维护在这个名为 workers 的 HashSet 中并被 ThreadPoolExecutor 所管理,HashSet 中的线程可能处于正在工作的状态,也可能处于空闲状态,一旦达到指定的空闲时间,则会根据条件进行回收线程。
我们知道,线程调用 start 后就会开始执行线程的逻辑代码,执行完后线程的生命周期就结束了,那么线程池是如何保证 Worker 执行完任务后仍然不结束的呢?当线程空闲超时或者关闭线程池又是怎样进行线程回收的呢?这个实现逻辑其实就在 Worker 中。看下 Worker 的代码:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 执行任务的线程
final Thread thread;
// 初始化 Worker 时传进来的任务,可能为 null,如果不空,则创建和立即执行这个 task,对应核心线程创建的情况
Runnable firstTask;
Worker(Runnable firstTask) {
// 初始化时设置 setate 为-1
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 通过线程工程创建线程
this.thread = getThreadFactory().newThread(this);
}
// 线程的真正执行逻辑
public void run() {
runWorker(this);
}
// 判断线程是否是独占状态,如果不是意味着线程处于空闲状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// ...
}
Worker 是位于 ThreadPoolExecutor 中的一个内部类,它继承了 AQS,使用 AQS 来实现了独占锁的功能,但是并没支持可重入。这里使用不可重入的特性来表示线程的执行状态,即可以通过 isHeldExclusively 方法来判断,如果是独占状态,说明线程正在执行任务,如果非独占状态,说明线程处于空闲状态。关于 AQS 我们前边文章中已经详细分析过了,不了解 AQS 的可以翻看前边 ReentranLock 的文章。
另外,Worker 还实现了 Runnable 接口,因此它的执行逻辑就是在 run 方法中,run 方法调用的是线程池中的 runWorker(this)方法。任务的执行逻辑就在 runWorker 方法中,它的代码如下:
final void runWorker(Worker w) {
评论