RocketMQ 这样做,压测后性能提高 30%
从官方这边获悉,RocketMQ 在 4.9.1 版本中对消息发送进行了大量的优化,性能提升十分显著,接下来请跟着我一起来欣赏大神们的杰作。
根据 RocketMQ4.9.1 的更新日志,我们从中提取到关于消息发送性能优化的【Issues:2883】,详细链接如下:
https://github.com/apache/rocketmq/issues/2883
具体优化点如截图所示:
首先先尝试对上述优化点做一个简单的介绍:
对 WaitNotifyObject 的锁进行优化(item2)
移除 HAService 中的锁(item3)
移除 GroupCommitService 中的锁(item4)
消除 HA 中不必要的数组拷贝(item5)
调整消息发送几个参数的默认值(item7)sendMessageThreadPoolNumsuseReentrantLockWhenPutMessageflushCommitLogTimedendTransactionThreadPoolNums
减少琐的作用范围(item8-12)
接下来我们逐一来看看其优化点,并简单加以分析。
通过阅读相关的变更,优化手段主要包括:
移除不必要的锁
降低锁粒度(范围)
修改消息发送相关参数
接下来根据上述手段,从中挑选具有代表性功能进行详细剖析,一起领悟 Java 高并发编程。
1、移除不必要的锁
本次性能优化,主要针对的是 RocketMQ 同步复制场景。
我们首先先来简单介绍一下 RocketMQ 主从同步在编程方面的技巧。
RocketMQ 主节点将消息写入内存后, 如果采用的是同步复制,需要等待从节点成功写入后才能向消息发送客户端返回成功,在代码编写方面也极具技巧性,其序列图入下图所示:
温馨提示:在 RocketMQ4.7 版本开始对消息发送进行了优化,同步消息发送模型引入了 jdk 的 CompletableFuture 实现消息的异步发送。
核心步骤解读:
消息发送线程调用 Commitlog 的 aysncPutMessage 方法写入消息。
Commitlog 调用 submitReplicaRequest 方法,将任务提交到 GroupTransferService 中,并获取一个 Future,实现异步编程。值得注意的是这里需要等待,待数据成功写入从节点(内部基于 CompletableFuture 机制的内部线程池 ForkJoin)。
GroupTransferService 中对提交的任务依次进行判断,判断对应的请求是否已同步到从节点。
如果已经复制到从节点,则通过 Future 唤醒,并将结果返回给消息发送端。
GroupTransferService 代码如下图所示:
为了更加方便大家理解接下来的优化点,首先再总结提炼一下 GroupTransferService 的设计理念:
首先引入两个 List 结合,分别命名为读、写链表。
外部调用 GroupTransferService 的 putRequest 请求,将存储在写链表中(requestWrite)。
GroupTransferService 的 run 方法从 requestRead 链表中获取任务,判断这些任务对应的请求的数据是否成功写入到从节点。
每当 requestRead 中没有数据可读时,两个队列进行交互,从而实现读写分离,降低锁竞争。
新版本的优化点主要包括:
更改 putRequest 的锁类型,用自旋锁替换 synchronized
去除 doWaitTransfer 方法中多余的锁
1.1 使用自旋锁替换 synchronized
场景分析:正入下图所示,GroupTransferService 向外提供一个接口 putRequest 用来接受外部的同步任务,需要对线程不安全的 ArrayList 加锁进行保护,往 ArrayList 中添加数据属于一个内存操作,操作耗时小。
故这里没必要采取 synchronized 这种 synchronized,而是可以自旋锁,自旋锁的实现非常轻量级,其实现如下图所示:
整个锁的实现就只需引入一个 AtomicBoolean,加锁、释放锁都是基于 CAS 操作,非常的轻量,并且自旋锁不会发生线程切换。
1.2 去除多余的锁
“锁”的滥用是一个非常普遍的现象,多线程环境编程是一个非常复杂的交互过程,在编写代码过程中我们可能觉得自己无法预知这段代码是否会被多个线程并发执行,为了谨慎起见,就直接简单粗暴的对其进行加锁,带来的自然是性能的损耗,这里将该锁去除,我们就要结合该类的调用链条,判断是否需要加锁。
整个 GroupTransferService 中在多线程环境中运行需要被保护的主要是 requestRead 与 requestWrite 集合,引入的锁的目的也是确保这两个集合在多线程环境下安全访问,故我们首先应该梳理一下 GroupTransferService 的核心方法的运作流程:
doWaitTransfer 方法操作的主要对象是 requestRead 链表,而且该方法只会被 GroupTransferService 线程调用,并且 requestRead 中方法会在 swapRequest 中被修改,但这两个方法是串行执行,而且在同一个线程中,故无需引入锁,该锁可以移除。
但由于该锁被移除,在 swapRequests 中进行加锁,因为 requestWrite 这个队列会被多个线程访问,优化后的代码如下:
从这个角度来看,其实主要是将锁的类型由 synchronized 替换为更加轻量的自旋锁。
2、降低锁的范围
被锁包裹的代码块是串行执行,即无法并发,在无法避免锁的情况下,降低锁的代码块,能有效提高并发度,图解如下:
如果多个线程区访问 lock1,lock2,在 lock1 中 domSomeThing1、domSomeThing2 这两个方法都必须串行执行,而多个线程同时访问 lock2 方法,doSomeThing1 能被多个线程同时执行,只有 doSomething2 时才需要串行执行,其整体并发效果肯定是 lock2,基于这样理论:得出一个锁使用的最佳实践:被锁包裹的代码块越少越好。
在老版本中,消息写入加锁的代码块比较大,一些可以并发执行的动作也被锁包裹,例如生成 offsetMsgId。
新版本采用函数式编程的思路,只是定义来获取 msgId 的方法,在进行消息写入时并不会执行,降低锁的粒度,使得 offsetMsgId 的生成并行化,其编程手段之巧妙,值得我们学习。
3、调整消息发送相关的参数
sendMessageThreadPoolNums
Broker 端消息发送端线程池数量,该值在 4.9.0 版本之前默认为 1,新版本调整为操作系统的 CPU 核数,并且不小于 4。
useReentrantLockWhenPutMessage
MQ 消息写入时对内存加锁使用的锁类型,低版本之前默认为 false,表示默认使用自旋锁;新版本使用 ReentrantLock。自旋主要的优势是没有线程切换成本,但自旋容易造成 CPU 的浪费,内存写入大部分情况下是很快,但 RocketMQ 比较依赖页缓存,如果出现也缓存抖动,带来的 CPU 浪费是非常不值得,在 sendMessageThreadPoolNums 设置超过 1 之后,锁的类型使用 ReentrantLock 更加稳定。
flushCommitLogTimed
首先我们通过观察源码了解一下该参数的含义:
其主要作用是控制刷盘线程阻塞等待的方式,低版本 flushCommitLogTimed 为 false,默认使用 CountDownLatch,而高版本则直接使用 Thread.sleep。猜想的原因是刷盘线程比较独立,无需与其他线程进行直接的交互协作,故无需使用 CountDownLatch 这种专门用来线程协作的“外来和尚”。
endTransactionThreadPoolNums
主要用于设置事务消息线程池的大小。
新版本主要是可通过调整发送线程池来动态调节事务消息的值,这个大家可以根据压测结果动态调整。
文章首发于https://www.codingw.net/posts/fbea8b3.html
作者简介:丁威,《RocketMQ 技术内幕》一书作者、RocketMQ 开源社区优秀布道师,公众号「中间件兴趣圈」维护者,主打成体系剖析 Java 主流中间件,已发布 Kafka、RocketMQ、Dubbo、Sentinel、Canal、ElasticJob 等中间件 15 个专栏。
版权声明: 本文为 InfoQ 作者【中间件兴趣圈】的原创文章。
原文链接:【http://xie.infoq.cn/article/705b63c00742aac0a233126fd】。文章转载请联系作者。
评论