写点什么

详细剖析 Java 动态线程池的扩容以及缩容操作

  • 2025-04-29
    福建
  • 本文字数:3879 字

    阅读完需:约 13 分钟

前言


在项目中,我们经常会使用到线程来处理加快我们的任务。但为了节约资源,大多数程序员都会把线程进行池化,使用线程池来更好的支持我们的业务。


Java 线程池ThreadPoolExecutor有几个比较核心的参数,如corePoolSize、maximumPoolSize等等。无论是在工作中还是在面试中,都会被问到,如何正确的设置这几个参数。


线程池的参数并不好配置。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识。项目 IO 密集型还是 CPU 密集型等等,总归很难确定一个完美的参数,此时就有了动态线程池的诞生。


动态线程池(DTP)原理


其实动态线程池并不是很高大上的技术,它底层依旧是依赖了ThreadPoolExecutor的一些核心接口方法。我们通过下面图片可以很清楚的看到,ThreadPoolExecutor本身就给我们提供了很多钩子方法,让我们去定制化。



那么其原理也非常简单了,我们在运行中假设有一个线程池叫做TaskExecutor


  1. 他的核心线程池默认假设是 10,现在我发觉不够用了,此时我想把他的核心线程池调整为 20

  2. 我可以写一个远程配置(可以阿波罗,zk,redis 什么都可以)。然后监听到了这个配置变为了 core.pool.size=20

  3. 然后我获取到了这个线程池TaskExecutor,并且调用setCorePoolSize(20),那么这个TaskExecutor核心线程数就变为了 20


就是这么简单,拨开表面,探究原理,内部其实非常的简单。当时公司里面的线程池还有加一些友好的界面、监控告警、操作日志、权限校验、审核等等,但本质就是监听配置,然后调用 setCorePoolSize 方法去实现的,最大线程数类似。


public void setCorePoolSize(int corePoolSize) {    if (corePoolSize < 0)        throw new IllegalArgumentException();    int delta = corePoolSize - this.corePoolSize;    this.corePoolSize = corePoolSize;    if (workerCountOf(ctl.get()) > corePoolSize)        interruptIdleWorkers();    else if (delta > 0) {        int k = Math.min(delta, workQueue.size());        while (k-- > 0 && addWorker(null, true)) {            if (workQueue.isEmpty())                break;        }    }}
复制代码



动态线程池缩容


首先提出几个问题


  1. 核心线程数为 5,现在有 3 个线程在执行,并且没有执行完毕,我修改核心线程数为 4,是否修改成功

  2. 核心线程数为 5,现在有 3 个线程在执行,并且没有执行完毕,我修改核心线程数为 1,是否修改成功


让我们带着疑问去思考问题。


  1. 首先第一个问题,因为核心线程池数为 5,仅有 3 个在执行,我修改为 4,那么因为有 2 个空闲的线程,它只需要销毁 1 个空闲线程即可,因此是成功的

  2. 第二个问题,核心线程池数为 5,仅有 3 个在执行,我修改为 1。虽然有 2 个空闲线程,但是我需要销毁 4 个线程。因为有 2 个空闲线程,2 个非空闲线程。我只能销毁 2 个空闲线程,另外 2 个执行的任务不能被打断,也就是执行后仍然为 3 个核心线程数。

  3. 那什么时候销毁剩下 2 个执行的线程呢,等到 2 个执行的任务完毕之后,就会销毁它了。假设这个任务是一个死循环,永远不会结束,那么核心线程数永远是 3,永远不能设置为 1


我们举一个代码的例子如下


