写点什么

性能测试中 Disruptor 框架 shutdown 失效的问题分享

作者:FunTester
  • 2022 年 3 月 02 日
  • 本文字数:2431 字

    阅读完需:约 8 分钟

在基于 Disruptor 开发新的性能测试 QPS 模型时候,中间遇到了很多问题,踩了很多坑。今天就分享一个比较典型的问题:shutdown 失效。


问题在于这么优秀的一个框架,怎么可能会存在这么明显的 BUG?


经过查阅资料,还真特么存在,只不过在极少数使用场景下会发生,刚好 FunTester 性能测试框架设计中就属于这个场景。下面听说娓娓道来。


首先我是把每一个消费者线程都当做性能测试线程使用,此为前提。下面是两个因此带来的设定:


  1. Disruptor 框架的消费者线程或者消费者线程数组数需要在 Disruptor 启动之前设定,也无法修改

  2. 由于性能测试需要 FunTester 性能框架中基于 Disruptor 写的 QPS 模型需要设置较大的消费者或者消费者组的线程数(要达到 10 万 QPS,这个值通常在 1024 以上)

  3. 性能测试 QPS 均为从低(多数为零)开始到设定的最大 QPS

  4. 性能测试一开始,自然有大量的消费者线程处于空闲状态,甚至未启动状态


以上是四个因为 Disruptor 框架特性和 FunTester 框架设计带来的难以避免,然后就会在线程数远超(难以量化界定)需求的时候,会导致性能测试结束之后,Disruptor 执行 shutdown 方法后,Disruptor 所有线程并没有全部结束,导致程序无法正常结束且 CPU 使用率飙升(线程数设定较多)。具体原因大家可以自行搜索,有大佬做了非常优秀的分析、分享和演示。总结起来就是两点:


  1. 生产者的生产线程必须执行在 disruptor.shutdown 方法之前。

  2. disruptor.shutdown 方法必须执行在所有消费者线程启动之前。


但是这两种情况其实除非特意构造,否则极难发生,重点还是了解一点点 Disruptor 源码的结构和运行逻辑。经过一阵子摸索和学习,我发现了问题所在,消费者线程太多了。


在我初步的测试中,有以下几条经验:


  1. 要依旧现有数据设置消费者数量,并非越多越好

  2. 先消费者数量足够多时,QPS 往往不够稳定,差异能达到 30%

  3. 线程数尽量控制在 2000 以下,否则很容易触发 Disruptor 框架 shutdown 失效问题


PS:以上数据在 QPS:5w,平均响应时间 10ms 设定下完成测试。


使用 Disruptor 做性能测试坑还是挺多的,可能之前也没人这么用过,还有几个大坑我后面会继续分享,目前总体来说,性能测试最好的模型还是线程模型,当 QPS 在万级别上时,QPS 模型的精确很难控制。


关于较多消费者时,Disruptor 框架 shutdown 失效的问题已经反馈给了开发者。下面是我的测试脚本,为了更容易验证,我特意写了 Java 版本的。



import com.lmax.disruptor.EventHandler;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.TimeoutBlockingWaitStrategy;import com.lmax.disruptor.WorkHandler;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;import java.util.concurrent.TimeUnit;
public class DisJava {

public static void main(String[] args) { ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); return thread; } }; Disruptor<Event> disruptor = new Disruptor<Event>( Event::new, 256 * 256, threadFactory, ProducerType.MULTI, new TimeoutBlockingWaitStrategy(1000, TimeUnit.MILLISECONDS) ); RingBuffer<Event> ringBuffer = disruptor.getRingBuffer(); int num = 3000; EventFun[] consumers = new EventFun[num]; for (int i = 0; i < num; i++) { consumers[i] = new EventFun();
} disruptor.handleEventsWithWorkerPool(consumers); disruptor.start(); for (int i = 0; i < 10; i++) { ringBuffer.publishEvent((e, s) -> { e.setEvent("123"); System.out.println(System.currentTimeMillis()); }); } disruptor.shutdown(); System.out.println("结束了");
}
private static class EventFun implements EventHandler<Event>, WorkHandler<Event> {
public EventFun() { }
/** * 多消费者 * * @param event * @throws Exception */ @Override public void onEvent(Event event) throws Exception { sleep(10); }
/** * 单消费者 * * @param event * @param sequence * @param endOfBatch * @throws Exception */ @Override public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception { sleep(10); }
}
/** * 消息体 */ private static class Event {
public String getEvent() { return event; }
public void setEvent(String event) { this.event = event; }
String event;
}
private static void sleep(long time) { try { Thread.sleep(time); } catch (InterruptedException e) {
} }
}
复制代码

Have Fun ~ Tester !

发布于: 刚刚阅读数: 2
用户头像

FunTester

关注

公众号:FunTester,750篇原创,欢迎关注 2020.10.20 加入

公众号FunTester,坚持原创文章的测试人,一个有趣的灵魂。

评论

发布
暂无评论
性能测试中Disruptor框架shutdown失效的问题分享_Disruptor_FunTester_InfoQ写作平台