写点什么

☕【Java 技术之旅】走进线程池的世界(基础篇)

发布于: 2021 年 05 月 14 日
☕【Java技术之旅】走进线程池的世界(基础篇)

本章内容属于线程原理分析专题的基础篇,之后还有源码篇和深入篇。

🚀 前提概要

线程的创建和销毁对操作系统来说都是比较重量级的操作,所以通过线程池可以大大的减少操作系统为创建线程所消耗的成本和资源。它的作者是:Doug Lea,我们使用的时候是非常方便,但也可能会因为不了解其具体实现,对线程池的配置参数存在误解。

🚀 线程池定义

线程池(ThreadPoolExecutor)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如 MySQL。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。

线程池维护多个线程等待监督管理者分配可并发执行的任务一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

🚀 线程池优点

而本文描述线程池是 JDK 中提供的 ThreadPoolExecutor 类。

当然,使用线程池可以带来一系列好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。

  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。

  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池 ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

🚀 线程池的运作流程

📚 创建线程任务

  1. 当一个任务被提交后,线程池首先检查正在运行的线程数是否达到核心线程数,如果未达到则创建一个线程。

  2. 如果线程池内正在运行的线程数已经达到了核心线程数,任务将会被放到 BlockingQueue 内。

  3. 如果 BlockingQueue 已满,线程池将会尝试将线程数扩充到最大线程池容量。

  4. 如果当前线程池内线程数量已经达到最大线程池容量,则会执行拒绝策略拒绝任务提交。


整体流程如下图所示:

流程描述没有问题,但如果某些点未经过推敲,容易导致误解,而且描述中的情境太理想化,如果配置时不考虑运行时环境,也会出现一些非常诡异的问题。

📚 线程池的构造器

使用线程池离不开 ThreadPoolExecutor 类,该类实现了 ExecutorService 接口,其构造方法如下:

public ThreadPoolExecutor(int corePoolSize,	                          int maximumPoolSize,	                          long keepAliveTime,	                          TimeUnit unit,	                          BlockingQueue<Runnable> workQueue,	                          ThreadFactory threadFactory,	                          RejectedExecutionHandler handler);
复制代码
参数说明如下
  • corePoolSize:核心池大小

  • maximumPoolSize:线程池大小(maximumPoolSize >= corePoolSize)

  • keepAliveTime:没有任务时线程的存活时间,默认情况下只有线程数目大于 corePoolSize 时,此参数才起作用,若线程数目等于 corePoolSize,则这些线程会一直存活。但若调用 allowCoreThreadTimeOut(boolean)方法,则线程数目不大于 corePoolSize 时,此参数也起作用,直到线程数目为 0

  • unit:keepAliveTime 的时间单位,有以下七种取值:

  • TimeUnit.DAYS;//天

  • TimeUnit.HOURS;//小时

  • TimeUnit.MINUTES;//分钟

  • TimeUnit.SECONDS;//秒

  • TimeUnit.MILLISECONDS;//毫秒

  • TimeUnit.MICROSECONDS;//微秒

  • TimeUnit.NANOSECONDS;//纳秒

  • workQueue:指定构成缓冲区的阻塞队列,即指定了线程池的排队策略,常用的有以下三种:

  • ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小

  • LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE

  • synchronousQueue:这个队列不会保存提交的任务,而是将直接新建一个线程来执行新来的任务

  • threadFactory(可选):线程工厂,用来创建线程,可自定义

  • handler(可选):拒绝策略,有以下四种策略可选:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。

  • ThreadPoolExecutor.DiscardPolicy:丢弃任务,不抛出异常。

  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务

  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

🚀 线程池的运作原理

📚 核心线程池

线程池内线程数量小于等于 coreSize 的部分我称为核心线程池,核心池是线程池的常驻部分,内部的线程一般不会被销毁,我们提交的任务也应该绝大部分都由核心池内的线程来执行。

📚 创建过程

📚 线程初始化

默认情况下,线程池构造完成后是没有线程的,需要等任务提交时才会创建线程,如果需要在线程池构造完成时就创建线程,可以调用以下两个方法:

prestartCoreThread():初始化一个核心线程;

prestartAllCoreThreads():初始化所有核心线程

📚 任务提交