ThreadPoolExecutor es = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());es.prestartAllCoreThreads();  // 预启动所有核心线程
// 启动三个任务,执行次数不一样for (int i = 0; i < 3; i++) { int finalI = i; es.execute(() -> { int cnt = 0; while (true) { try { cnt++; TimeUnit.SECONDS.sleep(2);
if (cnt > finalI + 1) { log.info(Thread.currentThread().getName() + " 执行完毕"); break; } } catch (InterruptedException e) { e.printStackTrace(); } } });}TimeUnit.SECONDS.sleep(1); // 等待线程池中的线程执行log.info("修改前 es = {}", es); // 这里核心线程数必定是5
es.setCorePoolSize(1); // 修改核心线程数为1,但是核心线程数为5,并且有3个线程在执行任务,
while (true) { TimeUnit.SECONDS.sleep(1); // 等待 log.info("修改后 es = {}", es);}
复制代码


输出结果为如下


// 修改前核心线程数为5,运行线程数为3[修改前 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 5, active threads = 3, queued tasks = 0, completed tasks = 0]]
// 因为有2个空闲线程,先把2个空闲线程给销毁了,剩下3个线程[修改后 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]]
// 等第1个任务执行完毕,剩下2个线程[Main.lambda$d$0:38] [pool-2-thread-1 执行完毕][修改后 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 1]]
// 等第2个任务执行完毕,剩下1个线程[Main.lambda$d$0:38] [pool-2-thread-2 执行完毕][修改后 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 2]]
// 等第3个任务执行完毕,剩下1个线程。因为我修改的就是1个核心线程[Main.lambda$d$0:38] [pool-2-thread-3 执行完毕][修改后 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 3]]
复制代码


有兴趣的读者可以拿这块带去自己去试试,输出结果里面的注释 我写的非常详细,大家可以详细品品这块输出结果。


动态线程池扩容


扩容我就不提问问题了,和缩容异曲同工,但我希望读者可以先看下以下代码,不要看答案,认为会输出什么结果,看下是否和自己想的是否一样,如果一样,那说明你已经完全懂了,如果不一样,是什么原因。


// 核心线程数1,最大线程数10ThreadPoolExecutor es = new ThreadPoolExecutor(1, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());es.prestartAllCoreThreads();  // 预启动所有核心线程
for (int i = 0; i < 5; i++) { int finalI = i; es.execute(() -> { int cnt = 0; while (true) { try { cnt++; TimeUnit.SECONDS.sleep(2);
if (cnt > finalI + 1) { log.info(Thread.currentThread().getName() + " 执行完毕"); break; } } catch (InterruptedException e) { e.printStackTrace(); } } });}
TimeUnit.SECONDS.sleep(1); // 等待线程池中的线程执行log.info("修改前 es = {}", es); // 这里核心线程数必定是1, 队列里面有4个任务
es.setCorePoolSize(3); // 修改核心线程数为3
while (true) { TimeUnit.SECONDS.sleep(1); // 等待 log.info("修改后 es = {}", es);}
复制代码


输出结果为如下 (注意观察输出 queued tasks 的变化!!!


[修改前 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]][修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]]
[Main.lambda$a$1:73] [pool-2-thread-1 执行完毕][修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 1]]
[Main.lambda$a$1:73] [pool-2-thread-2 执行完毕][修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 2]]
[Main.lambda$a$1:73] [pool-2-thread-3 执行完毕][修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 3]]
[Main.lambda$a$1:73] [pool-2-thread-1 执行完毕][修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 1, queued tasks = 0, completed tasks = 4]]
[Main.lambda$a$1:73] [pool-2-thread-2 执行完毕][修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 5]]
复制代码


最后


在业务中,我们为了提高效率使用了线程,为了加快线程我们使用了线程池,而又为了更好的利用线程池的资源,我们又实现了动态化线程池。这也就是遇到问题、探索问题、解决问题的一套思路吧。


我们从底层原理分析,发现动态线程池的底层原理非常简单,希望大家不要恐惧,往往拨开外衣,发现里面最根本的原理,才能是我们更好的捋清楚其中的逻辑。希望本文提供的动态化线程池思路能对大家有帮助。


最终也极力希望读者朋友们,可以将上述两个例子详细分析一下原因,相信会有不小的进步,谢谢大家。


文章转载自:程序员博博

原文链接:https://www.cnblogs.com/wenbochang/p/18688107

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
详细剖析Java动态线程池的扩容以及缩容操作_Java_电子尖叫食人鱼_InfoQ写作社区