写点什么

ThreadPoolExecutor 源码解读(一)重新认识 ThreadPoolExecutor(核心参数、生命周期、位运算、ThreadFactory、拒接策略)

用户头像
徐同学呀
关注
发布于: 2021 年 04 月 17 日

一、前言

在 Java 中,创建一个线程new Thread,就像创建一个对象一样简单,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,并且要为线程分配一系列的资源,所以线程是一个重量级的对象,应该避免频繁创建和销毁。线程池的出现就是为了管理线程,让线程复用。


阿里巴巴 java 开发手册中有一条强制的编程规约:


【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样

的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。


既然推荐通过ThreadPoolExecutor 的方式使用线程池,则有必要了解一些底层原理,如:线程池的几个核心参数,提交任务的过程,任务是如何在线程池中运行,线程是如何复用,线程池的状态流转等。了解了这些原理知识,不仅可以轻松应对面试,更重要的是在实际工作中更好、更正确的使用线程池;同时如果出了 bug,懂些原理也能快速排查问题所在。

二、提交任务涉及到的核心参数

public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler)
复制代码


从 ThreadPoolExecutor 构造函数来看,不得不说的几个参数,核心线程数corePoolSize、最大线程数maximumPoolSize、工作队列workQueue、线程工厂threadFactory以及拒绝策略RejectedExecutionHandler,他们之间有着千丝万缕的关系:


  1. 当创建的线程数小于核心线程数corePoolSize时,提交任务会继续创建新线程执行任务。

  2. 当创建的线程数大于等于corePoolSize时,此时再提交任务将被添加到工作队列workQueue中。

  3. 当工作队列workQueue已满,此时再提交任务会创建新线程,触发第二个阈值的判断maximumPoolSize

  4. 当创建的线程数大于等于最大线程数maximumPoolSize时,此时再提交任务将触发拒绝策略RejectedExecutionHandler


白纸黑字总是苍白的,如下是提交任务至线程池的流程图:



除了具有主角光环的参数外,还有几个参数决定着工作线程的生死存亡。keepAliveTime决定非核心线程数的线程的存活时长。当线程池中创建的线程数量超过设置的 corePoolSize,在某些线程处理完任务后,如果等待 keepAliveTime 时间,仍然没有新的任务分配给它们,那么这些线程就属于空闲线程,将会被回收。线程池回收线程,没有所谓的“核心线程”和“非核心线程”之分,直到线程池的线程数等于最小核心线程数corePoolSize,回收才会停止。


看样子线程池一定会有小于等于corePoolSize数量的线程一直存活,这样如果这个线程池是非核心线程池,一直占用着线程势必会影响到核心线程池的运行,所以核心线程数内的线程也有被回收的需求。


在创建线程池时,构造函数中并没有显式设置核心线程数内的线程过期回收的参数,但是可以通过调用allowCoreThreadTimeOut(true)方法将属性allowCoreThreadTimeOut设置为 true,从而使得核心线程数内的线程空闲等待keepAliveTime 时间后,依然没有任务分配时被回收。


