写点什么

Java&Go 高性能队列之 LinkedBlockingQueue 性能测试

作者:FunTester
  • 2022 年 1 月 10 日
  • 本文字数:7170 字

    阅读完需:约 24 分钟

在写完高性能队列Disruptor在测试中应用千万级日志回放引擎设计稿之后,我就一直在准备 Java & Go 语言几种高性能消息队列的性能测试,其中选取了几种基准测试场景以及在性能测试中的应用场景。


测试场景设计的思路参考的两个方面:


  • 消息体大小,我用的不同大小 GET 请求区分

  • 生产者和消费者线程数,Go 语言中称协程 goroutine


PS:后续的文章中,Go 语言文章中如果出现线程,均指 goroutine。

结论

总体来说,java.util.concurrent.LinkedBlockingQueue性能还是在 50 万 QPS 级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来三点比较通用的参考:


  • 消息体尽可能小

  • 线程数增益有限

  • 尽量避免消息积压

简介

首先介绍一下第一个被测试的对象java.util.concurrent.LinkedBlockingQueue,分解名字可以得到这是个由链表实现的阻塞单向的对象。官方给的定义是:


基于链接节点的可选有界阻塞队列。此队列对元素进行 FIFO(先进先出)排序。队列的头部是在队列中时间最长的元素。队列的尾部是在队列中时间最短的元素。新元素被插入到队列的尾部,队列检索操作获取队列头部的元素。链接队列通常比基于数组的队列具有更高的吞吐量,但在大多数并发应用程序中性能更不可预测。


在我查到的几种 JDK 自带的队列实现类中,java.util.concurrent.LinkedBlockingQueue性能是最高的,还有一个候选的类java.util.concurrent.ArrayBlockingQueue,资料说java.util.concurrent.LinkedBlockingQueue性能大概是java.util.concurrent.ArrayBlockingQueue性能的 2 ~ 3 倍,差距过于明显,这个有机会再来测试。

测试结果

这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。

数据说明

这里我用了三种org.apache.http.client.methods.HttpGet,创建方法均使用原生 API,为了区分大小的区别,我会响应增加一些 header 和 URL 长度。


小对象:


def get = new HttpGet()
复制代码


中对象:


def get = new HttpGet(url)get.addHeader("token", token)get.addHeader(HttpClientConstant.USER_AGENT)get.addHeader(HttpClientConstant.CONNECTION)
复制代码


大对象:


def get = new HttpGet(url + token)get.addHeader("token", token)get.addHeader("token1", token)get.addHeader("token5", token)get.addHeader("token4", token)get.addHeader("token3", token)get.addHeader("token2", token)get.addHeader(HttpClientConstant.USER_AGENT)get.addHeader(HttpClientConstant.CONNECTION)
复制代码

生产者


中间两次测试失败,是因为等待时间太长了,进行到 300 万左右开始停滞,所以放弃了。


针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:


  1. 长度保持在十万量级

  2. 生产者线程数 5-10 线程

  3. 消息体尽可能小

消费者


针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:


  1. 数据上看长度越长越好

  2. 消费者线程越少越好

  3. 消息体尽可能小


这里跟生产者标准有点不一样,基本上就是锁的竞争越少越好,测试消息数越多越好(这个工作中暂时用不到)。

生产者 & 消费者

这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的 2 倍。



针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:


  1. 消息队列积累消息越少,速率越快

  2. 消费速率随时间推移越来越快,不明显

  3. 消息体尽可能小

测试用例

测试用例使用 Groovy 语言编写,自从我自定义了异步关键字fun和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。所以这个用例对于 Java 同学来讲可能有点看着熟悉,仔细阅读起来有点费劲,我会尽量写一些注释。大家可以把终点放在测试结果上,这可以对以后大家使用java.util.concurrent.LinkedBlockingQueue类有个基本的参考。


测试用例会根据上述的测试场景进行微调,例如线程数、消息体对象的大小等等,这个我会着重进行三种用例场景的测试。当然在工作中使用场景肯定比我提到的三种复杂多,各位有兴趣可以自己亲自上手测试,这里我就不班门弄斧了。

生产者场景

