JDK Source Code Series: ThreadPoolExecutor
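Preface
I have recently been refactoring a messaging system. Much of the code was coupled to the business layer, which made later requirement changes harder. After discussion, we extracted logging, messaging, and email into an aspect layer and process them asynchronously.
Along the way, a few questions came up.
Why is a worker thread-safe?
What do the private fields (workQueue, maximumPoolSize, and so on) do?
All source code below is read from JDK 11.
Overview and usage
First, why do thread pools exist? The class Javadoc says: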
Thread pools address two different problems: they usually
provide improved performance when executing large numbers of
asynchronous tasks, due to reduced per-task invocation overhead,
and they provide a means of bounding and managing the resources,
including threads, consumed when executing a collection of tasks.
Each ThreadPoolExecutor also maintains some basic statistics,
such as the number of completed tasks.
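Thread pools solve two problems.
They reduce the per-task overhead. In other words, the pool exists to reuse resources, here the threads themselves, instead of creating and destroying a thread for every task.
They make it easier to bound and manage the resources a batch of tasks consumes, and each pool keeps some basic statistics. (My reading is that this covers things like pool size and the number of completed tasks; this is a guess.)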
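To make the "basic statistics" concrete, here is a minimal sketch that runs a few no-op tasks and then reads two of the counters the pool exposes. The class name PoolStatsDemo and the helper runAndCollect are made up for this illustration; getCompletedTaskCount and getLargestPoolSize are real ThreadPoolExecutor methods.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStatsDemo {
    // Hypothetical helper: runs `tasks` no-op tasks on a 2-thread pool and
    // returns {completedTaskCount, largestPoolSize} once the pool has terminated.
    static long[] runAndCollect(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { /* no-op task */ });
        }
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { pool.getCompletedTaskCount(), pool.getLargestPoolSize() };
    }

    public static void main(String[] args) {
        long[] stats = runAndCollect(5);
        System.out.println("completed=" + stats[0] + ", largestPoolSize=" + stats[1]);
    }
}
```

The counters are only exact once the pool has terminated; while tasks are still running they are approximations.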
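Creating a pool directly with new ThreadPoolExecutor takes the following parameters.
corePoolSize
The number of threads kept in the pool even when they are idle. If allowCoreThreadTimeOut is set, core threads are also subject to keepAliveTime.
maximumPoolSize
The maximum number of threads the pool is allowed to create.
keepAliveTime
The maximum time that idle threads beyond the core size wait for a new task before terminating.
unit
The time unit of the keepAliveTime parameter.
workQueue
The work queue that holds submitted tasks until a thread picks them up.
threadFactory
The factory used to create new threads. The default is Executors.defaultThreadFactory(), which returns a DefaultThreadFactory.
handler
The rejection policy triggered when the pool is at its maximum size and the task cannot be put into the work queue. The default policy is AbortPolicy.
An example: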
// Test.java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        int corePoolSize = 16;
        int maximumPoolSize = 24;
        long keepAliveTime = 10L;
        TimeUnit timeUnit = TimeUnit.SECONDS;
        // Bounded queue: at most 16 tasks wait here
        BlockingQueue<Runnable> work = new LinkedBlockingQueue<>(16);
        String prefix = "test";
        boolean daemon = true;
        ThreadFactory factory = new MyThreadFactory(prefix, daemon);
        RejectedExecutionHandler handler =
                new ThreadPoolExecutor.CallerRunsPolicy();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                timeUnit,
                work,
                factory,
                handler
        );
        for (int i = 0; i < 10; i++) {
            MyTask myTask = new MyTask();
            threadPoolExecutor.execute(myTask);
        }
        threadPoolExecutor.shutdown();
        // The workers are daemon threads, so wait for the tasks before main exits
        threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

// MyTask.java
public class MyTask implements Runnable {
    @Override
    public void run() {
        System.err.println(System.currentTimeMillis());
    }
}

// MyThreadFactory.java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadFactory implements ThreadFactory {
    // Counts factories (pools), shared by all instances
    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
    private final ThreadGroup group;
    // Counts threads created by this factory
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemonThread;

    public MyThreadFactory(String namePrefix, boolean daemonThread) {
        // Fall back to "pool" when no prefix is given
        String base = (namePrefix == null || namePrefix.isEmpty()) ? "pool" : namePrefix;
        this.namePrefix = base + "-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        this.daemonThread = daemonThread;
        SecurityManager s = System.getSecurityManager();
        group = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {
        // Produces names such as "test-1-thread-3"
        String name = namePrefix + threadNumber.getAndIncrement();
        Thread t = new Thread(group, r, name, 0);
        t.setDaemon(daemonThread);
        return t;
    }
}
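For a ThreadFactory like this one, the DefaultThreadFactory that the JDK itself uses (a nested class of Executors, quoted in the constructor section later) is a good reference for the general shape.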
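Summary
For corePoolSize and maximumPoolSize, tasks are usually described as either CPU-bound or IO-bound. A CPU-bound task spends its time in a few long CPU bursts, while an IO-bound task typically has many short CPU bursts separated by waits.
But workloads vary. Parameters tuned today may only fit today; when traffic rises, the rejection policy may fire and cause the business operations that follow to fail. There is currently no single agreed-upon or clearly good solution. One option is to expose these parameters in a back-office configuration, observe the actual behavior, and adjust corePoolSize, maximumPoolSize, and the workQueue capacity dynamically.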
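A minimal sketch of such runtime adjustment, assuming the new values arrive from some configuration source that is not shown. PoolResizer and resize are hypothetical names; setCorePoolSize and setMaximumPoolSize are the real setters. Both setters reject values that would leave the core size above the maximum, so the order of the two calls depends on whether the pool grows or shrinks. The capacity of JDK queues such as LinkedBlockingQueue is fixed at construction, so changing the queue size would need a custom queue, which this sketch does not attempt.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolResizer {
    // Hypothetical helper: applies new sizes in an order that never
    // leaves corePoolSize above maximumPoolSize between the two calls.
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newMax < newCore) {
            throw new IllegalArgumentException("invalid sizes: " + newCore + "/" + newMax);
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax); // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);   // shrinking: lower the core size first
            pool.setMaximumPoolSize(newMax);
        }
    }

    // Returns {core, max} after growing and then {core, max} after shrinking.
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(16));
        try {
            resize(pool, 8, 16);
            int grownCore = pool.getCorePoolSize();
            int grownMax = pool.getMaximumPoolSize();
            resize(pool, 1, 2);
            return new int[] { grownCore, grownMax,
                    pool.getCorePoolSize(), pool.getMaximumPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("grown=" + r[0] + "/" + r[1] + ", shrunk=" + r[2] + "/" + r[3]);
    }
}
```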
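Source code
First, the inheritance hierarchy of ThreadPoolExecutor.
It directly extends AbstractExecutorService and, through it, implements the two interfaces Executor and ExecutorService.
The top-level interface Executor declares a single method, execute.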
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
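This is the abstract method for running a submitted task. Depending on the implementation, the task may run in a new thread, a pooled thread, or the calling thread.
The ExecutorService interface mostly defines lifecycle and submission behavior on top of it.
Examples are shutdown, awaitTermination, and submit. They give Executor more capabilities, but at this level they are only declarations without implementations.
AbstractExecutorService implements part of this (for example submit) and builds further on Executor.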
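As a small illustration of what submit adds over execute, the sketch below submits a Callable and reads the result back through a Future. SubmitDemo and sumViaSubmit are names invented for this example; it uses Executors.newFixedThreadPool only to keep the setup short.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    // Hypothetical helper: computes a + b on a pool thread and waits for the result.
    static int sumViaSubmit(int a, int b) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // The lambda returns a value, so it is treated as a Callable<Integer>
            Future<Integer> future = pool.submit(() -> a + b);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaSubmit(2, 3));
    }
}
```

With execute, an exception thrown by the task reaches the worker thread's uncaught exception handler; with submit, it is captured and rethrown from Future.get wrapped in an ExecutionException.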
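ThreadPoolExecutor constructors
Creating an instance (there is no no-argument constructor, so at least the five arguments below are needed):
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
The constructors are as follows: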
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// Default thread factory (a nested class of Executors)
private static class DefaultThreadFactory implements ThreadFactory {
    // Pool number; one pool can own many threads
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    // Thread group
    private final ThreadGroup group;
    // Thread number within this pool
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    // Thread name prefix
    private final String namePrefix;
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    // Create a thread
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        // Threads are always created as non-daemon threads
        if (t.isDaemon())
            t.setDaemon(false);
        // Higher numbers mean higher priority; threads always get normal priority
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
// Default rejection policy (a field of ThreadPoolExecutor)
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// Sizes and keep-alive time must not be negative; maximumPoolSize must be positive and not smaller than corePoolSize
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// None of these arguments may be null
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// Store the values in the instance fields
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
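Five parameters are required: corePoolSize, maximumPoolSize, keepAliveTime, unit, and workQueue. When the other two are omitted, the thread factory defaults to Executors.defaultThreadFactory() and the RejectedExecutionHandler defaults to AbortPolicy.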
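The argument checks in the full constructor can be observed from the outside. The sketch below is only an illustration; ConstructorChecks and tryCreate are invented names, and the result strings are chosen for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorChecks {
    // Hypothetical helper: tries to build a pool and reports "ok" or the
    // simple name of the exception thrown by the constructor.
    static String tryCreate(int core, int max, BlockingQueue<Runnable> queue) {
        try {
            ThreadPoolExecutor pool =
                    new ThreadPoolExecutor(core, max, 10L, TimeUnit.SECONDS, queue);
            pool.shutdown();
            return "ok";
        } catch (IllegalArgumentException | NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // maximumPoolSize smaller than corePoolSize
        System.out.println(tryCreate(4, 2, new LinkedBlockingQueue<>()));
        // null work queue
        System.out.println(tryCreate(1, 1, null));
        // corePoolSize of 0 is allowed
        System.out.println(tryCreate(0, 1, new LinkedBlockingQueue<>()));
    }
}
```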
While we are here: the thread pool ships with four rejection policies.
AbortPolicy
/**
* A handler for rejected tasks that throws a
* {@link RejectedExecutionException}.
*
* This is the default handler for {@link ThreadPoolExecutor} and
* {@link ScheduledThreadPoolExecutor}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
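When the task can neither enter the work queue nor get a new thread, the rejection policy is triggered and an exception is thrown directly.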
CallerRunsPolicy
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Only run the task if the pool has not been shut down
if (!e.isShutdown()) {
r.run();
}
}
}
As long as the pool is not shut down, the task is not dropped: it runs in the thread that called execute.
DiscardOldestPolicy
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Only act if the pool has not been shut down
if (!e.isShutdown()) {
// Remove the task at the head of the queue
e.getQueue().poll();
// Resubmit the current task
e.execute(r);
}
}
}
Drops the task at the head of the queue, then resubmits the task that the pool just rejected.
DiscardPolicy
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
Does nothing: it neither throws nor touches the work queue. The task is silently dropped.
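Summary
To sum up the four rejection policies:
| Policy | Effect | My thoughts and suggestions |
| AbortPolicy | When the task cannot be submitted, throws an exception and the task is lost | Use for important business logic; the exception makes it quick to find which task was lost |
| CallerRunsPolicy | As long as the pool is not shut down, the task is not dropped | A candidate when every task must be executed |
| DiscardOldestPolicy | Drops the task at the head of the queue, then executes the task the pool rejected | Forces an earlier task to be abandoned; feels of limited use, so use with caution |
| DiscardPolicy | Silently drops the task | Worth considering for computations that are not important |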
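The four policies can be compared side by side with a small experiment. The sketch below saturates a pool that has one thread and a one-slot queue: task A blocks the only thread, task B fills the queue, and task C is the one that gets rejected. RejectionPolicyDemo, saturate, and the log strings are all invented for this illustration.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    // Hypothetical helper: returns a log of what happened to tasks A, B and C.
    static List<String> saturate(RejectedExecutionHandler handler) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        try {
            // A occupies the only thread until the gate opens
            pool.execute(() -> { awaitQuietly(gate); log.add("A ran"); });
            // B fills the single queue slot
            pool.execute(() -> log.add("B ran"));
            try {
                // C finds the pool saturated, so the handler decides its fate
                pool.execute(() -> log.add(
                        Thread.currentThread() == caller ? "C ran in caller" : "C ran in pool"));
            } catch (RejectedExecutionException e) {
                log.add("C rejected");
            }
        } finally {
            gate.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Abort:         " + saturate(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRuns:    " + saturate(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("DiscardOldest: " + saturate(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println("Discard:       " + saturate(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

Note how DiscardOldestPolicy sacrifices B, a task that was accepted earlier, which is the reason for the caution in the table.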
Execution flow
The example above calls the execute method. Let's walk through that method next.
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
    // A null task is rejected immediately with a NullPointerException
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // ctl packs the run state (high 3 bits) and the worker count (low 29 bits) into one int
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // Fewer than corePoolSize workers: start a new worker with command as its
        // first task, bypassing the queue (core == true means the bound is corePoolSize)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The pool is RUNNING and the work queue accepted the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The pool stopped running in the meantime: take the task back out
        if (! isRunning(recheck) && remove(command))
            // and hand it to the rejection policy
            reject(command);
        // No live workers are left (corePoolSize may be 0, or they all died)
        else if (workerCountOf(recheck) == 0)
            // Start a worker without a first task so the queued task gets picked up
            // (core == false means the bound is maximumPoolSize)
            addWorker(null, false);
    }
    // The queue is full or the pool is not running: try a worker bounded by
    // maximumPoolSize, and trigger the rejection policy if that fails too
    else if (!addWorker(command, false))
        reject(command);
}
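The helpers isRunning and workerCountOf used above all operate on the single int held in ctl. The sketch below re-creates that bit layout outside the JDK so it can be experimented with. The constants and helper bodies follow the JDK 11 source, but CtlSketch itself is a stand-alone illustration, not the real class.

```java
class CtlSketch {
    // The low 29 bits hold the worker count, the high 3 bits hold the run state
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // Run states, ordered so that they can be compared numerically
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Unpacking and packing
    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // RUNNING is the only negative state, so "less than SHUTDOWN" means running
    static boolean isRunning(int c)  { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("workers=" + workerCountOf(c) + ", running=" + isRunning(c));
        int stopped = ctlOf(STOP, 2);
        System.out.println("workers=" + workerCountOf(stopped) + ", running=" + isRunning(stopped));
    }
}
```

Keeping both values in one atomic int lets the pool check the state and change the worker count with a single compare-and-set, which is what addWorker relies on.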
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
// Add a worker (a thread plus an optional first task)
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // c is the packed ctl value: run state in the high bits, worker count in the low bits
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        // Refuse when the pool is at least SHUTDOWN, unless it is exactly SHUTDOWN,
        // no first task is given, and the queue still holds tasks that need draining
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            // Adding failed
            return false;
        for (;;) {
            // When core is true the bound is corePoolSize,
            // otherwise the bound is maximumPoolSize
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                // Bound reached, cannot add
                return false;
            // Try to increment the worker count with a CAS
            if (compareAndIncrementWorkerCount(c))
                // Success: leave both loops
                break retry;
            // The CAS failed, so read the current ctl value again
            c = ctl.get();  // Re-read ctl
            // If the pool is now at least SHUTDOWN,
            // restart the outer loop to repeat the state check
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the worker; its thread comes from the thread factory
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // The factory may have returned null
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // The workers set is only modified while holding mainLock
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // Check the run state again under the lock
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    // The thread must not have been started yet
                    if (t.isAlive()) // precheck that t is startable
                        // The factory handed back an already started thread,
                        // so it cannot be started by the pool
                        throw new IllegalThreadStateException();
                    // Register the worker in the workers set
                    workers.add(w);
                    int s = workers.size();
                    // Track the largest size the pool has reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // The worker was added successfully
                    workerAdded = true;
                }
            } finally {
                // Always release the lock, whether or not adding succeeded
                mainLock.unlock();
            }
            // The worker was added successfully
            if (workerAdded) {
                // Start the thread
                t.start();
                // The worker is now running
                workerStarted = true;
            }
        }
    } finally {
        // If the worker did not start (for example because of an exception),
        // roll back the bookkeeping done so far
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // Whether the worker thread was started
    return workerStarted;
}
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // Take the lock
    mainLock.lock();
    try {
        if (w != null)
            // Remove the worker from the set
            workers.remove(w);
        // Decrement the worker count
        decrementWorkerCount();
        // Check whether the pool can now terminate
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            return;
        // Workers still exist
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // Interrupt one idle worker so the shutdown signal propagates
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Move to the TIDYING state with a worker count of 0
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Hook for subclasses; does nothing by default
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up all threads waiting in awaitTermination
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
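The end of this lifecycle can be observed through the protected terminated() hook and awaitTermination. The sketch below overrides the hook in an anonymous subclass; LifecycleDemo and shutdownAndObserve are names invented for the illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleDemo {
    // Hypothetical helper: returns {awaitTermination result, hook called, isTerminated}.
    static boolean[] shutdownAndObserve() {
        AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                // Called by tryTerminate while the pool is in the TIDYING state
                hookCalled.set(true);
            }
        };
        pool.execute(() -> { /* no-op task */ });
        pool.shutdown(); // RUNNING -> SHUTDOWN; queued tasks still run
        boolean finished;
        try {
            // Returns true once the state has reached TERMINATED
            finished = pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { finished, hookCalled.get(), pool.isTerminated() };
    }

    public static void main(String[] args) {
        boolean[] r = shutdownAndObserve();
        System.out.println("finished=" + r[0] + ", hookCalled=" + r[1] + ", terminated=" + r[2]);
    }
}
```

Because terminated() runs before the state is set to TERMINATED and before signalAll, the hook has already finished by the time awaitTermination returns true.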
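The core of the thread pool is a HashSet called workers that holds every worker thread, and its size may not exceed maximumPoolSize. Each Worker object holds a thread object and, optionally, a first task.
While fewer than corePoolSize threads exist, a submitted task is handed straight to a new worker. Once corePoolSize is reached, tasks go into the work queue workQueue and wait for a free thread to pick them up; additional threads, up to maximumPoolSize, are only created when the queue is full.
If the queue is full and the pool is already at maximumPoolSize, or if the pool's lifecycle state is no longer RUNNING, the rejection policy is triggered.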
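This order (core threads, then the queue, then extra threads, then rejection) can be watched with a small experiment. The sketch below submits seven blocking tasks to a pool with corePoolSize 2, maximumPoolSize 4, and a queue of capacity 2, and records the pool size and queue size after each submission. PoolGrowthDemo and fill are invented names, and the sizes are arbitrary choices for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Hypothetical helper: returns one "poolSize/queueSize" snapshot per submission.
    static List<String> fill() {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        List<String> snapshots = new ArrayList<>();
        try {
            for (int i = 0; i < 7; i++) {
                String suffix = "";
                try {
                    // Every task blocks until the gate opens, so no thread becomes free
                    pool.execute(() -> awaitQuietly(gate));
                } catch (RejectedExecutionException e) {
                    suffix = " rejected"; // thrown by the default AbortPolicy
                }
                snapshots.add(pool.getPoolSize() + "/" + pool.getQueue().size() + suffix);
            }
        } finally {
            gate.countDown(); // let all tasks finish
            pool.shutdown();
        }
        return snapshots;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Expected progression: two core threads, two queued tasks,
        // two extra threads, then a rejection
        System.out.println(fill());
    }
}
```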
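In addition, every addition to the workers HashSet happens while holding the ReentrantLock mainLock, so updates to the set are thread-safe. Each Worker owns exactly one thread, created for it by the thread factory. If that thread turns out to be already started (t.isAlive() returns true), addWorker throws IllegalThreadStateException, does nothing further with it, and rolls back.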
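Conclusion
Here is what I took away from reading ThreadPoolExecutor.
ThreadPoolExecutor is a thread-safe class. The worker count and run state are updated atomically through ctl, and the workers set is only modified while holding mainLock. Each worker is bound to a thread that has not run anything before, so no two workers share a thread, and a task's local variables are only visible to the thread running that task.
corePoolSize is the number of threads that are kept around. Set too high, it leaves threads idle and wastes resources. Choose it according to the actual workload.
Among the rejection policies, DiscardOldestPolicy should be used with caution.
When using a thread pool, there is no need to add locking around the pool itself. State that is shared between tasks is a different matter: it still needs thread-safe containers or explicit synchronization.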
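To illustrate state shared between tasks, here is a minimal sketch in which many tasks update one counter. It uses AtomicInteger so that increments from different worker threads are not lost; SharedStateDemo and countWithPool are names invented for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedStateDemo {
    // Hypothetical helper: runs `tasks` increments on a 4-thread pool
    // and returns the final counter value.
    static int countWithPool(int tasks) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < tasks; i++) {
            // incrementAndGet is atomic, so concurrent updates are not lost
            pool.execute(counter::incrementAndGet);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithPool(1000));
    }
}
```

Replacing the AtomicInteger with a plain int field incremented via ++ would allow lost updates, because the pool only protects its own internal state.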
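Notice
Author: Sinsy
Link to this article: https://blog.sincehub.cn/2021/03/14/TheadPoolExecutor/
Copyright notice: This is an original article by the author, released under the CC 4.0 BY-SA license. Please include the original notice when reposting. For commercial cooperation or licensing, please contact me at 550569627@qq.com
Reference
Copyright notice: This is an original article by InfoQ author sinsy.
Original link: http://xie.infoq.cn/article/afccf2981b8c779fb96f03228. Please contact the author before reposting.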