知道这些线程池底层源码的知识,你也能和面试官扯半小时
pool-1-thread-2 的 i 值为: 47
pool-1-thread-2 的 i 值为: 49
pool-1-thread-1 的 i 值为: 0
pool-1-thread-1 的 i 值为: 2
pool-1-thread-1 的 i 值为: 4
pool-1-thread-1 的 i 值为: 6
pool-1-thread-1 的 i 值为: 8
pool-1-thread-1 的 i 值为: 10
pool-1-thread-1 的 i 值为: 12
pool-1-thread-1 的 i 值为: 14
pool-1-thread-1 的 i 值为: 16
pool-1-thread-1 的 i 值为: 18
pool-1-thread-1 的 i 值为: 20
pool-1-thread-1 的 i 值为: 22
pool-1-thread-1 的 i 值为: 24
pool-1-thread-1 的 i 值为: 26
pool-1-thread-1 的 i 值为: 28
pool-1-thread-1 的 i 值为: 30
pool-1-thread-1 的 i 值为: 32
pool-1-thread-1 的 i 值为: 34
pool-1-thread-1 的 i 值为: 36
pool-1-thread-1 的 i 值为: 38
pool-1-thread-1 的 i 值为: 40
pool-1-thread-1 的 i 值为: 42
pool-1-thread-1 的 i 值为: 44
pool-1-thread-1 的 i 值为: 46
pool-1-thread-1 的 i 值为: 48
pool-1-thread-1 的 i 值为: 50
?
五、剖析 ThreadPoolExecutor 底层源码
?
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
?
1. 创建一个线程池时的参数
corePoolSize
表示核心池的大小。
当提交一个任务到线程池的时候,线程池将会创建一个线程来执行任务。这个值的大小设置非常关键,设置过小将频繁地创建或销毁线程,设置过大则会造成资源浪费。
线程池新建线程的时候,如果当前线程总数小于 corePoolSize,则新建的是核心线程;如果超过 corePoolSize,则新建的是非核心线程。
maximumPoolSize
表示线程池的最大数量。
如果队列 (workQueue) 满了,并且已创建的线程数小于 maximumPoolSize,则线程池会进行创建新的线程执行任务。 如果
等待执行的线程数 大于 maximumPoolSize,缓存在队列中; 如果 maximumPoolSize 等于 corePoolSize,即是固定大小线程池。
keepAliveTime
表示线程活动的保持时间
当需要执行的任务很多,线程池的线程数大于核心池的大小时,keepAliveTime 才起作用;
当一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉
TimeUnit
表示线程活动保持时间的单位
它的单位有:TimeUnit.DAYS,TimeUnit.HOURS,TimeUnit.MINUTES,TimeUnit.MILLISECONDS,TimeUnit.MICRODECONDS
workQueue
表示线程池中存放被提交但尚未被执行的任务的队列
维护着等待执行的 Runnable 对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
threadFactory
用于设置创建线程的工厂
它用来给每个创建出来的线程设置一个名字,就可以知道线程任务是由哪个线程工厂产生的。
handler
表示拒绝处理策略
线程数量大于最大线程数,当超过 workQueue 的任务缓存区上限的时候,就可以调用该策略,这是一种简单的限流保护。
从 JDK 源码里面可以看到:
?
其中里面其设置的 corePoolSize
(核心池的大小) 和 maximumPoolSize
(最大线程数) 都是 nThreads,其设定的阻塞队列是无界的,则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。因此阿里巴约编码规范不推荐用 Executors 来创建 ThreadPoolExecutor。
使用 Executors 创建线程池时要明确创建的阻塞队列是否有界。因此最好自己创建 ThreadPoolExecutor。
??
ThreadPoolExecutor pool1 = (ThreadPoolExecutor) pool;
// 设置核心池的大小
pool1.setCorePoolSize(15);
// setkeepAliveTime()方法 设置线程没有任务时最多保持多长时间后会停止
pool1.setKeepAliveTime(60000, TimeUnit.HOURS);
Executors 的工厂方法主要就返回了 ThreadPoolExecutor 对象
继承结构:
ThreadPoolExecutor 继承了 AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示线程池数量的位数,很明显是 29,Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
// 表示线程池最大数量,2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
priv
ate static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
Bit field accessors that don't require unpacking ctl.
These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
Decrements the workerCount field of ctl. This is called only on
abrupt termination of a thread (see processWorkerExit). Other
decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
The queue used for holding tasks and handing off to worker
threads. We do not require that workQueue.poll() returning
null necessarily means that workQueue.isEmpty(), so rely
solely on isEmpty to see if the queue is empty (which we must
do for example when deciding whether to transition from
SHUTDOWN to TIDYING). This accommodates special-purpose
queues such as DelayQueues for which poll() is allowed to
return null even if it may later return non-null when delays
expire.
*/
// 用于存放线程任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
/**
Lock held on access to workers set and related bookkeeping.
While we could use a concurrent set of some sort, it turns out
to be generally preferable to use a lock. Among the reasons is
that this serializes interruptIdleWorkers, which avoids
unnecessary interrupt storms, especially during shutdown.
Otherwise exiting threads would concurrently interrupt those
that have not yet interrupted. It also simplifies some of the
associated statistics bookkeeping of largestPoolSize etc. We
also hold mainLock on shutdown and shutdownNow, for the sake of
ensuring workers set is stable while separately checking
permission to interrupt and actually interrupting.
*/
// 重入锁
private final ReentrantLock mainLock = new ReentrantLock();
/**
Set containing all worker threads in pool. Accessed only when
holding mainLock.
*/
// 线程池当中的线程集合,只有当拥有 mainLock 锁的时候,才可以进行访问
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
Wait condition to support awaitTermination
*/
// 等待条件支持终止
private final Condition termination = mainLock.newCondition();
/**
Tracks largest attained pool size. Accessed only under
mainLock.
*/
private int largestPoolSize;
/**
Counter for completed tasks. Updated only on termination of
worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;
/*
All user control parameters are declared as volatiles so that
ongoing actions are based on freshest values, but without need
for locking, since no internal invariants depend on them
changing synchronously with respect to other actions.
*/
/**
Factory for new threads. All threads are created using this
factory (via method addWorker). All callers must be prepared
for addWorker to fail, which may reflect a system or user's
policy limiting the number of threads. Even though it is not
treated as an error, failure to create threads may result in
new tasks being rejected or existing ones remaining stuck in
the queue.
We go further and preserve pool invariants even in the face of
errors such as OutOfMemoryError, that might be thrown while
trying to create threads. Such errors are rather common due to
the need to allocate a native stack in Thread.start, and users
will want to perform clean pool shutdown to clean up. There
will likely be enough memory available for the cleanup code to
complete without encountering yet another OutOfMemoryError.
*/
// 创建新线程的线程工厂
private volatile ThreadFactory threadFactory;
/**
Handler called when saturated or shutdown in execute.
*/
// 饱和策略
private volatile RejectedExecutionHandler handler;
总结:
ctl 是主要的控制状态
workerCount:表示有效的线程数目
runState:表示线程池里线程的运行状态
2. 重要方法
① execute() 方法
源码剖析
// 调用 execute 方法将线程提交到线程池中
public void execute(Runnable command) {
// 如果执行的任务为空,则会抛出空指针异常
if (command == null)
throw new NullPointerException();
/*
Proceed in 3 steps:
If fewer than corePoolSize threads are running, try to
start a new thread with the given command as its first
task. The call to addWorker atomically checks runState and
workerCount, and so prevents false alarms that would add
threads when it shouldn't, by returning false.
If a task can be successfully queued, then we still need
to double-check whether we should have added a thread
(because existing ones died since last checking) or that
the pool shut down since entry into this method. So we
recheck state and if necessary roll back the enqueuing if
stopped, or start a new thread if there are none.
If we cannot queue task, then we try to add a new
thread. If it fails, we know we are shut down or saturated
and so reject the task.
*/
// 获取线程池的控制状态
int c = ctl.get();
// 如果 workerCount 值小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 添加任务到 worker 集合当中,成功的话返回
if (addWorker(command, true))
return;
// 如果失败,再次获取线程池的控制状态
c = ctl.get();
}
// 如果 corePoolSize 已经满了,则需要加入到阻塞队列
// 判断线程池的状态以及是否可以往阻塞队列中继续添加 runnable
if (isRunning(c) && workQueue.offer(command)) {
// 获取线程池的状态
int recheck = ctl.get();
// 再次检查状态,线程池不处于 RUNNING 状态,将任务从 workQueue 队列中移除
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果此时队列已满,则会采取相应的拒绝策略
else if (!addWorker(command, false))
reject(command);
}
总结:
往 corePoolSize 中加入任务进行执行
当 corePoolSize 满时往阻塞队列中加入任务
阻塞队列满时并且 maximumPoolSize 已满,则采取相应的拒绝策略
② addWorker()方法
源码剖析
private boolean addWorker(Runnable firstTask, boolean core) {
// 外部循环标志
retry:
for (;;) {
// 获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取 workerCount
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 实例化 worker 对象
w = new Worker(firstTask);
// 获取 worker 的线程
final Thread t = w.thread;
// 如果获取的线程不为空
if (t != null) {
// 初始线程池的锁
final ReentrantLock mainLock = this.mainLock;
// 得到锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取锁后再次检查,获取线程池 runState
int rs = runStateOf(ctl.get());
// 判断
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 检查线程是否启动
if (t.isAlive()) // precheck that t is startable
// 如果未启动存活,则抛出异常
throw new IllegalThreadStateException();
评论