本文首发于公众号【苦味代码】
前言
Java 8 给大家带来了一个非常便捷的多线程工具:并行流,一改往日 Java 多线程繁琐的编程规范,只需要一行代码,就可以让一个多线程跑起来,似乎让很多人忘记了被多线程支配的恐惧,这篇文章给大家分享一个真实的生产故障,由于在消费消息的处理器中使用了 Java 8 的并行流,导致集群消费消息的能力急速下降,造成线上消息堆积,引发故障。可能有朋友会好奇,到底是什么场景让并行流起了反作用?
并行流执行速度一定比串行快吗?
答案是:还真不一定,得看是哪种场景
我把产生线上问题的代码抽闲成下面的示例代码:
void testParallelStream() throws InterruptedException {
ExecutorService threadPool = new ThreadPoolExecutor(50, 200, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("test-parallel-thread").build(), new ThreadPoolExecutor.CallerRunsPolicy());
Long time1 = System.currentTimeMillis();
// 1. 多线程+foreach执行时长
for (int i = 0; i < ARRAY_LENGTH; i++) {
CommonExecutor commonExecutor = new CommonExecutor();
commonExecutor.array = arrays[i];
threadPool.submit(commonExecutor);
}
commonCountDownLatch.await();
Long time2 = System.currentTimeMillis();
System.out.println("for循环耗时: " + (time2 - time1));
threadPool = new ThreadPoolExecutor(50, 200, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("test-parallel-thread").build(), new ThreadPoolExecutor.CallerRunsPolicy());
// 2. 多线程+并行流执行时长
for (int i = 0; i < ARRAY_LENGTH; i++) {
ParallelStreamExecutor parallelStreamExecutor = new ParallelStreamExecutor();
parallelStreamExecutor.array = arrays[i];
threadPool.submit(parallelStreamExecutor);
}
parallelCountDownLatch.await();
Long time3 = System.currentTimeMillis();
System.out.println("并行流耗时: " + (time3 - time2));
}
复制代码
其中两次提交给线程池的执行器如下所示:
@Data
private static class CommonExecutor implements Runnable {
private long[] array;
@Override
public void run() {
// 选择排序法进行排序
for (int i = 0; i < array.length; i++) {
array[i] = i * i;
}
commonCountDownLatch.countDown();
}
}
复制代码
@Data
private static class ParallelStreamExecutor implements Runnable {
private long[] array;
@Override
public void run() {
// 选择排序法进行排序
IntStream.range(0, array.length).parallel().forEach(i -> array[i] = i * i);
parallelCountDownLatch.countDown();
}
}
复制代码
这段代码的思路非常简单,就是对一个二维数组arrays
的每一行,计算其列下标的平方数,并且回填到数组中,只不过这个过程是通过线程池去完成的,提交给线程池的执行器有两种,一种是普通的 for 循环,通过游标遍历每一个元素的下标,并计算平方数。另一种使用了并行流去完成同样的事情。简单起见,我们把这段代码循环执行 10 次,并统计了每次两种实现方式的耗时(单位是毫秒),大家可以猜猜看,到底哪种实现方式更快。
下面是真实的耗时记录:
执行的结果竟然是并行流的执行速度明显慢于 for 循环,到底是哪里出现问题了呢?
并行流的实现原理
其实问题就出现在并行流的实现上,同一个进程中提交给并行流的 Action 都会被同一个公共的线程池处理。也就是说上文构造的代码无论线程池threadPool
的线程数开到多大,最终实际处理 Action 的线程数都由并行流的公共线程池大小决定,这一点我们可以从并行流的源码上看个大概:
@Override
@SuppressWarnings("unchecked")
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
复制代码
调用 parallel 只会将java.util.stream.AbstractPipeline
中的sourceStage.parallel
置为true
,到调用foreach
的时候,会调用到下面这个方法
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
复制代码
这里的isParallel()
就会判断上面设置的sourceStage.parallel
字段,从而使程序的执行流程走到terminalOp.evaluateParallel
这个分支,再往后跟的话会发现最终任务会提交到ForEachTask
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
复制代码
ForEachTask
这里稍微提一嘴,是java 1.7
引入的一个轻量级多线程任务,逻辑还是比较多的,后面有机会我们再看下它的实现原理,通过断点跟进去,发现最后提交的任务都会调用到ForEachTask
的compute
方法
public void compute() {
// 以我们初始提交的任务为例spliterator的类型是一个RangeIntSpliterator,其中from = 0, upTo = 10000, last = 0
Spliterator<S> rightSplit = spliterator, leftSplit;
// estimateSize = upTo - from + last
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
// 目标大小会根据上文提到的公共线程池计算,值等于 sizeEstimate/线程池大小 * 4
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink<S> taskSink = sink;
ForEachTask<S, T> task = this;
// 【核心逻辑】进入任务切分逻辑,
while (!isShortCircuit || !taskSink.cancellationRequested()) {
// 切分直至子任务的大小小于阈值
if (sizeEstimate <= sizeThreshold ||
// trySplit()会将rightSplit等比例切分,并返回切分的第一个子任务,切分比例跟待切分的任务总数相关
// 如果待切分的子任务大小小于等于1,则返回null,停止切分
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask<S, T> taskToFork;
// 这里通过forkRight控制本线程切割任务的顺序是
// 左->右->左->右->左->右直至子任务大小满足阈值,这样可以让整个任务执行更离散
// 关于这样做的好处也欢迎大家在评论区讨论
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
// 通过fork将将切分好的子任务提交到线程池
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}
复制代码
这段代码是整个并行流实现的核心逻辑,其本质就是将刚开始提交的串行大任务切分成更小的任务提交到线程池,并行流的秘密就藏在这段代码中:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
复制代码
这里涉及到 ForkJoinPool 的一个设计,为了避免常规线程池中各个线程访问任务队列产生竞争,ForkJoinPool 除了有一个公共的任务队列之外,每个线程自身还持有一个任务队列,外部线程需要提交任务到公共队列,线程池线程切分的更小的任务则直接提交到自身的工作队列中,因此就有了上面看到的这段逻辑。整个 ForkJoinPool 的逻辑如下图所示:
至于 ForkJoinPool.common,就是我们上文一直提及的公共线程池,其初始化方法在 ForkJoinPool 的静态调用块中调用
private static ForkJoinPool makeCommonPool() {
final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
new CommonPoolForkJoinWorkerThreadFactory();
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
// 可以通过设置这个值来改变公共线程池的大小
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = commonPoolForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
// 获取线程池线程数量,其值等于当前可用处理器减一
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
复制代码
这段代码比较简单,就是返回了一个固定的线程池,线程池大小默认等于可用处理器减一,这是因为在 ForkJoinPool 的设计中外部线程也是可以参与到执行子任务的,这个看似巧妙的设计其实很容易误用,尤其是遇到跟线程状态相关的全局变量时。
并行流比串行更慢的原因
在了解了并行流的实现原理后我们也就能理解为什么在文章开头,针对同一段逻辑,并行流的执行反而比串行慢了。
当在多线程场景下使用并行流是,由于并行流使用的是一个公共的线程池,所以无论外部有多少个线程,这些线程都会把任务提交给同一个线程池,所以你会发现,无论你咱么调整外面线程池的大小,都不能使任务加速。回到文章刚开始的例子,采用并行流的实现中真实的线程数为 7,而采用串行的实现中真实的线程数为 100,由于线程数差别巨大,因此造成了最终的耗时也有很明显的差距。
总结
并行流在的设计是比较讨巧的,其中有三个地方容易采坑
同一个进程提交给并行流的任务都会被同一个公共线程池处理,因此,如果在多线程的环境中使用了并行流,反而会降低并发,使得处理变慢
并行流的公共线程池大小为可用处理器减一,并且并行流会使用外部线程去处理内部子任务,搭配ThreadLocal
使用的时候务必要慎重,在一些与ThreadLocal
强耦合的场景,可能会导致ThreadLocal
误清理,其他线程相关的全局变量同理
并行流的设计是为了应对计算密集型的场景的,如果有较多的 IO 场景,比如常见的 RPC 调用,在高并发的场景下会导致外部线程阻塞,引起外部线程数增多,且这类问题在测试的时候不容易发现,极易引起生产故障。
给朋友们送福利了
Rocket MQ 是开源社区炽手可热的消息中间件,支持了阿里庞大的业务和复杂的场景,其设计上也有很多巧妙的地方,也是很多面试官热衷去挖掘的一个点,关注公众号【苦味代码】,即可领取这本电子书
评论