JAVA 原生线程池源码解析及使用建议( 程序员必看!)
线程池概念
0 1
线程池的基本概念
线程池(Thread pool)是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。(来源:维基百科)
Java 线程池相关类是在 1.5 新增的,所属包是 rt.jar,包路径是 java.util.concurrent,作者是:Doug Lea,从属 JSR-166。
Java 线程池也遵循线程池的核心设计思路,复用线程,降低线程创建销毁的资源消耗,提供了多种线程池的实现模型,同时也允许开发者定制化开发其他特色线程池。
0 2
java 线程池优势
A) 降低资源消耗,提升效率 :通过重复利用已创建的线程,降低线程创建和销毁造成的消耗,从而提高整体的执行效率。
B) 提高线程的管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
C) 可扩展的开发模式:除 JVM 提供的三种线程池,可以通过实现 AbstractExecutorService 类定制自己的线程池而支持不同的业务场景。
(这里很多资料都会有额外一条:提高响应速度,即通过复用以创建好的线程,而无需等待新线程的创建,这里个人认为严格讲这一条不完全符合线程池的使用情况,而且复用线程和第一条优势基本吻合,所以合并到一起,至于特殊场景后面学习的过程中会详细讲解。)
0 3
java 原生线程池简介
首先介绍下 java 原生的三个线程池,ForkJoinPool,ThreadPoolExecutor,ScheduledThreadPoolExecutor。三个线程池的 UML 类图如下(version:1.8.0_131,后面的源码都是基于这个版本)。
A) ForkJoinPool 是 Java 1.7 引入的一种新的并发框架,核心思想是将大的任务拆分成多个小任务(fork),然后在将多个小任务处理汇总到一个结果上(join),充分利用多 cpu,多核 CPU 的优势,引入了“work-stealing”机制,更有效的利用线程。
B) ThreadPoolExecutor 是 java 常用的线程池,提供基础的线程池功能。初始化传入不同类型的工作队列和拒绝策略参数,可以定制不同类型和功能的线程池,应用最为广泛。
C) ScheduledThreadPoolExecutor 从类图上可以看出,它继承了 ThreadPoolExecutor,并实现了 ScheduledExecutorService,是对 ThreadPoolExecutor 做的功能扩展,本质上是一个使用线程池执行定时任务的类,可以用来在给定延时后执行异步任务或者周期性执行任务,较任务调度的 Timer 来说,其功能更加强大。
线程池的实现原理
接下来以 java 的基础线程池 ThreadPoolExecutor 为主介绍下线程池的工作原理和实现方式。
0 1
ThreadPoolExector 类
2.1.1 ThreadPoolExector 介绍
A) ThreadPoolExecutor 实现的顶层接口是 Executor,内部只有一个方法 execute(Runable),标识出执行任务这个核心方法。限制了任务类型为:Runable,即线程的接口类。
B)ExecutorService 接口扩展了很多能力,比如对线程池的管理。以及扩展了执行任务的能力,支持多个任务批量执行。
C)AbstractExecutorService 是对 ExecutorService 抽象类,这里对任务的执行和调用做了基础的实现,可以看出目前都是在对任务的执行做层层抽象,也规范了任务的基础类型。
D)ThreadPoolExecutor 是 java 原生线程池的一个基础实现类,完成了线程池的各种功能,内部维护了存储任务的阻塞队列,以及执行任务的 worker 线程,还有线程池的相关状态管理及任务管理。同时提供了一些扩展方法。供开发者定制特色能力。
2.1.2 线程池的基础参数
接下来说一下创建 ThreadPoolExecutor 比较重要的参数。
corePoolSize:线程池核心线程个数。
queue:用于保存等待执行的任务的阻塞队列;如基于数组的有界 ArrayBlockingQueue,基于链表的无界 LinkedBlockingQueue,优先级队列 PriorityBlockingQueue 等。
maximunPoolSize:线程池最大线程数量。
ThreadFactory:创建线程的工厂。可以自定义工工厂,控制产生的线程名称辅助排查问题。
RejectedExecutionHandler:饱和策略,当队列满了且线程个数到 达 maximunPoolSize 后采取的策略,如 AbortPolicy 抛出拒绝执行;DiscardPolicy 丢弃该任务。
keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量要多,并且是闲置状态的话,这些闲置的线程能存活的最大时间。
TimeUnit,存活时间的时间单位。
2.1.3 ThreadPoolExecutor.Worker 内部类
ThreadPoolExecutor.Worker 这个工作线程,实现了 Runnable 接口,同时继承 AQS 类,并持有一个线程 thread,一个初始化的任务 firstTask。负责处理任务,同时维护工作线程的状态。
thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,用来执行任务。
firstTask 是传入的第一个任务,如果非空,那么线程在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是 null,那么就需要创建一个线程去执行 workQueue 中的任务,也就是非核心线程的创建。单个任务执行完毕后,worker 会继续在 workQueue 中获取下一个任务继续执行。
0 2
ThreadPoolExecutor 工作流程
2.2.1 总体介绍
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分: 任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的执行策略。主要有以下几种:
(1)直接申请线程执行该任务;
(2)缓冲到队列中等待线程执行;
(3)拒绝该任务,执行拒绝策略。
线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
2.2.2 状态变化
ThreadPoolExector 内部使用 AtomicInteger 类型的变量 ctl 来维护线程池的状态和线程池中的线程数量,即高三位为状态位,其余表示线程数目。
每种状态均可以通过调用不同的方法完成相应的状态转化。
1. RUNNING(-1 : 111):可以接受新的任务,也可以处理阻塞队列里的任务。
2. SHUTDOWN(0 : 000) :不接受新的任务,但是可以处理阻塞队列里的任务。
3. STOP(1 : 001):不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务。
4. TIDYING(2 : 010):过渡状态,也就是说所有的任务都执行完了,当前线程池已经没有有效的线程,这个时候线程池的状态将会 TIDYING,并且将要调用 terminated 方法。
5. TERMINATED(3: 011) :终止状态。terminated 方法调用完成以后的状态。
2.2.3 源码解析-execute
Execute 是提交任务的方法入口方法,根据核心线程池的数量、线程池的状态,任务队列大小、最大数量、分成不同情况,创建不同线程,存储到不同位置,或执行拒绝策略。
2.2.4 源码解析-addWorker
addWorker 是添加工作线程的方法,通过 Worker 内部类封装一个 Thread 实例维护工作线程的执行,同时根据线程池的状态来判断是否增加相应的任务。
2.2.5 源码解析-runWorker
runWorker 真正执行任务的地方,先执行第一个任务,再源源不断从任务队列中取任务来执行;如果线程池调用了 shutDownNow,这里也会收到影响。
2.2.6 源码解析-getTask
从队列取任务的地方,默认情况下,根据工作线程数量与核心数量的关系判断使用队列的 poll()还是 take()方法,keepAliveTime 参数也是在这里使用的。
2.2.7 任务队列
阻塞队列 BlockingQueue 是用来存放任务的。当线程池中有空闲线程时就回去任务队列中拿任务并处理。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。
常见的集中队列有:
无界队列: 使用无界队列(如 LinkedBlockingQueue)将所有的任务都存储到阻塞队列中。这样,创建的线程就不会超过 corePoolSize。
有界队列: 当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue, PriorityBlockingQueue)有助于防止资源耗尽, PriorityBlockingQueue 还可以定制任务的优先级。但是需要开发人员根据实际任务情况调整队列大小和线程池大小。
同步移交队列,如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用 SynchronousQueue 作为等待队列。SynchronousQueue 不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
2.2.8 拒绝策略
1. CallerRunsPolicy 由调用线程执行该任务,不抛弃任务,会影响效率和性能。
2. AbortPolicy 抛出拒绝执行的异常,java 线程池的默认拒绝策略,保证线程池整体的执行效率。
3. DiscardPolicy 丢弃该任务,不做任何处理,同时也不抛异常。使用中不太友好。
4. DiscardOldestPolicy 如果线程池未关闭,则弹出任务队列的第一个,然后运行。
以上是默认提供的四种拒绝策略。除此之外还有一些其他框架中的值得参考的处理。
AbortPolicyWithReport 这是 dubbo 中的拒绝策略,继承了 AbortPolicy 拒绝策略。但是在抛出异常前,打印了当前线程池的重要参数信息,以及运行状态,同时定制输出了此时的堆栈信息。方便后续排查问题处理。
线程池的应用场景
0 1
Tomcat 中的线程池
Tomcat 作为一款优秀的 web 服务器,为了保证其性能,其内部也有自己的线程池对象:org.apache.tomcat.util.threads.ThreadPoolExecutor 继承自 java.util.concurrent.ThreadPoolExecutor。
不同于原生 ThreadPoolExecute 达到最大线程后,对新增任务立即执行拒绝策略。Tomcat 线程池会在此时再次尝试向队列中添加任务,失败后再执行拒绝策略。最大限度保证任务执行。
同时 Tomcat 内置了 TaskQueue 作为任务的缓存队列。继承了 LinkedBlockingQueue 但是重写了 offer 方法,即当前线程大于核心线程,且提交的任务数大于当前线程数,表示有线程空闲的情况下,返回 false,也就是创建线程。主要是为了控制在线程队列无限增长时,无法创建更多的线程而达到最大线程数的问题。
0 2
Sirector 中的线程池
Sirector 是 JD 内一个事件处理编排框架,内置 ExecutorService 对象,负责对任务的分配处理。
初始化的对象是 WorkerExecutor, WorkerExecutor 继承自 ThreadPoolExecutor,扩展了 submit 方法用于执行通过 sirector 编排的具体任务。
WorkerExecutor 内置了工厂 WorkerThreadFactory 主要记录了当前线程池的名称、工厂创建的线程数目等。
使用的拒绝策略为 RejectedTaskController,继承自 RejectedExecutionHandler,处理方法类似于 AbortPolicy 策略,丢弃任务抛出异常,抛出异常前也打印了一些异常信息,辅助排查问题。
0 3
个人开发中的线程池
参考上面几种框架的线程池,可以得出大概几点结论。
如果个人开发中涉及线程池,要先确认任务场景,是 I/O 密集还是 CPU 密集任务,从而确定线程池类型。
再通过使用场景,是最大限度保证任务执行,还是为了保证服务性能,来定制自己的执行策略,并且确定选择任务队列以及拒绝策略。拒绝策略可以参考 Dubbo 中,同时打印线程信息,辅助排查问题。
然后确定是否需要自定义线程工厂,这里建议自定义线程工厂,在创建线程的时候打上标识,和系统线程加以区分。
在根据任务类型,配置上合理的线程池参数。一个属于你的线程池就搭好了!
线程池参数设置
这里个人认为,没有一种万能的参数一定适合所有的线程池使用场景。
但是有通用的思路来寻找适合当前线程池的最佳参数。
1、确定当前任务类型,是 CPU 密集还是 I/O 密集型任务。这两者差别很大。CPU 密集和 CPU 核数以及 CPU 超线程有关。而 I/O 密集则和服务处理的任务有很大关联。
2、如果使用一些已有的技术框架中的线程池。初期建议以默认参数为佳,如 Tomcat 默认范围 25-200,JSF 默认 cached 线程池 20-200。
3、在服务稳定之后的性能调优。需要对服务进行多次高保真压测,期间不断控制、调整线程池参数,这样尽可能得到当前服务最优的线程池参数。
4、只有最合适的、没有一定不变的,随着业务不断迭代,每隔一段时间对服务进行压测,通过结果调整相应的参数。
线程池使用过程中的建议
1 、当提交一个任务到线程池时,若线程数量 < corePoolSize,线程池会创建一个新线程放入 workers(一个 HashSet)中执行任务, 即使其他空闲的基本线程能够执行新任务也还是会创建新线程,直至达到 corePoolSize。
2 、默认最初的线程池启动的时候是不初始化线程的,通过调用 prestartAllCoreThreads 方法,可以初始化所有核心线程。
3 、Worker 中处理 task 如果抛出异常,这个 work thread 不会继续执行任务,但是会创建新的线程, 新线程可以运行其他 task。
4 、最好不要使用 Executors 创建新线程池,因为 Executors 提供的很多方法,没有指定实际核心及最大线程池参数,容易发生 OOM,推荐自己创建相应的线程池,适合自己的才是最好的,同时线程池中有很多钩子方法可以用来定制特色功能。
评论