写点什么

Java&Go 高性能队列之 Disruptor 性能测试

作者:FunTester
  • 2022 年 2 月 14 日
  • 本文字数:8249 字

    阅读完需:约 27 分钟

之前写过Java&Go高性能队列之LinkedBlockingQueue性能测试之后,就一直准备这这篇文章,作为准备内容的过程中也写过一些Disruptor高性能消息队列的应用文章:高性能队列Disruptor在测试中应用千万级日志回放引擎设计稿


Disruptor以高性能出名,下面我来测试一下三种场景下性能表现。


有一些基本的设定和用词规范,大家可以翻看Java&Go高性能队列之LinkedBlockingQueue性能测试

结论

总体来说,com.lmax.disruptor.dsl.Disruptor消费性能是非常厉害的,几乎是测不到顶。但是在生产方面,性能会随着 Event 的增加会下降很多还是在 50 万 QPS 级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来几点比较通用的参考:


  • Disruptor消费者能力超强,即使在超高消费者数量(1000),依然保持非常高性能

  • 保证无消息积压前提下,com.lmax.disruptor.AbstractSequencer#bufferSize大小对性能影响不大

  • 在单生产者场景下,Disruptor生产速率与java.util.concurrent.LinkedBlockingQueue一样具有性能不稳定的问题

  • Disruptor性能瓶颈在于生产者,消息对象大小对性能影响较大,多生产者对总体性能影响不大,队列积压对性能影响也不大

  • 如果降低 Event 体积会极大提升性能,以后尽量使用java.lang.String,这点已经在日志回放系统印证了

简介

这里再多唠叨两句。


Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与 I/O 操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单。

测试结果

这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。这里我采用的是com.lmax.disruptor.dsl.ProducerType#MULTI消费模式,注册消费者用的是com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool方法。

数据说明

这里我用了三种org.apache.http.client.methods.HttpGet,创建方法均使用原生 API,为了区分大小的区别,我会响应增加一些 header 和 URL 长度。


小对象:


def get = new HttpGet()
复制代码


中对象:


def get = new HttpGet(url)get.addHeader("token", token)get.addHeader(HttpClientConstant.USER_AGENT)get.addHeader(HttpClientConstant.CONNECTION)
复制代码


大对象:


def get = new HttpGet(url + token)get.addHeader("token", token)get.addHeader("token1", token)get.addHeader("token5", token)get.addHeader("token4", token)get.addHeader("token3", token)get.addHeader("token2", token)get.addHeader(HttpClientConstant.USER_AGENT)get.addHeader(HttpClientConstant.CONNECTION)
复制代码

生产者



测试过程中超大com.lmax.disruptor.AbstractSequencer#bufferSize会导致com.lmax.disruptor.dsl.Disruptor耗时非常长,自测 1024 * 1024 再高就感觉很吃力了,所以没测试超过 1 百万的消息队列长度。由于并没有设定com.lmax.disruptor.AbstractSequencer#bufferSize的测试场景,所以本次测试总是用这个设置。


测试结果规律倒是挺明显的:


  1. 消息总量越大,QPS 越大

  2. 生产者线程数对 QPS 影响不大

  3. 消息体尽可能小

消费者

对于Disruptor框架来讲,单独的消费者用例比较难构建,我用了一个取巧的办法,会对性能测试结果有一些影响,这里可以通过后来分享测试用例的时候会详细说说。不过对于Disruptor逆天的消费能力,这点误差可以忽略。



测试结论也挺明显的,基本与java.util.concurrent.LinkedBlockingQueue一致。


  1. 数据上看长度越长越好

  2. 消费者线程越少越好

  3. 消息体尽可能小


PS:关于Disruptor消费能力,我测试了一个 1 百万大对象消息,1000 线程的消费者用例,QPS=3412/ms,这个跟我后面基于Disruptor设计的新性能测试模型有关系,表明消费者线程数即使增加到 1000,Disruptor依然保持了非常高的性能。

生产者 & 消费者

这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的 2 倍。



次轮整个测试过程都是几乎崩溃的,因为同样的用例执行起来误差太大了,最大的能有接近 2 倍的差距。以下结论仅供参考:


  1. 消息队列积累消息越少,速率越快

  2. 消费速率随时间推移越来越快

  3. 消息体尽可能小


