一、背景
最近系统出现了一次因为错误使用线程池而出现了内存溢出问题,遂打算写一篇线程池深入原理与实践,避免大家后面继续踩坑,更优雅的使用线程池。那么什么场景下该使用线程池、怎样设置核心参数、线程池是怎么运行调度的、实践参数应用设计等等,下面让我们带着这些问题来一探究竟。
二、什么场景下该使用线程池?
1.CPU 密集型任务(任务数量较多但任务执行时间较短,占用资源较少)
例如 OpenApi 服务计费消息消费或者客户余额调用推送等消息类业务,这类业务往往处理较快,消息内容简单,但在高并发下,会瞬间产生大量任务数,造成瞬时大量线程开销,增加 CPU 压力,为了避免这种问题我们可以使用线程池进行线程数控制,达到控制并发的要求;
2.IO 密集型任务 (任务数量较少但任务执行时间较长,占用资源较多)
常见于 IO 频繁的任务,例如客户账单的生成或者常见的日志收集场景,我们出现问题的业务场景就是这里。这类任务数量一般不会太多但是常常会消耗系统内存占用大量资源,为了避免这种问题我们可以使用线程池进行线程数量扩展,让系统处理 IO 的时候可以有空闲线程去处理别的任务,合理复线程。
3.IO&CPU 密集型任务(任务数量多且占用资源多)
在这种特殊场景下,已经不是完全靠线程池就能保障系统的稳定性了的,需要多处业务共同发挥作用,例如:当任务瞬时数量特别多且时间效率低,占用资源多,那么这个时候就要考虑在业务产生方(流量入口)做业务限流处理,同时线程池尽可能设置更多的线程去处理任务,充分利用系统资源。
三、怎样设置核⼼参数?
首先聊这个话题之前我们要先简单了解一下为什么不推荐使用 Executors 提供的线程池进行线程池业务处理,主要原因有二:
其一,Executors 封装好的线程池大多采用无界队列,很容易造成内存溢出,极不安全;
其二,为核心参数不可根据实际业务横向扩展,不够灵活,不适用于业务场景多变的情况。
书归正文,下面来说一下自定义线程池的方式及核心参数如何设置,自定义线程池一般采用 ThreadPoolExecutor 的方式进行线程池创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
主要参数的意义及运行逻辑:
corePoolSize(核心线程数)
当有任务需要执行时会先创建小于 corePoolSize 的线程去执行任务,并且线程池会保持核心线程数在池中,即使它是空闲的,除非设置了 allowCoreThreadTimeOut 为 true,那么在 keepAliveTime 时间内便会被回收;
maximumPoolSize (最大线程数)
当 workQueue 满了之后会创建小于 maximumPoolSize 线程数执行任务,注意这里线程池可以创建线程的最大值不是 corePoolSize+maximumPoolSize,而是 maximumPoolSize。
keepAliveTime (线程最大存活时间)
核心线程数并不是执行完任务就被销毁掉,在有新任务到来且当前任务数大于核心线程数时,这时候这个参数就有了意义,它意味着新来的任务是否可以在 keepAliveTime 内复用已经任务执行结束了的线程,也就是空闲线程的存活时间;
workQueue(任务阻塞队列)
当创建线程数大于 corePoolSize 时就会把任务丢进 workQueue 进行排队,等待被 take;
threadFactory(线程工厂)
自定义这个的好处是方便我们标记不同线程池的线程逻辑,更方便我们日常通过此排查问题;
RejectedExecutionHandler(拒绝策略)
当任务执行创建的线程数大于 maximumPoolSize 时就会触发定义好的拒绝策略(默认为直接抛弃任务),也可以继承这个类来实现自定义策略持久化存储不能处理的任务;
四、线程池怎么运行调度的?
首先我们来先了解下线程池的状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
RUNNING:线程池处于这个状态下可以接受新的任务以及处理队列中的任务;
SHUTDOWN:不再接受新的任务,但是可以处理队列中的任务;
STOP:不再接受新的任务,不会处理队列中的任务,并且会中断处理中的任务;
TIDYING:所有任务都被终止,工作线程数量为 0,此时线程的事务状态为 TIDYING 并且会运行钩子函数 terminated(),进而线程池状态变为 TERMINATED;
TERMINATED:线程池中一个重要的角色:Woker,它围绕着整个线程池的生命周期进行任务提交、任务运行,也就说它是作为线程池的工作线程进行运转的。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{}
复制代码
通过源码我们可以看到,Worker 作为一个 Runnable 聪明的继承了 AQS,这使得其天然具有 AQS 的优秀设计特性,其中不乏包括任务的排队唤醒、自旋加锁等等。
任务提交:
线程池的任务提交可分为两种:
带返回值
executorService.submit((Callable<String>) () -> null);
复制代码
无返回值
executorService.execute(() -> {});
复制代码
大多数场景我们都是用 submit 的比较多,所以接下来的基于源码的运行原理探索也主要围绕着 submit 场景下来说,首先我们创建好一个线程池后,进行任务提交,那么依照源码我们来看底层的运行逻辑;
ExecutorService executorService = new ThreadPoolExecutor(
16,
8,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100),
new CustomThreadFactory(), new CustomRejectHandler());
executorService.submit((Callable<String>) () -> null);
复制代码
这里我们调用 ExecutorService 的 submit 方法后首先会将任务(Callable)封装成一个 FutureTask 丢给 Executor 的 execute 方法实现者(ThreadPoolExecutor)去执行
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask);return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}
复制代码
当线程池执行 exceute 时,首先会判断传入的 task(RunnableFuture)是否为空,为空则抛出异常,否则直接进行主逻辑进行运行,如果当前工作线程数 workerCountOf(c)小于调用 addWorker 方法创建任务工作线程 Woker,如果创建失败则尝试入队,如果失败再 addWorker 尝试创建新的工作线程,如果失败则执行 reject,进而触发拒绝策略;
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return; c = ctl.get(); }if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command)) reject(command);else if (workerCountOf(recheck) == 0) addWorker(null, false); }else if (!addWorker(command, false)) reject(command);}
复制代码
在创建工作线程时,addWorker 方法首先会检查线程池的运行状态、队列的空与否、是否已达到创建线程数峰值来决定是否要开启一个线程添加到
workers(HashSet,这也是为什么这里要添加可重入锁的一个原因,至于为什么这里没有用线程安全的集合例如:
ConcurrentHashMap
CopyOnWriteArrayList
CopyOnWriteArraySet 等等。
个人觉得原因主要有两个:
其一,HashSet 天然去重,再者 HashSet 本质上是具有 HashMap 的高效特性即效率更高;
其二,使用其他的集合容器,维护成本变高且不太符合)
中执行任务,如果满足条件则会通过 Worker 的构造方法创建工作线程并设置 Worker 状态(AQS 中的 State 标识)为允许中断,然后调用 worker 的 run 方法执行任务;
private boolean addWorker(Runnable firstTask, boolean core) {
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
我们可以看到在创建 Worker 时,添加了 ReentrantLock 可重入锁,并且进行了 double check 保证了程序运行的原子性,并且我们可以看到 worker 的 run 方法本质上是调用了 runWorker 方法,这也是线程池的核心设计 runWorker;
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() { runWorker(this);}
复制代码
关于 runWorker 的实现可以总结为如果传入的 task(线程)为空则尝试不停从队列获取任务即 getTask,不为空则直接执行任务,在之前任务之前会调用 AQS 的 lock 方法进行上锁,保证原子性。此外,线程池也在任务执行的前后为我们提供了扩展的方法 beforeExecute(wt, task),afterExecute(task, thrown)。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
看到这里我想我们应该知道线程池之所以可以不断执行 client 提交的任务,主要依赖于 getTask 这个方法的实现,其会一直尝试从队列获取任务,如果获取不到线程会被阻塞并被挂起(BlockQueue 核心概念)并不会占用 CPU 资源,直到被唤醒。getTask 直到线程池生命周期结束(包括正常和异常),下面是对应的主要逻辑;
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) {
boolean timedOut = false;
}
}
}
复制代码
综上便是线程池最主要的运行原理,接下来我们开始这次的硬菜:实践参数应用设计。
五、实践参数如何设计
其实关于这点我们上述主题已经介绍过一些了,下面会对其进行分析阐述,正常情况下我们设计线程池参数时首先要考虑的就是任务类型,不同的任务类型决定了我们给出的参数设置
目前看到最多的答案都是 CPU 任务类型采用 n + 1,n*2,n 为当前系统 CPU 核数。但是在我们复杂多变的系统中,这种设置往往站不住脚,经常会设置一遍后不断根据线上业务场景不断调试,但是我们要知道的是,一个系统设计好之后,轻易的修改,重启往往会带来一些麻烦或者故障。
书归正传,那么该如何设置线程池线程数以及队列长度呢?
我想最优解的答案应该是线程池动态化,那么具体该如何实现呢,其实 juc 已经为我们指明了方向,我们来翻看一下源码会发现:
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
复制代码
这两个设置核心线程数的方法可以在运行时 Runtime 进行修改动态修改线程数,有了这一点我们完全可以将线程池参数配置在分布式配置中心中例如 Apollo、Nacos 诸如此类可以动态修改配置的配置中心,这样一来我们的服务无需重启即可根据业务场景灵活调整线程池参数。
其实还有一个比较重要的参数,没错,阻塞队列,juc 也没有为我们提供动态修改的方法,但是山重水复疑无路,柳暗花明又一村。没有的话我们可以造一个嘛,其实很简单,我们想一下,为什么 juc 不给我们提供这样一个方法呢,我们翻看 LinkedBlockingDeque 源码后就不难发现原因就在于阻塞队列的 capacity 被定义为 final。
/** Maximum number of items in the deque */
private final int capacity;
那么我们完全可以自定义一个CustomBlockingDeque,而后我们把capacity定义为可修改不就可以了。下面是实践的截图,供大家参考。
public static ThreadPoolExecutor getThreadPoolInstance(){
return new ThreadPoolExecutor(10,15,60, TimeUnit.SECONDS,
new CustomBlockingQueue<>(20), customThreadFactory, new ThreadPoolExecutor.AbortPolicy());
}
static ThreadPoolExecutor executor = getThreadPoolInstance();
static public void submitTask(String name){
for (int i = 0; i < 30; i++){
Callable<String> task = () -> {
System.out.println(LocalDateTime.now()+"-" +Thread.currentThread().getName()+name+"核心线程数:"
+executor.getCorePoolSize()+",最大线程数:"
+executor.getMaximumPoolSize()+",当前排队数:"
+executor.getQueue().size()+",任务完成数:"
+executor.getCompletedTaskCount()+",队列大小"
+(executor.getQueue().size()+executor.getQueue().remainingCapacity())
);
TimeUnit.SECONDS.sleep(10);
return null;
};
executor.submit(task);
}
}
public static void main(String[] args) throws InterruptedException {
submitTask("参数改变前-");
TimeUnit.SECONDS.sleep(1);
executor.setCorePoolSize(20);
executor.setMaximumPoolSize(20);
CustomBlockingQueue queue = (CustomBlockingQueue) executor.getQueue();
queue.setCapacity(100);
submitTask("参数改变后-");
}
复制代码
好了,我们这篇文章就到这里了,其中若有不足之处还望指出,不吝赐教。
关于领创集团(Advance Intelligence Group)
领创集团成立于 2016 年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021 年 9 月,领创集团宣布完成超 4 亿美元 D 轮融资,融资完成后领创集团估值已超 20 亿美元,成为新加坡最大的独立科技创业公司之一。
往期回顾 BREAK AWAY
Spring data JPA 实践和原理浅析
如何解决海量数据更新场景下的 Mysql 死锁问题
企业级 APIs 安全实践指南 (建议初中级工程师收藏)
Cypress UI 自动化测试框架
serverless让我们的运维更轻松
评论