public void allowCoreThreadTimeOut(boolean value) {    if (value && keepAliveTime <= 0)        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");    if (value != allowCoreThreadTimeOut) {        allowCoreThreadTimeOut = value;        if (value)            interruptIdleWorkers();    }}
复制代码

三、线程池的生命周期

世间万物都有生死轮回,线程池也不例外,它也有自己的生命周期。而巧妙的是,作者 Doug Lea 用一个 32 位的 int 变量表示两种含义:高 3 位表示线程池的运行状态,低 29 位表示工作线程数。


//一个ctl 表示两种含义,高3位为runState,低29为workerCountprivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;//536870911//0001 1111 1111 1111 1111 1111 1111 1111private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits//-536870912//111 0 0000 0000 0000 0000 0000 0000 0000private static final int RUNNING = -1 << COUNT_BITS;//0//000 0 0000 0000 0000 0000 0000 0000 0000private static final int SHUTDOWN = 0 << COUNT_BITS;//536870912//001 0 0000 0000 0000 0000 0000 0000 0000private static final int STOP = 1 << COUNT_BITS;//1073741824//010 0 0000 0000 0000 0000 0000 0000 0000private static final int TIDYING = 2 << COUNT_BITS;//1610612736//011 0 0000 0000 0000 0000 0000 0000 0000private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl//从变量ctl中解析出runState//先将CAPACITY做按位非操作,即~n = - ( n+1 ),就是 RUNNING//然后再做按位与,可得出高3位private static int runStateOf(int c) { return c & ~CAPACITY; }//从变量ctl中解析出workerCount//对CAPACITY按位与,可得出低29位//1 & 1 = 1,1 & 0 = 0,0 & 1 = 0,0 & 0 = 0private static int workerCountOf(int c) { return c & CAPACITY; }//将rs和wc转为二进制 再进行按位或计算,位上只要有1就是该位就是1// 1 & 1 = 1,1 & 0 = 1,0 & 1 = 1,0 & 0 = 0private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码


作者是如何将一个 32 位 int 变量ctl表示为两个含义的呢?这就涉及到二进制数的位运算:


  • CAPACITY = (1 << COUNT_BITS) - 1,1 转为二进制数0000 0000 0000 0000 0000 0000 0000 0001向左移 29 位,得到001 0 0000 0000 0000 0000 0000 0000 0000,这个过程相当于 1*2^29,但是此时得到的是高 3 位的第一位(最小值),在其基础上减 1 就是低 29 位的最大值了,得到0001 1111 1111 1111 1111 1111 1111 1111,将其设置为线程池的容量CAPACITY

  • RUNNING = -1 << COUNT_BITS,创建线程池后,线程池就处于正在运行状态RUNNING,其是-1 向左移 29 位,-1 的二进制是1111 1111 1111 1111 1111 1111 1111 1111,向左移 29 位后剩下 3 个 1,低 29 位补 0,得到1110 0000 0000 0000 0000 0000 0000 0000,正好 1 占满了高 3 位。(-1 的二进制是 1 的补码,原码取反+1 就是补码,如 1 的原码是0000 0000 0000 0000 0000 0000 0000 0001,取反后是1111 1111 1111 1111 1111 1111 1111 1110,再加 1 就是1111 1111 1111 1111 1111 1111 1111 1111。)

  • SHUTDOWN = 0 << COUNT_BITS,当调用shutdown(),线程池进入SHUTDOWN状态,SHUTDOWN是 0 向左移 29 位依然是 0。

  • STOP = 1 << COUNT_BITS,当调用shutdownNow(),线程池进入STOP状态,1 左移 29 位得到001 0 0000 0000 0000 0000 0000 0000 0000

  • TIDYING = 2 << COUNT_BITSTIDYING是一个过渡状态,当线程池调用shutdown()shutdownNow()后,当线程池中没有正在运行的线程且工作队列为空,此时设置线程池状态为TIDYING。2 左移 29 位得到010 0 0000 0000 0000 0000 0000 0000 0000

  • TERMINATED = 3 << COUNT_BITSTERMINATED才是代表线程池真正的寿终正寝。3 左移 29 位得到011 0 0000 0000 0000 0000 0000 0000 0000


还有三个方法才是将两个含义揉捻成一个变量,又分别拆出两个含义:


  • ctlOf(int rs, int wc) { return rs | wc; },运行状态rs和工作线程数wc,二者进行按位或|计算合成一个变量ctl。(|操作,1 & 1 = 1,1 & 0 = 1,0 & 1 = 1,0 & 0 = 0,二者比较,只要位上有 1,该位就是 1。)

  • runStateOf(int c) { return c & ~CAPACITY; },从中拆出运行状态。先对CAPACITY做按位非~操作,即~CAPACITY = - ( CAPACITY+1 ),就是 RUNNING。然后按位与&操作,可得高 3 位。(RUNNING高 3 位都是 1,低 29 位都是 0,所以&运算后,1 只会出现在高 3 位,故而可得高 3 位)

  • workerCountOf(int c) { return c & CAPACITY; },从中拆出工作线程数。对CAPACITY按位与&,可得低 29 位。(因为CAPACITY低 29 位都是 1,高 3 位都是 0,所以&运算后,1 只会出现在低 29 位,故而可得低 29 位)


&操作,1 & 1 = 1,1 & 0 = 0,0 & 1 = 0,0 & 0 = 0,二者比较,位上都是 1,该位才是 1。)