其中当线程数超过 10 的时候,出现了非常明显的性能下滑,这个可以通过上面两组数据得出原因,Disruptor消费太快了,是生产者的数倍之多。最后测试出来的结果其实就是生产者的速率。当线程数比较少的时候,Disruptor总是有消息堆积的,所以生产者速率不会成为瓶颈,这个也跟用例设计有关系。

基准测试

请翻阅上期的测试文章内容Java&Go高性能队列之LinkedBlockingQueue性能测试

测试用例

测试用例使用 Groovy 语言编写,自从我自定义了异步关键字fun和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。本期我又额外使用了自定义统计时间的关键字time以及利用闭包实现自定义等待方法,其他内容均与上期文章相同。


Disruptor有个先天的优势,就是必需得设置ringBufferSize,相当于提前分配内存了。这点是我之前没想到的,当我回去复测LinkedBlockingQueue的时候发现并没有明显的性能差异,对于测试结果影响可忽略。


PS:这里用到了一些sleep(),会导致一些误差,这个以我能力暂无法避免,经过测试对结论影响不大,对数据影响有限。

生产者


import com.funtester.config.HttpClientConstantimport com.funtester.frame.SourceCodeimport com.funtester.frame.execute.ThreadPoolUtilimport com.funtester.utils.Timeimport com.lmax.disruptor.EventHandlerimport com.lmax.disruptor.RingBufferimport com.lmax.disruptor.WorkHandlerimport com.lmax.disruptor.YieldingWaitStrategyimport com.lmax.disruptor.dsl.Disruptorimport com.lmax.disruptor.dsl.ProducerTypeimport org.apache.http.client.methods.HttpGetimport org.apache.http.client.methods.HttpRequestBase
import java.util.concurrent.CountDownLatchimport java.util.concurrent.atomic.AtomicInteger
class DisProduce extends SourceCode {
static AtomicInteger index = new AtomicInteger(1)
static int total = 50_0000
static int size = 10
static int threadNum = 10
static int piece = total / size
static def url = "http://localhost:12345/funtester"
static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
public static void main(String[] args) { Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>( FunEvent::new, 1024 * 1024, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); disruptor.start(); RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer(); def latch = new CountDownLatch(threadNum) def ss = Time.getTimeStamp() def funtester = { fun { (total / threadNum).times { if (index.getAndIncrement() % piece == 0) { def l = Time.getTimeStamp() - ss output("${formatLong(index.get())}添加总消耗${formatLong(l)}") ss = Time.getTimeStamp() } // def get = new HttpGet()
// def get = new HttpGet(url)// get.addHeader("token", token)// get.addHeader(HttpClientConstant.USER_AGENT)// get.addHeader(HttpClientConstant.CONNECTION)
def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) ringBuffer.publishEvent((event, sequence) -> event.setRequest(get)) } latch.countDown() } } // fun { // while (true) { // sleep(1.0) // output(disruptor.getRingBuffer().getBufferSize()) // } // } def start = Time.getTimeStamp() threadNum.times {funtester()} latch.await() def end = Time.getTimeStamp() outRGB("每毫秒速率${total / (end - start)}")

disruptor.shutdown();

}
/** * 消费者 */ private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {
public void onEvent(FunEvent event, long sequence, boolean endOfBatch) {
}
public void onEvent(FunEvent event) {
}
}

private static class FunEvent {
HttpRequestBase request
HttpRequestBase getRequest() { return request }
void setRequest(HttpRequestBase request) { this.request = request };
}
复制代码

消费者


