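Ever since I wrote "Java & Go High-Performance Queues: LinkedBlockingQueue Performance Test", I have been preparing this article. Along the way I also wrote two pieces on applying the high-performance Disruptor message queue: "Using the High-Performance Queue Disruptor in Testing" and "Design Draft of a Ten-Million-Scale Log Replay Engine".
Disruptor is known for its high performance. Below I test how it performs in three scenarios.
Some basic settings and terminology carry over from the earlier article; see "Java & Go High-Performance Queues: LinkedBlockingQueue Performance Test".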
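Conclusion
Overall, the consumption performance of com.lmax.disruptor.dsl.Disruptor is extremely strong; I could barely find its ceiling. On the production side, performance drops considerably as the Event grows larger, but it still stays at the level of 500,000 QPS, which meets current load-testing needs. The one thing to avoid is the unstable performance seen when the queue is long. A few generally applicable takeaways:
Disruptor's consumers are extremely capable: even with a very large number of consumers (1000), performance stays very high
As long as no messages pile up, the size of com.lmax.disruptor.AbstractSequencer#bufferSize has little effect on performance
In the single-producer scenario, Disruptor's production rate suffers from the same instability as java.util.concurrent.LinkedBlockingQueue
Disruptor's bottleneck is the producer: message object size has a large effect on performance, while multiple producers and queue backlog have little effect on overall performance
Reducing the Event size improves performance greatly; from now on I will use java.lang.String wherever possible, which has already been confirmed in the log replay system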
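Introduction
A few more words of background.
Disruptor is a high-performance queue developed by LMAX, a UK foreign-exchange trading company. It was built to address the latency of in-memory queues (performance testing showed that this latency was, surprisingly, of the same order of magnitude as I/O operations). A system built on Disruptor can handle 6 million orders per second on a single thread.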
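Test Results
The only performance metric recorded here is the number of messages (objects) processed per millisecond. I used the com.lmax.disruptor.dsl.ProducerType#MULTI producer mode, and registered consumers with the com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool method.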
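To make the metric concrete, here is a minimal Java sketch of the messages-per-millisecond calculation. The class and method names (`Throughput`, `perMillisecond`) are hypothetical and not part of the test framework; the sketch uses floating-point division so that short runs are not truncated to whole numbers.

```java
import java.util.concurrent.TimeUnit;

final class Throughput {

    /** Messages handled per millisecond, given a message count and elapsed nanoseconds. */
    static double perMillisecond(long messages, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        double elapsedMillis = elapsedNanos / 1_000_000.0;
        return messages / elapsedMillis;
    }

    public static void main(String[] args) {
        // Illustrative numbers only: 500,000 messages handled in 250 ms.
        long elapsed = TimeUnit.MILLISECONDS.toNanos(250);
        System.out.println(perMillisecond(500_000, elapsed)); // prints 2000.0
    }
}
```

Multiplying the result by 1000 gives the per-second figure for comparison with QPS numbers quoted elsewhere.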
Data Description
I used three kinds of org.apache.http.client.methods.HttpGet, all created with the native API. To make the size differences clear, I add more headers and a longer URL accordingly.
Small object:
    def get = new HttpGet()
Medium object:
    def get = new HttpGet(url)
    get.addHeader("token", token)
    get.addHeader(HttpClientConstant.USER_AGENT)
    get.addHeader(HttpClientConstant.CONNECTION)
Large object:
    def get = new HttpGet(url + token)
    get.addHeader("token", token)
    get.addHeader("token1", token)
    get.addHeader("token5", token)
    get.addHeader("token4", token)
    get.addHeader("token3", token)
    get.addHeader("token2", token)
    get.addHeader(HttpClientConstant.USER_AGENT)
    get.addHeader(HttpClientConstant.CONNECTION)
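Producer
During testing, a very large com.lmax.disruptor.AbstractSequencer#bufferSize caused com.lmax.disruptor.dsl.Disruptor to take a very long time. In my own runs, anything above 1024 * 1024 already felt like a strain, so I did not test queue lengths beyond one million messages. Since no scenario here varies com.lmax.disruptor.AbstractSequencer#bufferSize, this test always uses that setting.
The patterns in the results are fairly clear:
The larger the total number of messages, the higher the QPS
The number of producer threads has little effect on QPS
Keep the message body as small as possible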
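Consumer
With the Disruptor framework, a consumer-only test case is hard to build. I used a workaround that has some effect on the measured results; I go into the details when sharing the test cases later. Given Disruptor's extraordinary consumption capacity, though, this error can be ignored.
The conclusions are also fairly clear, and largely match those for java.util.concurrent.LinkedBlockingQueue.
According to the data, the longer the queue, the better
The fewer consumer threads, the better
Keep the message body as small as possible
PS: Regarding Disruptor's consumption capacity, I ran a consumer case with 1 million large-object messages and 1000 consumer threads and got QPS=3412/ms. This relates to the new performance-testing model I later designed on top of Disruptor, and it shows that Disruptor still performs very well even with 1000 consumer threads.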
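Producer & Consumer
Here the thread count refers to the number of producers or of consumers; the total number of threads is twice this value.
This round of testing was close to maddening, because the same case showed far too much variance between runs, with the largest gap approaching 2x. The following conclusions are for reference only:
The fewer messages accumulated in the queue, the faster the rate
The consumption rate increases over time
Keep the message body as small as possible
When the thread count exceeded 10, there was a very noticeable drop in performance. The two earlier sets of data explain why: Disruptor consumes too fast, several times faster than the producers, so what the test ends up measuring is really the producer rate. With fewer threads, Disruptor always has a backlog of messages, so the producer rate does not become the bottleneck. This is also related to how the test case is designed.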
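The reasoning above can be checked with a little arithmetic. The Java sketch below uses a simplified constant-rate model, which is my assumption rather than anything measured: producers emit at a fixed rate, consumers drain at a fixed rate, and the run ends when everything is consumed. The class name `PipelineModel` and all rates are hypothetical; only the message total (1,000,000) and pre-built backlog (200,000) mirror the combined test case.

```java
final class PipelineModel {

    /** Milliseconds until every message is consumed, assuming constant rates (messages per ms). */
    static double finishMillis(long total, long backlog, double producerRate, double consumerRate) {
        double produceTime = total / producerRate;             // time for producers to emit everything
        double consumeTime = (total + backlog) / consumerRate; // time if consumers never had to wait
        return Math.max(produceTime, consumeTime);             // the slower side decides the finish time
    }

    /** Rate the benchmark would report: all consumed messages divided by total elapsed time. */
    static double measuredRate(long total, long backlog, double producerRate, double consumerRate) {
        return (total + backlog) / finishMillis(total, backlog, producerRate, consumerRate);
    }

    public static void main(String[] args) {
        // Hypothetical rates: consumers six times faster than producers.
        System.out.println(measuredRate(1_000_000, 200_000, 500.0, 3000.0)); // prints 600.0
        // Reverse the rates and the consumer becomes the limit.
        System.out.println(measuredRate(1_000_000, 200_000, 3000.0, 500.0)); // prints 500.0
    }
}
```

In the first case the reported 600 messages/ms sits close to the producer's 500, not the consumer's 3000, which is the effect described above: once consumers outrun producers, the benchmark mostly reports producer speed.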
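Benchmark
Please refer to the previous test article, "Java & Go High-Performance Queues: LinkedBlockingQueue Performance Test".
Test Cases
The test cases are written in Groovy. Ever since I defined my own asynchronous keyword fun and reviewed closure syntax, it has felt like a revelation, and I have become rather hooked on multithreading idioms of all kinds. This time I additionally use a custom keyword time for measuring elapsed time and a custom wait method implemented with closures; everything else is the same as in the previous article.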
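For readers who do not use the FunTester framework, the three helpers can be approximated with the JDK alone. The Java sketch below is an assumption about their behaviour inferred from how they are called in the test cases, not the framework's actual implementation; the class name `AsyncHelpers` is hypothetical, and only the method names mirror the keywords in the article.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

final class AsyncHelpers {

    // Daemon threads so an idle pool never keeps the JVM alive.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    /** Stand-in for `fun { ... }`: runs the task on another thread and returns immediately. */
    static void fun(Runnable task) {
        POOL.execute(task);
    }

    /** Stand-in for `time { ... }`: runs the task on the calling thread and returns elapsed ms. */
    static long time(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Stand-in for `waitFor { ... }, 0.01`: polls the condition every intervalSeconds until it holds. */
    static void waitFor(BooleanSupplier condition, double intervalSeconds) {
        long intervalMillis = Math.max(1L, (long) (intervalSeconds * 1000));
        while (!condition.getAsBoolean()) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                return;
            }
        }
    }

    public static void main(String[] args) {
        AtomicBoolean done = new AtomicBoolean(false);
        fun(() -> done.set(true));   // asynchronous work
        waitFor(done::get, 0.01);    // block until the work is visible
        System.out.println("done=" + done.get());
    }
}
```

Polling with a sleep interval is simple but adds up to one interval of error to any timing that ends with `waitFor`.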
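Disruptor has a built-in advantage: ringBufferSize must be set, which amounts to allocating memory in advance. I had not thought of this before. When I went back and retested LinkedBlockingQueue, I found no significant performance difference, so the effect on the test results is negligible.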
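As an illustration of what "allocating in advance" means, here is a deliberately simplified Java sketch of a pre-allocated ring. It is not Disruptor's implementation: there is no sequencing, no wait strategy, and no thread safety. The class name `PreallocatedRing` is hypothetical. The power-of-two size check mirrors Disruptor's own requirement on the buffer size and lets a sequence number be mapped to a slot with a bit mask.

```java
import java.util.function.Supplier;

final class PreallocatedRing<T> {

    private final Object[] slots;
    private final int mask;

    PreallocatedRing(int size, Supplier<T> factory) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new Object[size];
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get(); // every slot is created once, up front
        }
    }

    /** Returns the reusable slot for a sequence number; sequences wrap around the ring. */
    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[(int) (sequence & mask)];
    }

    public static void main(String[] args) {
        PreallocatedRing<StringBuilder> ring = new PreallocatedRing<>(8, StringBuilder::new);
        // Sequences 3 and 11 land on the same slot because 11 & 7 == 3.
        System.out.println(ring.get(3) == ring.get(11)); // prints true
    }
}
```

Because the constructor fills every slot, construction cost grows with the ring size, while publishing later only mutates an existing slot.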
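PS: Some sleep() calls are used here, which introduce some error. I cannot avoid this with my current skills; testing shows it has little effect on the conclusions and a limited effect on the data.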
Producer
import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

class DisProduce extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 50_0000

    static int size = 10

    static int threadNum = 10

    // Progress is logged once every `piece` messages
    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        // No consumer is registered: this case measures publishing only
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        disruptor.start();
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        def latch = new CountDownLatch(threadNum)
        def ss = Time.getTimeStamp()
        // Each producer thread publishes total / threadNum messages
        def funtester = {
            fun {
                (total / threadNum).times {
                    if (index.getAndIncrement() % piece == 0) {
                        def l = Time.getTimeStamp() - ss
                        output("${formatLong(index.get())} added, elapsed ${formatLong(l)}")
                        ss = Time.getTimeStamp()
                    }
                    // Small object:
                    // def get = new HttpGet()

                    // Medium object:
                    // def get = new HttpGet(url)
                    // get.addHeader("token", token)
                    // get.addHeader(HttpClientConstant.USER_AGENT)
                    // get.addHeader(HttpClientConstant.CONNECTION)

                    // Large object:
                    def get = new HttpGet(url + token)
                    get.addHeader("token", token)
                    get.addHeader("token1", token)
                    get.addHeader("token5", token)
                    get.addHeader("token4", token)
                    get.addHeader("token3", token)
                    get.addHeader("token2", token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    ringBuffer.publishEvent((event, sequence) -> event.setRequest(get))
                }
                latch.countDown()
            }
        }
        // fun {
        //     while (true) {
        //         sleep(1.0)
        //         output(disruptor.getRingBuffer().getBufferSize())
        //     }
        // }
        def start = Time.getTimeStamp()
        threadNum.times {funtester()}
        latch.await()
        def end = Time.getTimeStamp()
        outRGB("Rate per millisecond: ${total / (end - start)}")

        disruptor.shutdown();
    }

    /**
     * Consumer
     */
    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {

        public void onEvent(FunEvent event, long sequence, boolean endOfBatch) {
        }

        public void onEvent(FunEvent event) {
        }

    }

    private static class FunEvent {

        HttpRequestBase request

        HttpRequestBase getRequest() { return request }

        void setRequest(HttpRequestBase request) { this.request = request };

    }

}
Consumer
import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.event.EventThread
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

class DisConsumer extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 50_0000

    static int threadNum = 10

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    // While true, handlers sleep on every event so the queue can fill up first.
    // volatile so that handler threads reliably see the switch to false.
    static volatile boolean key = true

    public static void main(String[] args) {
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        def funs = range(threadNum).mapToObj(f -> new FunEventHandler()).collect(Collectors.toList())
        disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[])
        disruptor.start();
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        def ss = Time.getTimeStamp()
        // Build the backlog while the consumers are throttled
        time {
            total.times {
                // Small object:
                // def get = new HttpGet()

                // Medium object:
                // def get = new HttpGet(url)
                // get.addHeader("token", token)
                // get.addHeader(HttpClientConstant.USER_AGENT)
                // get.addHeader(HttpClientConstant.CONNECTION)

                // Large object:
                def get = new HttpGet(url + token)
                get.addHeader("token", token)
                get.addHeader("token1", token)
                get.addHeader("token5", token)
                get.addHeader("token4", token)
                get.addHeader("token3", token)
                get.addHeader("token2", token)
                get.addHeader(HttpClientConstant.USER_AGENT)
                get.addHeader(HttpClientConstant.CONNECTION)

                ringBuffer.publishEvent((event, sequence) -> event.setRequest(get));
            }
        }
        output("$total messages built!")
        def start = Time.getTimeStamp()
        // Release the consumers and time how long the backlog takes to drain
        key = false
        waitFor {!disruptor.hasBacklog()} , 0.01
        def end = Time.getTimeStamp()
        output(end - start)
        outRGB("Rate per millisecond: ${total / (end - start)}")

        disruptor.shutdown();
    }

    /**
     * Consumer
     */
    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {

        public void onEvent(FunEvent event, long sequence, boolean endOfBatch) {
            if (key) sleep(0.05)
        }

        public void onEvent(FunEvent event) {
            if (key) sleep(0.05)
        }

    }

    private static class FunEvent {

        HttpRequestBase request

        HttpRequestBase getRequest() { return request }

        void setRequest(HttpRequestBase request) { this.request = request };

    }

}
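The consumer case holds the consumers back while the queue is being filled, then times only the drain. The Java sketch below shows the same idea with a different mechanism, and the substitution is mine: a CountDownLatch gate in place of the sleep-based throttle, and a JDK LinkedBlockingQueue in place of Disruptor so that it runs without extra dependencies. The class and method names (`GatedDrain`, `drain`) are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class GatedDrain {

    /**
     * Pre-fills a queue, releases all consumers at once, and waits until the queue is empty.
     * Returns {consumedCount, elapsedMillis}.
     */
    static long[] drain(int total, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < total; i++) {
            queue.add(i); // build the backlog before timing starts
        }
        CountDownLatch gate = new CountDownLatch(1);             // holds consumers back
        CountDownLatch finished = new CountDownLatch(consumers); // one count per consumer
        AtomicLong consumed = new AtomicLong();

        for (int i = 0; i < consumers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    gate.await();
                    while (queue.poll() != null) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            });
            worker.setDaemon(true);
            worker.start();
        }

        long start = System.nanoTime();
        gate.countDown(); // open the gate: the timed window covers consumption only
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        return new long[] {consumed.get(), elapsedMillis};
    }

    public static void main(String[] args) {
        long[] result = drain(500_000, 10);
        System.out.println("consumed=" + result[0] + " elapsedMs=" + result[1]);
    }
}
```

With a hard gate no message is consumed before the clock starts, whereas a sleep-based throttle lets a small number through during the fill phase, which is one source of the error mentioned in the consumer results.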
Producer & Consumer
import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

class DisBoth extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 100_0000

    static int threadNum = 5

    // Number of messages queued before the timed run starts
    static int buffer = 20_0000

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    // While true, handlers sleep on every event so the initial backlog can build up.
    // volatile so that handler threads reliably see the switch to false.
    static volatile boolean key = true

    public static void main(String[] args) {
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 256,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        def funs = range(threadNum).mapToObj(f -> new FunEventHandler()).collect(Collectors.toList())
        disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[])
        disruptor.start();
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        // Producer threads share one counter and stop once `total` messages are published
        def produces = {
            fun {
                while (true) {
                    if (index.getAndIncrement() > total) break
                    // Small object:
                    // def get = new HttpGet()

                    // Medium object:
                    // def get = new HttpGet(url)
                    // get.addHeader("token", token)
                    // get.addHeader(HttpClientConstant.USER_AGENT)
                    // get.addHeader(HttpClientConstant.CONNECTION)

                    // Large object:
                    def get = new HttpGet(url + token)
                    get.addHeader("token", token)
                    get.addHeader("token1", token)
                    get.addHeader("token5", token)
                    get.addHeader("token4", token)
                    get.addHeader("token3", token)
                    get.addHeader("token2", token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    ringBuffer.publishEvent((event, sequence) -> event.setRequest(get));
                }
            }
        }
        // Build the initial backlog while the consumers are throttled
        time {
            buffer.times {
                // Small object:
                // def get = new HttpGet()

                // Medium object:
                // def get = new HttpGet(url)
                // get.addHeader("token", token)
                // get.addHeader(HttpClientConstant.USER_AGENT)
                // get.addHeader(HttpClientConstant.CONNECTION)

                // Large object:
                def get = new HttpGet(url + token)
                get.addHeader("token", token)
                get.addHeader("token1", token)
                get.addHeader("token5", token)
                get.addHeader("token4", token)
                get.addHeader("token3", token)
                get.addHeader("token2", token)
                get.addHeader(HttpClientConstant.USER_AGENT)
                get.addHeader(HttpClientConstant.CONNECTION)
                ringBuffer.publishEvent((event, sequence) -> event.setRequest(get));
            }
        }
        output("$buffer messages built!")
        def start = Time.getTimeStamp()
        // Release the consumers, start the producers, and wait for the queue to empty
        key = false
        threadNum.times {produces()}
        waitFor {!disruptor.hasBacklog()} , 0.01
        def end = Time.getTimeStamp()
        def l = end - start
        output(l)
        outRGB("Rate per millisecond: ${(total + buffer) / l}")

        disruptor.shutdown();
    }

    /**
     * Consumer
     */
    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {

        public void onEvent(FunEvent event, long sequence, boolean endOfBatch) {
            if (key) sleep(0.05)
        }

        public void onEvent(FunEvent event) {
            if (key) sleep(0.05)
        }

    }

    private static class FunEvent {

        HttpRequestBase request

        HttpRequestBase getRequest() { return request }

        void setRequest(HttpRequestBase request) { this.request = request };

    }

}
Have Fun ~ Tester !