RocketMQ 实战—生产优化及运维方案
1.RocketMQ 集群如何进行权限机制的控制
(1)RocketMQ 进行权限控制的必要性
如果一个公司有很多技术团队,每个技术团队都会使用 RocketMQ 集群中的部分 Topic。那么此时可能就会有一个问题:如果订单团队使用的 Topic,被商品团队不小心写入了错误的脏数据,那就可能会导致订单团队的 Topic 里的数据出错。
所以此时就需要在 RocketMQ 中引入权限功能,也就是规定好订单团队的用户只能使用"OrderTopic"。然后商品团队的用户只能使用"ProductTopic",各团队互相之间不能使用对方的 Topic。
(2)在 RocketMQ 中实现权限控制的步骤
步骤一:首先需要在 Broker 端放一个额外的 ACL 权限控制配置文件。配置文件里面需要规定好权限,包括什么用户对哪些 Topic 有什么操作权限,这样各个 Broker 才知道每个用户的权限。
步骤二:然后在每个 Broker 的配置文件里需要设置 aclEnable=true,开启权限控制。
步骤三:接着在每个 Broker 机器的目录下放一个 plain_acl.yml 的配置文件。这个目录是 ${ROCKETMQ_HOME}/store/config,这个配置文件的具体权限配置如下:
上面配置需要注意的是:如果一个账号没有对某个 Topic 显式指定权限,那么就会采用默认 Topic 权限。
步骤四:最后在生产者和消费者中,指定分配到的 RocketMQ 账号。这样,当生产者或消费者使用一个账号时,就只能访问有权限的 Topic。
上面的代码就是在创建 Producer 时,传入了一个 AclClientRPCHook 实例。在 AclClientRPCHook 里就可以设置这个 Producer 的账号密码,对于创建 Consumer 也是同理。通过这样的方式就可以在 Broker 端设置好每个账号对 Topic 的访问权限,然后不同的技术团队使用不同的账号即可。
2.如何对 RocketMQ 集群进行消息堆积的追踪
(1)开启 RocketMQ 的消息轨迹功能的步骤
有时候需要了解一条消息的消息轨迹,来协助排查线上问题。比如想知道消息是什么时候从哪个 Producer 发出来的,什么时候进入到了哪个 Broker 的哪个 Topic,什么时候被哪个 Consumer 消费的。此时就可以使用 RocketMQ 的消息轨迹功能,其配置步骤如下:
步骤一:首先在 Broker 的配置文件里开启消息轨迹追踪的功能,也就是设置 traceTopicEnable = true。开启了该功能之后,启动 Broker 时就会自动创建出一个内部的 Topic:RMQ_SYS_TRACE_TOPIC,这个 Topic 会存储所有消息的轨迹追踪数据。
步骤二:然后在发送消息和消费消息时开启消息轨迹追踪的功能。也就是在创建 Producer 和 Consumer 时,其构造函数的第二个参数 enableMsgTrace 设置为 true。
(2)配置好消息轨迹功能后消息轨迹的处理流程
当 Broker、Producer、Consumer 都配置好消息轨迹追踪后:
首先,Producer 发送消息时就会上报这个消息的轨迹数据到 RMQ_SYS_TRACE_TOPIC 里。此时上报的数据包括:Producer 的信息、发送消息的时间、消息是否发送成功、发送消息的耗时。
接着,消息到达 Broker 之后,Broker 也会记录消息的轨迹数据。此时记录的数据包括:消息存储的 Topic、消息存储的位置、消息的 key 和 tags。
然后,消息被 Consumer 消费时,Consumer 也会上报一些轨迹数据到 RMQ_SYS_TRACE_TOPIC 里。此时上报的数据包括:Consumer 的信息、投递消息的时间、这是第几轮投递消息、消息消费是否成功、消费这条消息的耗时。
最后,如果想要查询消息轨迹,只需要在 RocketMQ 控制台里查询。其导航栏就有一个消息轨迹,在里面可以创建查询任务。可以根据 messageId、message key 或者 Topic 来查询。查询任务执行完毕后,就可以看到消息轨迹的界面了。在消息轨迹的界面里就会展示出 Producer、Broker、Consumer 上报的轨迹数据了。
3.如何处理 RocketMQ 的百万消息积压问题
(1)产生消息积压问题的案例背景
曾经有一个系统,它就是由生产者和消费者两部分组成的。生产者负责不停地把消息写入 RocketMQ 里,然后消费者负责从 RocketMQ 里消费消息。这个系统运行时是有高峰期和低谷期的,在晚上几个小时的高峰期内,大概会有 100 多万条消息进入 RocketMQ。此外,消费者从 RocketMQ 里获取到消息后,会依赖 NoSQL 数据库进行一些业务逻辑的处理。
有一天晚上,消费者依赖的 NoSQL 数据库挂了,导致消费者没法继续从 RocketMQ 里消费数据进行处理。然后生产者在晚上几个小时的高峰期内,往 RocketMQ 写入了 100 多万条消息,这些消息都被积压了。
处理这种紧急的线上事故,一般有如下几种方案。
(2)直接丢弃消息来解决消息积压问题
如果这些消息是允许丢失的,那么此时可以紧急修改消费者的代码:在代码里对所有获取到的消息直接丢弃,不做任何处理。这样可以迅速让积压在 RocketMQ 里的百万消息被处理掉,只不过处理方式就是全部丢弃而已。
(3)在旧 Topic 上扩容消费者来解决消息积压问题
如果这些消息是不允许丢失的,那么可以先等待消费者依赖的 NoSQL 数据库恢复,恢复后就可以根据线上 Topic 的 MessageQueue 数量来决定如何处理。
假设线上 Topic 有 20 个 MessageQueue,然后只有 4 个消费者在消费,那么每个消费者会从 5 个 MessageQueue 里获取消息。此时如果仅仅依靠 4 个消费者来消费肯定是会继续积压消息的,毕竟 RocketMQ 里已经积压了百万消息了。
所以此时可以临时申请 16 台机器多部署 16 个消费者实例,然后让 20 个消费者同时消费。每个消费者消费一个 MessageQueue 的消息,此时消费的速度会提高 5 倍,积压的百万消息很快就会被处理完。
但是这里需要考虑消费者依赖的 NoSQL 数据库必须要能抗住临时增加 5 倍的读写压力,因为原来只有 4 个消费者在读写 NoSQL,现在临时变成了 20 个消费者了。当处理完百万积压的消息后,就可以下线多余的 16 台机器了。
这是最常见的处理百万消息积压的办法。
(4)通过新 Topic 扩容消费者来解决消息积压问题
如果 Topic 总共就只有 4 个 MessageQueue,然后只有 4 个消费者呢?这时就没办法扩容消费者了,因为加再多的消费者,还是只有 4 个 MessageQueue,没法降低原来消费者的消费压力了。
所以此时需要临时修改那 4 个消费者的代码,让它们获取到消息后不依赖 NoSQL,直接把消息写入一个新的 Topic,这时候的速度是很快的,因为仅仅是读写 RocketMQ 而已。然后新的 Topic 会有 20 个 MessageQueue,于是部署 20 台临时增加的消费者去消费新的 Topic,消费新的 Topic 时才依赖 NoSQL。通过将积压的消息转移到一个新的 Topic,来解决无法扩容消费者的问题。
(5)消息积压问题的处理总结
如果 MessageQueue 比较多,可以直接扩容消费者,那么就直接临时增加消费者实例来扩容消费者。
如果 MessageQueue 比较少,不能直接扩容消费者,那么就把积压在原 Topic 的消息写入到新 Topic。在消费新 Topic 的消息时,临时部署足够多的消费者实例,来实现间接扩容消费者。
4.针对 RocketMQ 集群崩溃的金融级高可用方案
金融级的系统如果依赖了 RocketMQ 集群,那么应该如何设计 RocketMQ 集群崩溃时的高可用方案?
通常会在发送消息到 RocketMQ 的系统中设计高可用的降级方案,这个降级方案的思路如下:
在发送消息到 RocketMQ 的代码里通过 try catch 捕获异常,如果发现异常就进行重试。如果连续重试 3 次还是失败,则说明 RocketMQ 集群可能彻底崩溃了。此时需要把这条重要的消息进行持久化:可以是数据库、本地磁盘文件、NoSQL 存储。之后需要不停地尝试发送消息到 RocketMQ,一旦发现 RocketMQ 集群恢复,则通过后台线程把之前持久化存储的消息查询出来,然后按顺序发送到 RocketMQ,这样才能保证消息不会因为 RocketMQ 集群彻底崩溃而丢失。
注意:对消息进行持久化的时候要保证它的顺序。
只要使用这个方案,哪怕 RocketMQ 集群突然崩溃了,系统也不会丢失消息。这种高可用的方案设计,对于一些和金钱相关的金融系统、广告系统来说,是非常有必要的。
5.为 RocketMQ 增加消息限流功能保证其高可用
为什么要给 RocketMQ 增加限流功能保证其高可用性?因为限流功能可以对 RocketMQ 提供保护,避免因为 Bug 等原因导致短时间往 RocketMQ 写入大量数据而出现故障。
比如下面的代码,因为某些原因导致在 while 循环中向 RocketMQ 发消息。如果系统是部署在 10 多台机器上的,那么可能出现 10 多台机器都频繁往 RocketMQ 写消息,瞬间导致 RocketMQ 集群的 TPS 飙升,压垮 RocketMQ 集群。
所以针对这种情况,一般会改造 RocketMQ 的源码。在 Broker 接收消息时,引入限流机制。只允许一秒内写入多少条消息,避免因为一些异常情况,导致 RocketMQ 集群挂掉。
6.从 Kafka 迁移到 RocketMQ 的双写双读方案
假设系统原来使用的 MQ 是 Kafka,现在要从 Kafka 迁移到 RocketMQ,那么这个迁移过程应该怎么做?
首先要做到双写,也就是在所有的 Producer 系统中,同时往 Kafka 和 RocketMQ 写入消息。一般会让双写持续 1 周左右,因为 MQ 里面数据也就最多保留一周。当双写持续一周后,Kafka 和 RocketMQ 里的数据基本一模一样了。
但光是双写还不够,还需要同时进行双读。在双写的同时,所有 Consumer 消费者都要同时从 Kafka 和 RocketMQ 里获取消息,并且用一模一样的逻辑进行处理。只不过从 Kafka 里获取到的消息还是执行核心的逻辑进行处理,落入数据库或者其他存储。而从 RocketMQ 里获取到的消息,虽然也用同样逻辑进行处理,但不会把处理结果落入数据库或其他存储。
Consumer 消费消息时,需要统计每天从 Kafka 和 RocketMQ 读取和处理的消息量,以及记录对应的消息处理结果到某个临时存储中。这样一段时间后,就可以对比从 Kafka 和 RocketMQ 读取和处理的消息量是否一致、处理的消息结果是否一致。如果是,那么就可以进行正式切换了。
基本上对于类似中间件的迁移,都会采取这种双写双读的方案。双写一段时间后,再观察结果是否都一致。如果是,那么再进行切换。
文章转载自:东阳马生架构
评论