import com.funtester.config.HttpClientConstantimport com.funtester.frame.SourceCodeimport com.funtester.frame.event.EventThreadimport com.funtester.frame.execute.ThreadPoolUtilimport com.funtester.utils.Timeimport com.lmax.disruptor.EventHandlerimport com.lmax.disruptor.RingBufferimport com.lmax.disruptor.WorkHandlerimport com.lmax.disruptor.YieldingWaitStrategyimport com.lmax.disruptor.dsl.Disruptorimport com.lmax.disruptor.dsl.ProducerTypeimport org.apache.http.client.methods.HttpGetimport org.apache.http.client.methods.HttpRequestBase
import java.util.concurrent.atomic.AtomicIntegerimport java.util.stream.Collectors
class DisConsumer extends SourceCode {
static AtomicInteger index = new AtomicInteger(1)
static int total = 50_0000
static int threadNum = 10
static def url = "http://localhost:12345/funtester"
static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
static def key = true
public static void main(String[] args) {
Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>( FunEvent::new, 1024 * 1024, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); def funs = range(threadNum).mapToObj(f -> new FunEventHandler()).collect(Collectors.toList()) disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[]) disruptor.start(); RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer(); def ss = Time.getTimeStamp() time { total.times {// def get = new HttpGet()
// def get = new HttpGet(url)// get.addHeader("token", token)// get.addHeader(HttpClientConstant.USER_AGENT)// get.addHeader(HttpClientConstant.CONNECTION)
def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION)
ringBuffer.publishEvent((event, sequence) -> event.setRequest(get)); } } output("数据$total 构建完成!") def start = Time.getTimeStamp() key = false waitFor {!disruptor.hasBacklog()} , 0.01 def end = Time.getTimeStamp() output(end - start) outRGB("每毫秒速率${total / (end - start)}")

disruptor.shutdown();

}
/** * 消费者 */ private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {
public void onEvent(FunEvent event, long sequence, boolean endOfBatch) { if (key) sleep(0.05) }
public void onEvent(FunEvent event) { if (key) sleep(0.05) }
}

private static class FunEvent {
HttpRequestBase request
HttpRequestBase getRequest() { return request }
void setRequest(HttpRequestBase request) { this.request = request };
}
复制代码

生产者 & 消费者


import com.funtester.config.HttpClientConstantimport com.funtester.frame.SourceCodeimport com.funtester.frame.execute.ThreadPoolUtilimport com.funtester.utils.Timeimport com.lmax.disruptor.EventHandlerimport com.lmax.disruptor.RingBufferimport com.lmax.disruptor.WorkHandlerimport com.lmax.disruptor.YieldingWaitStrategyimport com.lmax.disruptor.dsl.Disruptorimport com.lmax.disruptor.dsl.ProducerTypeimport org.apache.http.client.methods.HttpGetimport org.apache.http.client.methods.HttpRequestBase
import java.util.concurrent.atomic.AtomicIntegerimport java.util.stream.Collectors
class DisBoth extends SourceCode {
static AtomicInteger index = new AtomicInteger(1)
static int total = 100_0000
static int threadNum = 5
static int buffer = 20_0000
static def url = "http://localhost:12345/funtester"
static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
static def key = true
public static void main(String[] args) {
Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>( FunEvent::new, 1024 * 256, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); def funs = range(threadNum).mapToObj(f -> new FunEventHandler()).collect(Collectors.toList()) disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[]) disruptor.start(); RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer(); def produces = { fun { while (true) { if (index.getAndIncrement() > total) break // def get = new HttpGet()
// def get = new HttpGet(url)// get.addHeader("token", token)// get.addHeader(HttpClientConstant.USER_AGENT)// get.addHeader(HttpClientConstant.CONNECTION)
def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) ringBuffer.publishEvent((event, sequence) -> event.setRequest(get)); } } } time { buffer.times { // def get = new HttpGet()
// def get = new HttpGet(url) // get.addHeader("token", token) // get.addHeader(HttpClientConstant.USER_AGENT) // get.addHeader(HttpClientConstant.CONNECTION)
def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) ringBuffer.publishEvent((event, sequence) -> event.setRequest(get)); } } output("数据$buffer 构建完成!") def start = Time.getTimeStamp() key = false threadNum.times {produces()} waitFor {!disruptor.hasBacklog()} , 0.01 def end = Time.getTimeStamp() def l = end - start output(l) outRGB("每毫秒速率${(total + buffer) / l}")

disruptor.shutdown();

}
/** * 消费者 */ private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {
public void onEvent(FunEvent event, long sequence, boolean endOfBatch) { if (key) sleep(0.05) }
public void onEvent(FunEvent event) { if (key) sleep(0.05) }

}

private static class FunEvent {
HttpRequestBase request
HttpRequestBase getRequest() { return request }
void setRequest(HttpRequestBase request) { this.request = request };
}
复制代码

Have Fun ~ Tester !

发布于: 刚刚阅读数: 4
用户头像

FunTester

关注

公众号:FunTester,750篇原创,欢迎关注 2020.10.20 加入

公众号FunTester,坚持原创文章的测试人,一个有趣的灵魂。

评论

发布
暂无评论
Java&Go高性能队列之Disruptor性能测试