如图所示线程池的生命周期流转图:


四、ThreadFactory 如何创建工作线程

创建线程池时可以不传ThreadFactory,此时会给一个默认线程工厂Executors.defaultThreadFactory(),而它究竟是怎样生产工作线程的呢?


DefaultThreadFactory实现了接口ThreadFactory,其主要做了 3 件事:


  • 创建工作线程,并设置分组和命名。

  • 工作线程是守护线程时,将线程设置为非守护线程。

  • 设置工作线程默认优先级NORM_PRIORITY


public interface ThreadFactory {    Thread newThread(Runnable r);}//java.util.concurrent.Executors.DefaultThreadFactorystatic class DefaultThreadFactory implements ThreadFactory {    private static final AtomicInteger poolNumber = new AtomicInteger(1);    private final ThreadGroup group;    private final AtomicInteger threadNumber = new AtomicInteger(1);    private final String namePrefix;
DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; }
public Thread newThread(Runnable r) { //创建线程时,设置分组和命名 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }}
复制代码


用户可以自行实现ThreadFactory接口,设计特殊的线程工厂。

五、四种官方拒接策略

当工作线程数大于等于maximumPoolSize时,此时再提交任务将会触发拒绝策略。


创建线程池时也可不传RejectedExecutionHandler,此时会给一个默认的拒绝策略AbortPolicy


private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
复制代码


AbortPolicy总是会抛出一个RejectedExecutionException异常,再无其他操作。


public static class AbortPolicy implements RejectedExecutionHandler {    public AbortPolicy() { }
/** * Always throws RejectedExecutionException. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }}
复制代码


另外官方还提供了 3 种拒绝策略:


  1. CallerRunsPolicy 直接调用run()执行任务代码。


public static class CallerRunsPolicy implements RejectedExecutionHandler {        public CallerRunsPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {            r.run();        }    }}
复制代码


  1. DiscardPolicy 什么也不做。


public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }}
复制代码


  1. DiscardOldestPolicy 丢弃工作队列中的下一个任务(排在 head 的任务),也是最老的任务,并调用 execute()处理新任务。


public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }}
复制代码


除了官方提供的四个拒绝策略外,用户还可以自行实现RejectedExecutionHandler接口设计特殊的拒绝策略。

六、总结

  • 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

  • 线程数小于corePoolSize时,提交任务会继续创建线程,大于等于corePoolSize时,提交任务将加到工作队列中。

  • 当工作队列已满,此时提交任务会创建线程,直到线程数达到maximumPoolSize,再提交任务触发拒绝策略。

  • 默认情况下线程池中只会保留小于等于corePoolSize数量的线程,多余corePoolSize的空闲线程会等待keepAliveTime时间后,依然没有任务分配,则被回收。

  • 可以将allowCoreThreadTimeOut设置为 true,使得corePoolSize数量内的线程,空闲等待keepAliveTime时间后,依然没有任务分配,则被回收。

  • ctl高 3 位表示线程池运行状态,低 29 位表示工作线程数。

  • ThreadFactoryRejectedExecutionHandler都可自定义。


PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!

发布于: 2021 年 04 月 17 日阅读数: 14
用户头像

徐同学呀

关注

公众号:徐同学呀 2018.09.24 加入

专注于源码分析及Java底层架构开发领域。持续改进,坦诚合作!我是徐同学,愿与你共同进步!

评论

发布
暂无评论
ThreadPoolExecutor源码解读(一)重新认识ThreadPoolExecutor(核心参数、生命周期、位运算、ThreadFactory、拒接策略)