任务提交有两种方法,execute()和 submit()

void execute(Runnable task),无返回值

Future<T>submit(Runnable task, T result)

Future<T>submit(Callable<T> task),有返回值

使用 jstack 打印出线程栈并数一下线程池内线程数量,会发现线程池内的线程数会随着任务的提交而逐渐增大,直到达到 coreSize。因为核心池的设计初衷是想它能作为常驻池,承载日常流量,所以它应该被尽快初始化,于是线程池的逻辑是在没有达到 coreSize 之前,每一个任务都会创建一个新的线程,对应的源码为:

  public void execute(Runnable command) {  if (command == null)    throw new NullPointerException();  int c = ctl.get();//获取ctl  //检查当前核心线程数,是否小于核心线程数的大小限制  if (workerCountOf(c) < corePoolSize) {    //没有达到核心线程数的大小限制,那么添家核心线程执行该任务    if (addWorker(command, true))      return;    //如果添加失败,刷新ctl值    c = ctl.get();  }  //再次检查线程池的运行状态,将任务添加到等待队列中  if (isRunning(c) && workQueue.offer(command)) {    int recheck = ctl.get();//刷新ctl值    //如果当前线程池的装不是运行状态,那么移除刚才添加的任务    if (! isRunning(recheck) && remove(command))      reject(command);//移除成功后,使用拒绝策略处理该任务;    else if (workerCountOf(recheck) == 0)      //当前工作线程数为0      //线程池正在运行,或者移除任务失败。      //添加一个非核心线程,并不指定该线程的运行任务。      //等线程创建完成之后,会从等待队列中获取任务执行。      addWorker(null, false);  }   //逻辑到这里说明线程池已经不是RUNNING状态,或者等待队列已满,需要创建一个新的非核心线程执行该任务;  //如果创建失败,那么非核心线程已满,使用拒绝策略处理该任务;  else if (!addWorker(command, false))    reject(command);}
复制代码

我们也知道线程被创建后,会在一个 while 循环里尝试从 BlockingQueue 里获取并执行任务,说它正在 running 也不为过。基于此,我们对一些高并发服务进行的预热,其实并不是期望 JVM 能对热点代码做 JIT 等优化,对线程池、连接池和本地缓存的预热才是重点

🚀 阻塞队列

BlockingQueue 是线程池内的另一个重要组件,首先它是线程池”生产者-消费者”模型的中间媒介,另外它也可以为大量突发的流量做缓冲,但理解和配置它也经常会出错。

基于”生产者-消费者”模型,我们可能会认为如果配置了足够的消费者,线程池就不会有任何问题。其实不然,我们还必须考虑并发量这一因素。

设想以下情况:有 1000 个任务要同时提交到线程池内并发执行,在线程池被初始化完成的情况下,它们都要被放到 BlockingQueue 内等待被消费,在极限情况下,消费线程一个任务也没有执行完成,那么这 1000 个请求需要同时存在于 BlockingQueue 内,如果配置的 BlockingQueue Size 小于 1000,多余的请求就会被拒绝。

那么这种极限情况发生的概率有多大呢?答案是非常大,因为操作系统对 I/O 线程的调度优先级是非常高的,一般我们的任务都是由 I/O 的准备或完成(如 tomcat 受理了 http 请求)开始的,所以很有可能被调度到的都是 tomcat 线程,它们在一直往线程池内提交请求,而消费者线程却调度不到,导致请求堆积。

我负责的服务就发生过这种请求被异常拒绝的情况,压测时 QPS 2000,平均响应时间为 20ms,正常情况下,40 个线程就可以平衡生产速度,不会堆积。但在 BlockingQueue Size 为 50 时,即使线程池 coreSize 为 1000,还会出现请求被线程池拒绝的情况。

这种情况下,BlockingQueue 的重要的意义就是它是一个能长时间存储任务的容器,能以很小的代价为线程池提供缓冲。根据上文可知,线程池能支持BlockingQueue Size个任务同时提交,我们把最大同时提交的任务个数,称为并发量,配置线程池时,了解并发量异常重要

🚀 GC 回收问题

Full GC 是 Stop the World 的,但这里的 World 指的是 JVM,而一个请求 I/O 的准备和完成是操作系统在进行的,JVM 停止了,但操作系统还是会正常受理请求,在 JVM 恢复后执行,所以 GC 是会堆积请求的。

上文中提到的并发量计算一定要考虑到 GC 时间内堆积的请求同时被受理的情况,堆积的请求数可以通过 QPS*GC时间 来简单得出,还有一定要记得留出冗余。

🚀 业务峰值考虑

除此之外,配置线程池参数时,一定要考虑业务场景。

假如接口的流量大部分来自于一个定时程序,那么平均 QPS 就没有了任何意义,线程池设计时就要考虑给 BlockingQueue 的 Size 设置一个大一些的值;而如果流量非常不平均,一天内只有某一小段时间才有高流量的话,而且线程资源紧张的情况下,就要考虑给线程池的 maxSize 留下较大的冗余;在流量尖刺明显而响应时间不那么敏感时,也可以设置较大的 BlockingQueue,允许任务进行一定程度的堆积。

当然除了经验和计算外,对服务做定时的压测无疑更能帮助掌握服务真实的情况。

并发量的计算

我们常用 QPS 来衡量服务压力,所以配置线程池参数时也经常参考这个值,但有时候 QPS 和并发量有时候相关性并没有那么高,QPS 还要搭配任务执行时间推算峰值并发量。

比如请求间隔严格相同的接口,平均 QPS 为 1000,它的并发量峰值是多少呢?我们并没有办法估算,因为如果任务 执行时间为 1ms,那么它的并发量只有 1;而如果任务执行时间为 1s,那么并发量峰值为 1000。

可是知道了任务执行时间,就能算出并发量了吗?也不能,因为如果请求的间隔不同,可能 1min 内的请求都在一秒内发过来,那这个并发量还要乘以 60,所以上面才说知道了 QPS 和任务执行时间,并发量也只能靠推算。

计算并发量,我一般的经验值是 QPS*平均响应时间,再留上一倍的冗余,但如果业务重要的话,BlockingQueue Size 设置大一些也无妨(1000 或以上),毕竟每个任务占用的内存量很有限。

运行模型

明确的一点是线程池并没有准确的调度功能,即它无法感知有哪些线程是处于空闲状态的,并把提交的任务派发给空闲线程。线程池采用的是”生产者-消费者”模式,除了触发线程创建的任务(线程的 firstTask)不会入 BlockingQueue 外,其他任务都要进入到 BlockingQueue,等待线程池内的线程消费,而任务会被哪个线程消费到完全取决于操作系统的调度。

对应的生产者源码如下:

    public void execute(Runnable command) {        ...        if (isRunning(c) && workQueue.offer(command)) {           isRunning() 是判断线程池处理戚状态            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        ...    }
复制代码

对应的消费者源码如下:

private Runnable getTask() {        for (;;) {            ...            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            ...        }    }
复制代码

🚀 可选线程池模型

📚 newCachedThreadPool(会创建过多的线程任务)
new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,                    TimeUnit.SECONDS,new SynchronousQueue<Runnable>())
复制代码
📚 newFixedThreadPool(会挤压过多的任务到队列里)
new ThreadPoolExecutor(nThreads,nThreads,0L,                    TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())
复制代码
📚 newSingleThreadPool(会挤压过多的任务到队列里)
new ThreadPoolExecutor(1,1,0L,                 TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())
复制代码
📚 newScheduledThreadPool(会创建过多的线程任务)
new ScheduledThreadPoolExecutor(nThreads,Integer.MAX_VALUE,0L,                TimeUnit.NANOSECONDS,new DelayedWorkQueue())
复制代码


调用实例:构造线程池时优先选用线程池模型,如果这些模型不能满足要求,再自定义 ThreadPoolExecutor 线程池


🚀 线程池的关闭

shutdown():不会立即关闭线程池,而是不再接受新的任务,等当前所有任务处理完之后关闭线程池

shutdownNow():立即关闭线程池,打断正在执行的任务,清空缓冲队列,返回尚未执行的任务

用户头像

我们始于迷惘,终于更高水平的迷惘。 2020.03.25 加入

🏆 【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 🤝未来我们希望可以共同进步🤝

评论

发布
暂无评论
☕【Java技术之旅】走进线程池的世界(基础篇)