写点什么

高性能队列 Disruptor 在测试中应用

作者:FunTester
  • 2021 年 12 月 28 日
  • 本文字数:3506 字

    阅读完需:约 12 分钟

最近在研究 goreplay 的源码的过程中,感觉有些思路还是很值得借鉴。所以自己立了一个 flag,实现一个千万级日志回放功能。但是在这个实现的过程中遇到一个棘手的问题:Java 自带的 LinkedBlockingQueue 比较难以直接满足需求场景和性能要求。


熟悉 goreplay 的测友应该清楚 Go 语言 chanel 在 goreplay 这个框架中应用是十分广泛的,加上 Go 语言自身较高的性能,可以说双剑合并。所以我也想照葫芦画瓢写一个类似思路的实现。这个后面会有专题讲这个。


基于此,我搜到了 Disruptor 这个高性能队列。Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与 I/O 操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单。


测试使用 Disruptor 时候不用像 Springboot 框架中那样,创建各类对象,抽象各种对象方法,我的原则就是怎么简单怎么来,下面分享一下 Disruptor 在测试中的基础实践和简单案例演示。

依赖

// https://mvnrepository.com/artifact/com.lmax/disruptorimplementation group: 'com.lmax', name: 'disruptor', version: '3.4.2'
复制代码


只列出了 Gradle 的,版本建议使用 3+,有些 Lambda 语法支持需要。

Event 对象

首先我们要定义一个 Event 类型,当然也可以直接使用类似java.lang.String使用已经存在的类,但是在设置 Event 对象时候,需要使用new关键字以及构造新的 Event 时,使用set方法总比直接=赋值的一致性更好一些。单独写一个 Event 类,可以更加简单,是的代码逻辑性更强,不用收到其他类的影响。


这里我定义了简单的 Event 类:


    public static class FunEvent {
String id;
public String getId() { return id; }
public void setId(String id) { this.id = id; }
}
复制代码

Disruptor 创建

Disruptor 对象创建首先需要我们定义一个 Event 类型,调用构造方法,参数有五个。1.Event 初始化方法;2.ringbuffsize,这里设置需要是 2 的整数倍;3.threadfactory,创建线程的工厂类,这里我用了com.funtester.frame.execute.ThreadPoolUtil#getFactory();4.生产者模式,分成单生产者和多生产者枚举类;5.等待策略,官方提供了一些实现类供选择,我使用了com.lmax.disruptor.YieldingWaitStrategy


创建方法如下:


        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(                FunEvent::new,                1024 * 1024,                ThreadPoolUtil.getFactory(),                ProducerType.MULTI,                new YieldingWaitStrategy()        );
复制代码

生产者

对于消息队列来讲,需要两个重要的角色,生产者和消费者。这里先将一下 Disruptor 生产者,我搜到不少资料,都是需要创建一个生产者的类,然后实现一个方法,这个方法内容基本一致的,内容如下:


           long sequence = ringBuffer.next();            try {                FunEvent funEvent = ringBuffer.get(sequence);                funEvent.setId(orderId);            } finally {                ringBuffer.publish(sequence);            }
复制代码


然后使用生产者对象调用这个方法,我觉得有点多此一举了,幸好有一篇文章介绍了 Disruptor 一些新特性的时候提到支持了 Lambda 语法,这下就可以不用创建生产者对象了。语法如下:


                ringBuffer.publishEvent((Event, sequence) -> Event.setId(StringUtil.getString(10)));
复制代码

消费者

消费者创建需要实现两个接口com.lmax.disruptor.EventHandlercom.lmax.disruptor.WorkHandler,这俩一个是处理单消费者模式,另外一个多消费者模式。


创建方法如下:


    /**     * 消费者     */    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {
public void onEvent(FunEvent Event, long sequence, boolean endOfBatch) { output("消费消息:" + Event.getId() + TAB + sequence); }
public void onEvent(FunEvent Event) { output("消费消息:" + Event.getId()); }
}
复制代码

配置 handler

这里分两类:配置单个消费者和配置多个消费者。


单个消费者:


disruptor.handleEventsWith(new FunEventHandler());


多个消费者:


disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler());


不管是单个还是多个,每次调用都会产生一个com.lmax.disruptor.dsl.EventHandlerGroup,每个com.lmax.disruptor.dsl.EventHandlerGroup都会完整消费每一个生产者产生的 Event,如果设置了 5 次,那么一个 Event 就会被消费 5 次,每个com.lmax.disruptor.dsl.EventHandlerGroup消费一次,而且是阻塞的,加入某一个com.lmax.disruptor.dsl.EventHandlerGroup对象消费慢了,会阻塞其他消费者消费下一个 Event。

启动

组装完成之后就可以启动了 Disruptor 了,语法如下:disruptor.start();,关闭语法disruptor.shutdown();,此处的关闭不会清空getRingBuffer已经存在的 Event,看官方文档应该是停止生产,然后等待消费。

演示 Demo

Java 版本

    public static void main(String[] args) {        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(                FunEvent::new,                1024 * 1024,                ThreadPoolUtil.getFactory(),                ProducerType.MULTI,                new YieldingWaitStrategy()        );        disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler());        disruptor.handleEventsWith(new FunEventHandler());        disruptor.start();        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();        for (int i = 0; i < 3; i++) {            ringBuffer.publishEvent((event, sequence) -> event.setId(StringUtil.getString(10)));        }        sleep(5.0);        disruptor.shutdown();
}
复制代码


控制台输出:


INFO-> main 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.16INFO-> main   ###### #     #  #    # ####### ######  #####  ####### ######  #####  #      #     #  ##   #    #    #       #         #    #       #    #  ####   #     #  # #  #    #    ####    #####     #    ####    #####  #      #     #  #  # #    #    #            #    #    #       #   #  #       #####   #    #    #    ######  #####     #    ######  #    #
INFO-> F-3 消费消息:i3OrH2ZnxD 0INFO-> F-1 消费消息:i3OrH2ZnxDINFO-> F-2 消费消息:whhoxoMxmRINFO-> F-3 消费消息:whhoxoMxmR 1INFO-> F-2 消费消息:IeP9fIRpKpINFO-> F-3 消费消息:IeP9fIRpKp 2
Process finished with exit code 0
复制代码


可以看到,每个消息会消费了两次。其中 F-3 线程消费量=F-1 和 F-2 线程消费量总和,这就跟家理解了com.lmax.disruptor.dsl.EventHandlerGroup的功能。

Groovy+异步版本

    public static void main(String[] args) {        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(                FunEvent::new,                1024 * 1024,                ThreadPoolUtil.getFactory(),                ProducerType.MULTI,                new YieldingWaitStrategy()        )        disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler())        disruptor.handleEventsWith(new FunEventHandler())        disruptor.start()        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();        def funtester = {            fun {                100.times {ringBuffer.publishEvent((event, sequence) -> event.setId(StringUtil.getString(10)));}            }        }        10.times {funtester()}        sleep(5.0)        disruptor.shutdown()    }
复制代码

Have Fun ~ Tester !

发布于: 刚刚
用户头像

FunTester

关注

公众号:FunTester,650+原创,欢迎关注 2020.10.20 加入

Have Fun,Tester! 公众号FunTester,坚持原创文章的测试人。 FunTester测试框架作者,DCS_FunTester分布式性能测试框架作者。

评论

发布
暂无评论
高性能队列Disruptor在测试中应用