JDK Source Code Series: ThreadPoolExecutor

sinsy
Published: March 15, 2021
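Preface

I have recently been refactoring a messaging system. A lot of code was coupled into the business layer, which made later requirement changes harder. After some discussion we pulled logging, messaging, and email out into an aspect layer, where they are handled asynchronously.

Using a thread pool for this raised quite a few questions:

  • Why is a worker thread-safe?

  • What are the private fields for (workQueue, maximumPoolSize, and so on)?

Everything below is based on reading the JDK 11 source.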

Understanding and usage

First, why thread pools exist at all. The ThreadPoolExecutor Javadoc puts it this way:

Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.

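Thread pools solve two problems.

  1. They reduce the per-task overhead. In other words, the pool exists to reuse resources (threads that have already been created, instead of starting a new one for every task).

  2. They make it easier to bound and manage what the tasks consume, and to see their basic state. (Presumably thread names, thread counts, task counts, and so on; this is my guess.)

Creating a ThreadPoolExecutor directly with new requires the following parameters.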



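corePoolSize

The number of threads kept in the pool even when they are idle. If allowCoreThreadTimeOut is set, these threads are also subject to keepAliveTime.

maximumPoolSize

The maximum number of threads the pool is allowed to create.

keepAliveTime

The longest time that threads beyond the core size wait idle for a new task before they terminate.

unit

The time unit of the keepAliveTime argument.

workQueue

The work queue that holds submitted tasks until a thread picks them up.

threadFactory

The factory used to create new threads. By default this is Executors.defaultThreadFactory(), which returns a DefaultThreadFactory.

handler

The policy that is invoked when a task cannot be accepted, that is, when the work queue is full and the pool has already reached its maximum thread count (or the pool has been shut down). The default rejection policy is AbortPolicy.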
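The interaction between corePoolSize, keepAliveTime, and allowCoreThreadTimeOut can be observed directly. The following is a minimal sketch, not part of the original article: the class name CoreTimeoutSketch and its helper method are made up for illustration, and the 50 ms keep-alive and one-second polling window are arbitrary values chosen to keep the run short.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the JDK or the original article.
class CoreTimeoutSketch {

    /** Runs one task, lets the pool sit idle, and returns the pool size afterwards. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // With true, even the single core thread may exit after keepAliveTime.
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });
            // Poll for up to one second (assumed to be ample for a 50 ms keep-alive).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true));
        System.out.println("core timeout off: " + poolSizeAfterIdle(false));
    }
}
```

With the timeout enabled the idle core thread should disappear, while with it disabled the pool should keep one thread alive until it is shut down.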

Here is an example:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Each public class below goes in its own file.
public class Test {
    public static void main(String[] args) throws InterruptedException {
        int corePoolSize = 16;
        int maximumPoolSize = 24;
        long keepAliveTime = 10L;
        TimeUnit timeUnit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> work = new LinkedBlockingQueue<>(16);

        String prefix = "test";
        boolean daemon = true;
        ThreadFactory factory = new MyThreadFactory(prefix, daemon);

        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, work, factory, handler);

        for (int i = 0; i < 10; i++) {
            MyTask myTask = new MyTask();
            threadPoolExecutor.execute(myTask);
        }

        threadPoolExecutor.shutdown();
        // The workers are daemon threads, so wait for the submitted tasks before main returns.
        threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

public class MyTask implements Runnable {
    @Override
    public void run() {
        System.err.println(System.currentTimeMillis());
    }
}

public class MyThreadFactory implements ThreadFactory {
    // Numbers the factories, so threads of different pools get different names
    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
    private final ThreadGroup group;
    // Numbers the threads created by this factory
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemonThread;

    public MyThreadFactory(String namePrefix, boolean daemonThread) {
        // Fall back to a generic prefix when none is given
        String prefix = (namePrefix == null || namePrefix.isEmpty()) ? "pool" : namePrefix;
        this.namePrefix = prefix + "-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        this.daemonThread = daemonThread;
        SecurityManager s = System.getSecurityManager();
        group = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {
        String name = namePrefix + threadNumber.getAndIncrement();
        Thread t = new Thread(group, r, name, 0);
        t.setDaemon(daemonThread);
        return t;
    }
}

For a ThreadFactory implementation like this one, DefaultThreadFactory inside the Executors class is a good reference for the general shape.

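Summary

For corePoolSize and maximumPoolSize, tasks are usually split into two kinds: CPU-bound and IO-bound. A CPU-bound workload tends to consist of a few long CPU bursts, while an IO-bound workload usually has many short CPU bursts separated by waits.

Business load varies, though. Parameters configured today may only fit today; when traffic picks up some day, the rejection policy may fire and every later task fails with it. There is currently no unified or really good solution on the market. What you can do is expose these parameters in a back-office configuration, watch the actual behaviour, and adjust corePoolSize, maximumPoolSize, workQueue, and so on dynamically. (ThreadPoolExecutor offers setCorePoolSize and setMaximumPoolSize for this; the workQueue field is final, so changing its capacity at runtime needs a queue implementation that supports resizing.)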
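As a rough illustration of adjusting the sizes at runtime, the sketch below applies new values through setCorePoolSize and setMaximumPoolSize. The class ResizeSketch and its methods are hypothetical names, and the ordering logic is an assumption on my part: since JDK 9 both setters throw IllegalArgumentException if the call would leave corePoolSize above maximumPoolSize, so the sketch grows the maximum first and shrinks the core first.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not from the JDK or the original article.
class ResizeSketch {

    /** Applies new sizes in an order that never leaves core above max. Assumes newCore <= newMax. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newMax >= pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first, then the core size.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // Shrinking: lower the core size first, then the ceiling.
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    /** Grows a (2, 4) pool to (8, 16), shrinks it to (1, 2), and returns all four observed sizes. */
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(16));
        try {
            resize(pool, 8, 16);
            int grownCore = pool.getCorePoolSize();
            int grownMax = pool.getMaximumPoolSize();
            resize(pool, 1, 2);
            return new int[] { grownCore, grownMax, pool.getCorePoolSize(), pool.getMaximumPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

In a real system the new values would come from whatever configuration source is in use; that part is deliberately left out here.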

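Source code

First, the inheritance hierarchy of ThreadPoolExecutor.

ThreadPoolExecutor directly extends AbstractExecutorService, and through it implements the ExecutorService and Executor interfaces.

The top-level interface, Executor, declares a single method, execute: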

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

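It is an abstract method for running a submitted task; whether the task runs on a new thread, a pooled thread, or the calling thread is up to the implementation.

The ExecutorService interface mostly defines lifecycle and submission behaviour around that method.

Examples are shutdown, awaitTermination, and submit. They give Executor more capabilities, but they are still only declarations; nothing is implemented yet.

AbstractExecutorService implements part of this (submit, for example) and builds further on Executor.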
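The difference between execute and submit is easy to see from the caller's side: submit, as implemented in AbstractExecutorService, wraps the task in a FutureTask and passes it to execute, so the caller gets a Future back. A small sketch follows; the class name SubmitSketch is made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not from the JDK or the original article.
class SubmitSketch {

    /** Submits a Callable and blocks until its result is available. */
    static int submitAndGet() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> task = () -> 6 * 7;
            // submit returns a Future; execute(Runnable) would return nothing.
            Future<Integer> future = pool.submit(task);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet()); // prints 42
    }
}
```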

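The ThreadPoolExecutor constructors

Create an instance (there is no no-argument constructor; the shortest overload takes five arguments):

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

The constructors look like this: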

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    // Default thread factory (a private nested class of Executors)
    private static class DefaultThreadFactory implements ThreadFactory {
        // Pool number; one pool can hold many threads
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        // Thread group
        private final ThreadGroup group;
        // Thread number within this pool
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        // Thread name prefix
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        // Creates a thread
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // Daemon flag: pool threads are non-daemon by default
            if (t.isDaemon())
                t.setDaemon(false);
            // Priority: a larger number means higher priority; the default is normal
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    // Default rejection policy (declared in ThreadPoolExecutor)
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // Sizes and keepAliveTime must not be negative, maximumPoolSize must be
        // positive, and maximumPoolSize must not be smaller than corePoolSize
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // None of these arguments may be null
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // Assign to the private instance fields
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

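Five parameters are required: corePoolSize, maximumPoolSize, keepAliveTime, unit, and workQueue. When no handler is passed, the RejectedExecutionHandler defaults to AbortPolicy, and when no factory is passed, threads come from Executors.defaultThreadFactory().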
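These defaults and the argument checks can be confirmed from outside the class. The sketch below is only an illustration; DefaultsSketch and its methods are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the JDK or the original article.
class DefaultsSketch {

    /** Returns { simple name of the default handler, name of the first pool thread }. */
    static String[] describeDefaults() {
        // Five-argument constructor: factory and handler are left to the defaults.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            String handler = pool.getRejectedExecutionHandler().getClass().getSimpleName();
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            String threadName = pool.submit(whoAmI).get();
            return new String[] { handler, threadName };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** maximumPoolSize smaller than corePoolSize is refused by the constructor. */
    static boolean rejectsBadSizes() {
        try {
            new ThreadPoolExecutor(4, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        String[] d = describeDefaults();
        System.out.println(d[0] + ", " + d[1] + ", rejectsBadSizes=" + rejectsBadSizes());
    }
}
```

The thread name should follow the pool-N-thread-M pattern built in DefaultThreadFactory; the pool number N depends on how many default factories the JVM has created so far.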

While we are here: the JDK ships four rejection policies.


AbortPolicy


    /**
     * A handler for rejected tasks that throws a
     * {@link RejectedExecutionException}.
     *
     * This is the default handler for {@link ThreadPoolExecutor} and
     * {@link ScheduledThreadPoolExecutor}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

When the pool can neither queue the task nor start a thread for it, the rejection policy fires and simply throws an exception.

CallerRunsPolicy

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // Only run the task if the pool has not been shut down
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

As long as the pool has not been shut down, the task is not discarded; it runs on the thread that called execute.
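To see on which thread a rejected task ends up, here is a small sketch with a deliberately tiny pool (one thread, queue capacity one). The class name CallerRunsSketch and the latch-based blocking are my own illustration, not part of the original example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not from the JDK or the original article.
class CallerRunsSketch {

    /** Returns true if the rejected task ran on the thread that called execute. */
    static boolean rejectedTaskRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Task 2 fills the queue.
            pool.execute(() -> { });
            // Task 3 is rejected, so CallerRunsPolicy runs it right here.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + rejectedTaskRunsOnCaller());
    }
}
```

One consequence worth keeping in mind: while the caller is busy running the rejected task it cannot submit new ones, which acts as a crude form of back-pressure.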

DiscardOldestPolicy

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // Only act if the pool has not been shut down
            if (!e.isShutdown()) {
                // Remove the task at the head of the queue
                e.getQueue().poll();
                // Retry the current task through execute
                e.execute(r);
            }
        }
    }

It drops the task at the head of the queue and then resubmits the task that the pool just rejected.
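The effect is easiest to see by recording which tasks actually run. In the sketch below (class name DiscardOldestSketch and task labels A, B, C are made up), task B is waiting in the queue when C gets rejected, so B is the one that is silently dropped.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the JDK or the original article.
class DiscardOldestSketch {

    /** Returns the labels of the tasks that actually ran, in execution order. */
    static List<String> executedTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        List<String> executed = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // A occupies the single worker until released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                executed.add("A");
            });
            pool.execute(() -> executed.add("B")); // queued
            pool.execute(() -> executed.add("C")); // rejected: B is polled, C is queued
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(executedTasks());
    }
}
```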

DiscardPolicy


    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

It does nothing at all: no exception, no change to the work queue. The task is silently discarded.

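Summary

A recap of the four rejection policies.

Policy | Effect | My thoughts and advice
AbortPolicy | The task cannot be submitted; an exception is thrown and the task is lost | Use it for important business flows; the exception makes it quick to find out which task was lost
CallerRunsPolicy | As long as the pool is not shut down, no task is discarded | Choose it when every task has to run
DiscardOldestPolicy | Drops the task at the head of the queue, then resubmits the task the pool just rejected | Using it means giving up earlier tasks; it feels of limited use, so use it with caution
DiscardPolicy | Silently discards the task | Worth considering for computations that are not important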

Execution flow

In the example above, the method being called is execute. Let's take it apart.

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@link RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        // A null task is refused immediately with a NullPointerException
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        // ctl packs the run state (high 3 bits) and the worker count (low 29 bits) into one int
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // Fewer than corePoolSize workers: start a new worker with command as
            // its first task (true = bound by corePoolSize); the task skips the queue
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // The pool is running and the work queue accepted the task
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // The pool stopped running in the meantime: take the task back out
            if (! isRunning(recheck) && remove(command))
                // and hand it to the rejection policy
                reject(command);
            // No worker is left to drain the queue
            else if (workerCountOf(recheck) == 0)
                // Start a worker without a first task, bounded by maximumPoolSize
                addWorker(null, false);
        }
        // The queue is full: try a new worker bounded by maximumPoolSize,
        // and fire the rejection policy if that fails too
        else if (!addWorker(command, false))
            reject(command);
    }

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    // Adds a worker thread
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // c holds both the run state (high bits) and the worker count (low bits)
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            // Once the pool is SHUTDOWN or later, a worker may only be added when the
            // state is still below STOP, there is no first task, and the queue is not
            // empty (that is, only to drain tasks that are already queued)
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                // Cannot add
                return false;

            for (;;) {
                // With core == true the worker count is compared against corePoolSize,
                // otherwise against maximumPoolSize
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    // Bound reached, cannot add
                    return false;
                // Increment the worker count with CAS
                if (compareAndIncrementWorkerCount(c))
                    // Success: leave both loops
                    break retry;
                // CAS failed: re-read ctl
                c = ctl.get();  // Re-read ctl
                // If the pool has been shut down in the meantime,
                // restart the outer loop to re-check the state
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create the worker; the thread factory creates its thread
            w = new Worker(firstTask);
            final Thread t = w.thread;
            // The factory may have failed to return a thread
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // Guard the workers set with the ReentrantLock
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    // Check the run state again under the lock
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        // A thread that is already alive was started by someone else
                        // (for example by the factory) and cannot be started again,
                        // so throw
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // Register the worker in the workers set
                        workers.add(w);
                        int s = workers.size();
                        // Track the largest size the pool has ever reached
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // The worker has been added
                        workerAdded = true;
                    }
                } finally {
                    // Always release the lock, whatever happened
                    mainLock.unlock();
                }
                // The worker was added
                if (workerAdded) {
                    // Start its thread
                    t.start();
                    // The worker is now running
                    workerStarted = true;
                }
            }
        } finally {
            // workerStarted may still be false, for example after an exception;
            // in that case roll the worker back
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // Whether the worker thread was started
        return workerStarted;
    }
    /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        // Lock
        mainLock.lock();
        try {
            if (w != null)
                // Remove the worker from the set
                workers.remove(w);
            // Decrement the worker count
            decrementWorkerCount();
            // Check whether the pool can now terminate
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate.  This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
                return;
            // There are still worker threads
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // Interrupt one idle worker so the shutdown signal propagates
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Move the lifecycle state to TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // Termination hook
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        // Wake up every thread waiting in awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
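The listing above keeps calling workerCountOf, runStateAtLeast, and ctlOf on a single int. The sketch below re-creates that packing outside the JDK so it can be experimented with: the constants and helper names mirror the private ones in the JDK 11 ThreadPoolExecutor source as I understand it, while the class CtlSketch itself is only an illustration.

```java
// Hypothetical standalone re-creation of ThreadPoolExecutor's ctl packing.
class CtlSketch {
    // The low 29 bits hold the worker count, the high 3 bits the run state.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // Run states are stored in the high-order bits and are numerically ordered.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking
    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // State comparisons rely on the numeric ordering of the states.
    static boolean runStateAtLeast(int c, int s) { return c >= s; }
    static boolean isRunning(int c)              { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("workers = " + workerCountOf(c));
        System.out.println("running = " + isRunning(c));
        System.out.println("bits    = " + Integer.toBinaryString(c));
    }
}
```

Keeping both values in one int is what lets addWorker check the state and bump the worker count with a single compare-and-set.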

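The core idea of the pool is that every worker is kept in a HashSet called workers, whose size may never exceed maximumPoolSize. Each Worker object holds its thread and its first task.

On execute, a task is handed straight to a new worker while fewer than corePoolSize threads exist. Once corePoolSize is reached, the task goes into workQueue and waits for a free thread to pick it up; only when the queue is full does the pool start additional threads, up to maximumPoolSize.

If the queue is full and the pool already has maximumPoolSize threads, or the pool is no longer in the RUNNING state, the rejection policy fires.

In addition, every addition to the workers HashSet happens while holding mainLock, a ReentrantLock, so only one thread at a time can modify the set, and the worker count itself is changed by CAS on ctl. If the thread returned by the factory has already been started, addWorker throws IllegalThreadStateException and rolls the worker back without running anything.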
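The order described above (core thread, then queue, then extra thread, then rejection) can be traced with a pool that is small enough to saturate after three tasks. This is a sketch under my own assumptions: ExecuteOrderSketch is a made-up class, and the tasks block on a latch purely so that nothing finishes while the trace is being recorded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the JDK or the original article.
class ExecuteOrderSketch {

    /** Submits four blocking tasks and records what the pool did with each one. */
    static List<String> trace() {
        // core = 1, max = 2, queue capacity = 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add(i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add(i + ": rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Task 1 gets the core thread, task 2 waits in the queue, task 3 forces a second thread because the queue is full, and task 4 is rejected because both the queue and the thread limit are exhausted.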

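Conclusion

My takeaways from reading ThreadPoolExecutor:

  1. ThreadPoolExecutor is a thread-safe class. A worker's thread must not have been started before the pool starts it, and the pool takes a lock whenever it adds or removes a worker, so no other task can get hold of that thread. Local variables inside a task are visible only to the thread running it, which is why they are safe.

  2. corePoolSize is the number of threads that are kept around. Set too high, it leaves threads idle and ties up resources; set it according to the actual workload.

  3. Among the rejection policies, DiscardOldestPolicy should be used with caution.

  4. When using a thread pool, you don't need to worry about whether the pool's own containers (workers, workQueue) are thread-safe. State shared between tasks is a different matter and still has to be protected by the tasks themselves.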

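Notice

Author: Sinsy

Link to this article: https://blog.sincehub.cn/2021/03/14/TheadPoolExecutor/

Copyright notice: This is an original article by the author, released under the CC 4.0 BY-SA license. Please include this notice when reposting. For commercial cooperation or licensing inquiries, leave me a message: 550569627@qq.com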