package com.funtest.groovytest
import com.funtester.config.HttpClientConstantimport com.funtester.frame.SourceCodeimport com.funtester.utils.CountUtilimport com.funtester.utils.Timeimport org.apache.http.client.methods.HttpGetimport org.apache.http.client.methods.HttpRequestBase
import java.util.concurrent.CountDownLatchimport java.util.concurrent.LinkedBlockingQueueimport java.util.concurrent.atomic.AtomicInteger
class QueueT extends SourceCode {
static AtomicInteger index = new AtomicInteger(0)
static int total = 100_0000
static int size = 10
static int threadNum = 1
static int piece = total / size
static def url = "http://localhost:12345/funtester"
static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
public static void main(String[] args) {
LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
def start = Time.getTimeStamp() def latch = new CountDownLatch(threadNum) def ts = [] def barrier = new CyclicBarrier(threadNum + 1) def funtester = {//创建异步闭包的方法 fun { barrier.await() while (true) { if (index.getAndIncrement() % piece == 0) { def l = Time.getTimeStamp() - start ts << l output("${formatLong(index.get())}添加总消耗${formatLong(l)}") start = Time.getTimeStamp() } if (index.get() > total) break
def get = new HttpGet(url) get.addHeader("token",token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) linkedQ.put(get) } latch.countDown() } } threadNum.times {funtester()} def st = Time.getTimeStamp() barrier.await() latch.await() def et = Time.getTimeStamp() outRGB("每毫秒速率${total / (et - st)}") outRGB(CountUtil.index(ts).toString()) }

}
复制代码

消费者场景

package com.funtest.groovytest
import com.funtester.config.HttpClientConstantimport com.funtester.frame.SourceCodeimport com.funtester.utils.CountUtilimport com.funtester.utils.Timeimport org.apache.http.client.methods.HttpGetimport org.apache.http.client.methods.HttpRequestBase
import java.util.concurrent.CountDownLatchimport java.util.concurrent.CyclicBarrierimport java.util.concurrent.LinkedBlockingQueueimport java.util.concurrent.TimeUnitimport java.util.concurrent.atomic.AtomicInteger
class QueueTconsume extends SourceCode {
static AtomicInteger index = new AtomicInteger(1)
static int total = 100_0000
static int size = 10
static int threadNum = 5
static int piece = total / size
static def url = "http://localhost:12345/funtester"
static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
public static void main(String[] args) {
LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>() def pwait = new CountDownLatch(10) def produces = { fun { while (true) { if (linkedQ.size() > total) break def get = new HttpGet(url) get.addHeader("token", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) linkedQ.add(get) } pwait.countDown() } } 10.times {produces()} pwait.await() outRGB("数据构造完成!${linkedQ.size()}")

def start = Time.getTimeStamp() def barrier = new CyclicBarrier(threadNum + 1 ) def latch = new CountDownLatch(threadNum) def ts = [] def funtester = { fun { barrier.await() while (true) { if (index.getAndIncrement() % piece == 0) { def l = Time.getTimeStamp() - start ts << l output("${formatLong(index.get())}消费总消耗${formatLong(l)}") start = Time.getTimeStamp() } def poll = linkedQ.poll(100, TimeUnit.MILLISECONDS) if (poll == null) break } latch.countDown() } } threadNum.times {funtester()} def st = Time.getTimeStamp() barrier.await() latch.await() def et = Time.getTimeStamp() outRGB("每毫秒速率${total / (et - st)}") outRGB(CountUtil.index(ts).toString()) }

}
复制代码

生产者 & 消费者 场景

这里我引入了另外一个变量:初始队列长度 length,用例运行之前将队列按照这个长度进行单线程填充。


package com.funtest.groovytest
import com.funtester.frame.SourceCodeimport com.funtester.utils.Timeimport org.apache.http.client.methods.HttpGetimport org.apache.http.client.methods.HttpRequestBase
import java.util.concurrent.CountDownLatchimport java.util.concurrent.CyclicBarrierimport java.util.concurrent.LinkedBlockingQueueimport java.util.concurrent.TimeUnitimport java.util.concurrent.atomic.AtomicInteger
class QueueBoth extends SourceCode {
static AtomicInteger index = new AtomicInteger(1)
static int total = 500_0000
static int length = 50_0000
static int threadNum = 5
static def url = "http://localhost:12345/funtester"
static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
public static void main(String[] args) { LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
def latch = new CountDownLatch(threadNum * 2) def barrier = new CyclicBarrier(threadNum * 2 + 1) def ts = [] def funtester = {f -> { fun { barrier.await() while (true) { if (index.getAndIncrement() > total) break f() } latch.countDown() } } } def produces = { def get = new HttpGet(url) get.addHeader("token", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) linkedQ.put(get) } length.times {produces()}
threadNum.times { funtester produces funtester {linkedQ.poll(100, TimeUnit.MILLISECONDS)} } def st = Time.getTimeStamp() barrier.await() latch.await() def et = Time.getTimeStamp() outRGB("每毫秒速率${total / (et - st) / 2}") }

}
复制代码

补充

性能非常不稳定

