【线上问题】CompletableFuture 与线程池使用不当导致服务整个挂掉
### Informal Essay By English
It is always a pleasure to learn
### 背景
在某一个风和日丽的早上,小组同事说昨晚线上服务有 20 分钟左右的不可用,当时内心一紧,不会是我写的代码有 bug 导致的吧👀,我正了正心态,故作轻松地说有定位到是什么原因导致的吗?(内心慌的一批🌝)他开始滔滔不绝地说了一大堆是如何排查问题的(技术人的特性,对于解决问题非常热忱),虽然我当时一直保持着很认真的神态,但其实心里非常煎熬(是谁的代码导致的一直没有说!!)。10 分钟后.....,同事语重心长地说,这次这个线上问题暴露我们以前写的代码还是欠缺一些场景的考虑。听到这里我大概已经知道不是我的代码导致的(注:我刚进这个小组不久),这个时候我也开始语重心长地附和道“是啊,以前我们在这块的考量还是有些不足的............然后就开始和他就‘代码质量如何保证’话题讨论了半小时🌚”。
### 问题描述
问题最终定位到是一个并发问题。线上有一个接口是通过 CompletableFuture 与线程池结合使用去获取下游的数据(注:使用异步的方式去获取下游数据则是因为调用的下游的接口是一个耗时高的 soa 接口),大致代码如下:
```java
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ThreadPoolExecutor flowContractThreadPool = new ThreadPoolExecutor(2,
5, 15, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.DiscardPolicy());
List<CompletableFuture> threadList = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
log.info("main for");
// do something
}, flowContractThreadPool);
threadList.add(future);
}
CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();
//获取结果,然后做相关业务处理
log.info("end");
}
```
现在暂时不去分析代码,先描述现象。出现问题的那个晚上,有一波突刺流量,由于我们没有针对接口的请求失败做短信告警(虽然有钉钉异常告警群,但是大家都不是很关心群里消息🌞),因此一开始出现问题的接口出现大面积的请求超时而我们都没有感知到,直到最终服务出现不可用,值班同事才发现这个问题(经过此事件,我们很自然地加上请求失败告警🌦️)。从结合链路和日志分析定位问题出现的原因到服务恢复的 MTTR 大概花了 10 分钟(在此特意表扬一下此同事☀️☀️)。
### 问题分析
通过上面背景与问题的描述,已经知道整个问题的全貌,现在从技术的角度去分析一下 CompletableFuture 结合线程池的原理与使用注意事项。
#### CompletableFuture
从上述的案例代码里面,涉及到 CompletableFuture 中的三个方法,他们分别是 runAsync、join、allOf,下面我们逐步去分析这几个方法:
##### runAsync
这个方法的效果返回一个新的 CompletableFuture,该 CompletableFuture 是在给定执行器中运行的任务在运行给定动作后异步完成的。
```java
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
//此方法是为了保证操作系统是多核的情况下走线程池,单核情况下不走线程池,由单个线程去跑相应的逻辑
static Executor screenExecutor(Executor e) {
if (!useCommonPool && e == ForkJoinPool.commonPool())
return asyncPool;
if (e == null) throw new NullPointerException();
return e;
}
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
}
```
runAsync 方法中对 CompletableFuture 进行了一层封装,通过 AsyncRun 对象组装一个空的 CompletableFuture 与 Runnable,然后将空的 CompletableFuture 返回,我们再来看看 AsyncRun 的一个结构:
```java
static final class AsyncRun extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<Void> dep; Runnable fn;
AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture<Void> d; Runnable f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
//由于 f
f.run();
d.completeNull();
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
```
AsyncRun 和 AsyncSupply 的实现略有不同,AsyncRun 的 run 中,计算的执行是通过调用传入的 Runnable(源码中的 f 变量)的 run 方法进行的。由于没有返回值,所以这里在设置 CompletableFuture 的值时,使用其 completeNull()方法,设置一个特殊的空值标记。
AsyncRun 的继承结构大致如下:
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/ecb362eea81d44f4b1ec72dc9016bdd9.png)
##### allOf
allOf 的方法的作用是当所有给定的 CompletableFutures 完成时,返回一个新的 CompletableFuture。如果给定的任何一个 CompletableFuture 异常完成,那么返回的 CompletableFuture 也会异常完成,并使用 CompletionException 将此异常作为其原因。否则,给定的 CompletableFuture 的结果(如果有的话)不会反映在返回的 CompletableFuture 中,而是可以通过单独检查它们来获得。如果没有提供 CompletableFutures,则返回一个完整的 CompletableFuture,其值为 null。该方法的应用之一是在继续一个程序之前等待一组独立的 CompletableFuture 的完成,如:allOf(c1, c2, c3).join();
```java
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
int lo, int hi) {
CompletableFuture<Void> d = new CompletableFuture<Void>();
//对传入的 CompletableFuture 的参数校验,如果没有通过则返回 AltResult
if (lo > hi) // empty
d.result = NIL;
else {
CompletableFuture<?> a, b;
//通过右移操作获取中值
int mid = (lo + hi) >>> 1;
//通过递归的方式 a、b 的赋值操作(这个逻辑有点抽象,大家可以通过花图去理解一下)
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
//判断任务是否执行
if (!d.biRelay(a, b)) {
BiRelay<?,?> c = new BiRelay<>(d, a, b);
a.bipush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
//判断任务是否执行,可简单理解为:result 是 null 任务没执行,不是 null 任务已执行。
boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
Object r, s; Throwable x;
if (a == null || (r = a.result) == null ||
b == null || (s = b.result) == null)
return false;
if (result == null) {
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
completeThrowable(x, s);
else
completeNull();
}
return true;
}
//这个方法是用于对象编排
final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
if (c != null) {
Object r;
while ((r = result) == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
if (b != null && b != this && b.result == null) {
Completion q = (r != null) ? c : new CoCompletion(c);
while (b.result == null && !b.tryPushStack(q))
lazySetNext(q, null); // clear on failure
}
}
}
//用于获取 future 执行完的返回值
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null || !d.biRelay(a = src, b = snd))
return null;
src = null; snd = null; dep = null;
return d.postFire(a, b, mode);
}
```
##### join
完成时返回结果值,如果异常完成则抛出(未检查的)异常。为了更好地符合通用函数形式的使用,如果在 CompletableFuture 的完成过程中涉及的计算抛出了异常,则该方法抛出一个(未检查的)CompletionException,并将底层异常作为其原因。
```java
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
//此方法的的作用就是用于判断 AltResult 的 result 中是否有异常,如果有抛出来
private static <T> T reportJoin(Object r) {
if (r instanceof AltResult) {
Throwable x;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if (x instanceof CompletionException)
throw (CompletionException)x;
throw new CompletionException(x);
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
//等待后返回原始结果,如果可中断且已中断则返回 null。
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
//spins 用于自旋,可不用关注
if (spins < 0)
spins = SPINS;
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
// 必会执行的分支,会将 q 的值赋值 new Signaller(interruptible, 0L, 0L);
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
//必会执行的分支,把 stack 设置为 q
else if (!queued)
queued = tryPushStack(q);
//线程中断时会执行
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
//这个是阻塞 api 执行的地方,着重看这个
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
//运行给定的可能阻塞的任务。当在 ForkJoinPool 中运行时,如果有必要,这个方法可能会安排一个空闲线程被激活,以确保当前线程在 blocker.block()中被阻塞时有足够的并行性。
//此方法重复调用 blocker.isReleasable()和 blocker.block(),直到其中一个方法返回 true。
//每次调用 block .block()之前都会调用 blockisreleasable(),返回 false。
//如果不在 ForkJoinPool 中运行,则此方法在行为上等同于 while (!block . isrelease ()) if (block .block()) break;如果在 ForkJoinPool 中运行,池可能首先被扩展以确保在调用 block .block()期间有足够的并行性。
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
ForkJoinPool p;
ForkJoinWorkerThread wt;
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
(p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
WorkQueue w = wt.workQueue;
while (!blocker.isReleasable()) {
if (p.tryCompensate(w)) {
try {
do {} while (!blocker.isReleasable() &&
!blocker.block());
} finally {
U.getAndAddLong(p, CTL, AC_UNIT);
}
break;
}
}
}
else {
do {} while (!blocker.isReleasable() &&
!blocker.block());
}
}
```
上面我们对于 CompletableFuture 有了一个粗略的认识,想了解更多的话,推荐去看看[CompletableFuture 的 allOf 方法底层原理是什么](https://blog.csdn.net/every__day/article/details/129629863)。
#### 案例分析
```java
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ThreadPoolExecutor flowContractThreadPool = new ThreadPoolExecutor(2,
5, 15, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.DiscardPolicy());
List<CompletableFuture> threadList = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
log.info("main for");
// do something
}, flowContractThreadPool);
threadList.add(future);
}
CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();
//获取结果,然后做相关业务处理
log.info("end");
}
```
再回到我们的案例代码里面,案例中我们给线程池设置的拒绝策略是 DiscardPolicy(造成此次线上问题的罪魁祸首),此策略的作用是当线程池中的队列满时再来任务会静默丢弃 task,现在问题来了,这个丢弃的任务就有可能是某些阻塞等待线程的 FutureTask,那么这些调用了 get()的无限时等待 api 的线程将无限时阻塞了,没人去唤醒他,如下图:
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/ee89d17b5b6b489e99d8d380c6dea63d.png)
FutureTask 被丢弃的话,CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();这段代码就会一直阻塞线程获取 FutureTask 结果,此时结果是永远无法获取到(task 丢弃,因此 FutureTask 的引用无法获取数据),因此会一直夯住主线程,随着夯住的线程越来越多,tomcat 的线程也会被打满,整个服务就瘫痪了。
### 问题处理
那么如何去处理这种现象呢?如果认真看完上面的内容的小伙伴其实已经有答案了,我这里提供两种思路去处理。第一种就是根据线上流量峰值去增大队列长度(这种方式不推荐,治标不治本);第二种是通过修改线程池的拒绝策略避免这种情况(最好是自己实现拒绝策略,这种方式对于业务的扩展更佳灵活)。
最后提出一个问题,如果是你碰到这个问题,你会怎么去处理呢?
评论