其中有两个问题需要补充说明,java.util.concurrent.LinkedBlockingQueue性能在测试过程中非常不稳定,我每次打印日志以 1/10 为节点打印时间戳,下面分享一些在队列长度 100 万时,生产者模式中的日志:


INFO-> 23.731 F-2  107,942添加总消耗523INFO-> 23.897 F-10 200,061添加总消耗165INFO-> 24.137 F-9  300,024添加总消耗239INFO-> 24.320 F-2  400,037添加总消耗182INFO-> 25.200 F-5  500,065添加总消耗879INFO-> 25.411 F-2  600,094添加总消耗211INFO-> 25.604 F-8  700,090添加总消耗193INFO-> 26.868 F-1  800,047添加总消耗1,264INFO-> 26.927 F-4  900,053添加总消耗57INFO-> 28.454 F-3  1,000,009添加总消耗1,527INFO-> 28.457 main 每毫秒速率190.0779319521INFO-> 28.476 main 平均值:524.0 ,最大值1527.0 ,最小值:57.0 ,中位数:239.0 p99:1527.0 p95:1527.0

INFO-> 43.930 F-10 112,384添加总消耗385INFO-> 44.072 F-9 200,159添加总消耗140INFO-> 44.296 F-1 300,058添加总消耗223INFO-> 44.445 F-7 400,075添加总消耗149INFO-> 45.311 F-10 500,086添加总消耗866INFO-> 45.498 F-8 600,080添加总消耗187INFO-> 45.700 F-1 700,088添加总消耗202INFO-> 45.760 F-9 800,057添加总消耗59INFO-> 47.245 F-6 900,095添加总消耗1,485INFO-> 47.303 F-6 1,000,009添加总消耗58INFO-> 47.305 main 每毫秒速率262.7430373095INFO-> 47.320 main 平均值:375.4 ,最大值1485.0 ,最小值:58.0 ,中位数:202.0 p99:1485.0 p95:1485.0

INFO-> 00.916 F-1 100,000添加总消耗568INFO-> 01.269 F-1 200,000添加总消耗353INFO-> 01.461 F-1 300,000添加总消耗192INFO-> 01.635 F-1 400,000添加总消耗174INFO-> 02.536 F-1 500,000添加总消耗899INFO-> 02.777 F-1 600,000添加总消耗240INFO-> 03.015 F-1 700,000添加总消耗237INFO-> 03.107 F-1 800,000添加总消耗91INFO-> 04.519 F-1 900,000添加总消耗1,412INFO-> 05.940 F-1 1,000,000添加总消耗96INFO-> 05.943 main 每毫秒速率184.5358922310INFO-> 05.959 main 平均值:426.2 ,最大值1412.0 ,最小值:91.0 ,中位数:240.0 p99:1412.0 p95:1412.0
复制代码


可以看出最大值最小值能相差十几倍,甚至二十几倍,这种情况随着消息队列总长度增长而增长,大多数发生在 80 万 ~ 100 万阶段,如果将长度降低到 50 万,这种情况就会得到明显改善。所以还有一个附加观点:消息队列长度应当尽可能少一些。

基准测试

下面是我使用 FunTester 性能测试框架对三种消息对象的生产代码进行的测试结果。



测试用例如下:


package com.funtest.groovytest
import com.funtester.base.constaint.FixedThreadimport com.funtester.config.HttpClientConstantimport com.funtester.frame.execute.Concurrentimport com.funtester.httpclient.FunLibraryimport org.apache.http.client.methods.HttpGet
class TTT extends FunLibrary {
static int total = 100_0000
static int thread = 1
static int times = total / thread
static def url = "http://localhost:12345/funtester"
static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
public static void main(String[] args) { RUNUP_TIME = 0 def tasks = [] thread.times {tasks << new FunTester(times)} new Concurrent(tasks,"测试生产者代码性能").start()
}
private static class FunTester extends FixedThread {
FunTester(int limit) { super(null, limit, true) }
@Override protected void doing() throws Exception {// def get = new HttpGet()
// def get = new HttpGet(url)// get.addHeader("token", token)// get.addHeader(HttpClientConstant.USER_AGENT)// get.addHeader(HttpClientConstant.CONNECTION)
def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION)
}
@Override FixedThread clone() { return new FunTester(limit) } }
}
复制代码

Have Fun ~ Tester !

发布于: 刚刚阅读数: 2
用户头像

FunTester

关注

公众号:FunTester,750篇原创,欢迎关注 2020.10.20 加入

公众号FunTester,坚持原创文章的测试人,一个有趣的灵魂。

评论

发布
暂无评论
Java&Go高性能队列之LinkedBlockingQueue性能测试