深扒 RocketMQ 源码之后,我找出了 RocketMQ 消息重复消费的 7 种原因
在众多关于 MQ 的面试八股文中有这么一道题,“如何保证 MQ 消息消费的幂等性”。
为什么需要保证幂等性呢?是因为消息会重复消费。
为什么消息会重复消费?
明明已经消费了,为什么消息会被再次被消费呢?
不同的 MQ 产生的原因可能不一样
本文就以 RocketMQ 为例,来扒一扒 RocketMQ 中会导致消息重复消息的原因,最终你会发现,其实消息重复消费算是 RocketMQ 无奈的“bug”。
消息发送异常时重复发送
首先,我们来瞅瞅 RocketMQ 发送消息和消费消息的基本原理。
如图,简单说一下上图中的概念:
Broker,就是 RocketMQ 的服务端,如上图就有两个服务实例
Topic 就是一类消息集合的名字
Queue 就是 Topic 的对应的队列,消息都存在 Queue 上,每个 Topic 都会有自己的几个 Queue
所以,整个消息发送和消费过程大致如下:
生产者在发送消息之前根据负载均衡策略(默认是轮询)选择一个 Queue,然后跟这个 Queue 所在的机器建立连接,把消息发送到这个 Queue 上
消费者只要消费这个 Queue,那么就能消费到消息
在正常情况下,生产者的确是按照这个方式来发送消息的
但是当出现了异常时,这种异常包括消息发送超时、响应超时等等,RocketMQ 为了保证消息成功发送,会进行消息发送的重试操作,默认情况下会最多会重试两次
重试操作比较简单,就是选择另一台机器的 Queue 来发送。
虽然重试操作可以很大程度保证消息能够发送成功,但是同时也会带来消息重复发送的问题。
举个例子,假设生产者向 A 机器发送消息,发生了异常,响应超时了,但是就一定代表消息没发成功么?
不一定,有可能会出现服务端的确接收到并处理了消息,但是由于网络波动等等,导致生产者接收不到服务端响应的情况,此时消息处理成功了,但是生成者还是以为发生了异常
此时如果发生重试操作,那么势必会导致消息被发送了两次甚至更多次,导致服务端存了多条相同的消息,那么就一定会导致消费者重复消费消息。
消费消息抛出异常
在 RocketMQ 的并发消费消息的模式下,需要用户实现 MessageListenerConcurrently 接口来处理消息
当消费者获取到消息之后会调用 MessageListenerConcurrently 的实现,传入需要消费的消息集合 msgs,这里提到的 msgs 很重要
如上代码,当消息消费出现异常的时候,status 就会为 null,后面就会将 status 设置成为 RECONSUME_LATER。
RECONSUME_LATER 翻译成功中文就是稍后重新消费的意思
所以从这可以看出,一旦抛出异常,那么消息之后就可以被重复消息。
到这其实可能有小伙伴觉得消息消费失败重新消费很正常,保证消息尽可能消费成功。
对,这句话不错,的确可以在一定程度上保证消费异常的消息可以消费成功。
但是坑不在这,而是前面提到的消费时传入的整个集合中的消息都需要被重新消费。
具体的原因我们接着往下看
当消息处理之后,不论是成功还是异常,都需要对结果进行处理,代码如下
当处理结果为 RECONSUME_LATER 的时候(异常会设置为 RECONSUME_LATER),此时 ackIndex 会设置成-1,后面循环遍历的时候就会遍历到所有这次消费的消息,然后调用 sendMessageBack 方法,sendMessageBack 方式是用来实现消息重新消费的逻辑,这里就不展开说了。
所以,一旦被消费的一批消息中出现一个消费异常的情况,那么就会导致整批消息被重新消费,从而会导致在出现异常之前的成功处理的消息都会被重复消费,非常坑。
不过好在消费时传入的消息集合中的消息数量是可以设置的,并且默认就是 1
也就说默认情况下那个集合中就一条消息,所以默认情况下不会出现消费成功的消息被重复消费的情况。
所以这个参数不要轻易设置,一旦设置大了,就可能导致消息被重新消费。
除了并发消费消息的模式以外,RocketMQ 还支持顺序消费消息的模式,也会造成重复消费,逻辑其实差不多,但是在实现消息重新消费的逻辑不一样。
对 RocketMQ 不熟悉的小伙伴推荐移步:RocketMQ进阶教程
消费者提交 offset 失败
首先来讲一讲什么是 offset。
前面说过,消息在发送的时候需要指定发送到,消息最后会被放到 Queue 中,其实真正的消息不是在 Queue 中,Queue 存的是每个消息的位置,但是你可以理解为 Queue 存的是消息。
而消息在 Queue 中是有序号的,这个序号就被称为 offset,从 0 开始,单调递增 1。
比如说,如上图,消息 1 的 offset 就是 0,消息 2 的 offset 就是 1,依次类推。
这个 offset 的一个作用就是用来管理消费者的消费进度。
当消费者在成功消费消息之后,需要将所消费的消息的 offset 提交给 RocketMQ 服务端,告诉 RocketMQ,这个 Queue 的消息我已经消费到了这个位置了。
提交 offset 的代码就在上述第二节提到的处理结果的后面
这样有一个好处,那么一旦消费者重启了或者其它啥的要从这个 Queue 拉取消息的时候,此时他只需要问问 RocketMQ 服务端上次这个 Queue 消息消费到哪个位置了,之后消费者只需要从这个位置开始消费消息就行了,这样就解决了接着消费的问题。
但是 RocketMQ 在设计的时候,当消费完消息的时候并不是同步告诉 RocketMQ 服务端 offset,而是定时发送。
如图,当消费者消费完消息的时候,会将 offset 保存到内存中的一个 Map 数据结构中,所以上面截图的那段代码其实是更新内存中的 offset
而在消费者启动的时候会开启一个定时任务,默认是 5s 一次,会通过网络请求将内存中的每个 Queue 的消费进度 offset 发送给 RocketMQ 服务端。
由于是定时任务,所以就可能出现服务器一旦宕机,导致最新消费的 offset 没有成功告诉 RocketMQ 服务端的情况
此时,消费进度 offset 就丢了,那么消费者重启的时候只能从 RocketMQ 中获取到上一次提交的 offset,从这里开始消费,而不是最新的 offset,出现明明消费到了第 8 个消息,RocketMQ 却告诉他只消费到了第 5 个消息的情况,此时必然会导致消息又出现重复消费的情况。
服务端持久化 offset 失败
上一节说到,消费者会有一个每隔 5s 钟的定时任务将每个队列的消费进度 offset 提交到 RocketMQ 服务端
当 RocketMQ 服务端接收到提交请求之后,会将这个消费进度 offset 保存到内存中
同时为了保证 RocketMQ 服务端重启消费进度不会丢失,也会开启一个定时任务,默认也是 5s 一次,将内存中的消费进度持久化到磁盘文件中
所以,整个消费进度 offset 的数据流转过程如下
当 RocketMQ 服务端重启之后,会从磁盘中读取文件的数据加载到内存中。
跟消费者产生的问题一样,一旦 RocketMQ 发生宕机,那么 offset 就有可能丢失 5s 钟的数据,RocketMQ 服务端一旦重启,消费者从 RocketMQ 服务端获取到的消息消费进度就比实际消费的进度低,同样也会导致消息重复消费。
主从同步 offset 失败
在 RocketMQ 的高可用模式中,有一种名叫主从同步的模式,当主节点挂了之后,从节点可以手动升级为主节点对外提供访问,保证高可用。
在主从同步模式下,从节点默认每隔 10s 会向主节点发送请求,同步一些元数据,这些元数据就包括消费进度
当从节点获取到主节点的消费进度之后,会将主节点的消费进度设置到自己的内存中,同时也会持久化到磁盘。
所以整个消费进度 offset 的数据的流转过程就会变成如下
同样,由于也是定时任务,那么一旦主节点挂了,从节点就会丢 10s 钟的消费进度,此时如果从节点升级为主节点对外提供访问,就会出现跟上面提到的一样的情况,消费者从这个新的主节点中拿到的消费进度比实际的低,自然而然就会重复消费消息。
所以,总的来说,在消费进度数据流转的过程中,只要某个环节出现了问题,都有很有可能会导致消息重复消费。
重平衡
先来讲一讲什么是重平衡,其实重平衡很好理解,我说一下你就明白了。
前面说到,消费者是从队列中获取消息的
在 RocketMQ 中,有个消费者组的概念,一个消费者组中可以有多个消费者,不同消费者组之间消费消息是互不干扰的,所以前面提到的消费者其实都在消费组下
在同一个消费者组中,消息消费有两种模式:
集群消费模式
广播消费模式
由于 RocketMQ 默认是集群消费模式,并且绝大多数业务场景都是使用集群消费模式,所以这里就不讨论广播消费模式了,感兴趣的同学可以看看 RocketMQ 消息短暂而又精彩的一生 这篇文章。
集群消费模式是指同一条消息只能被这个消费者组消费一次,这就叫集群消费。
并且前面提到提交消费进度给 RocketMQ 服务端的情况只会集群消费模式下才会有,在广播消费模式不会提给到 RocketMQ 服务端,仅仅持久化到本地磁盘
同时前面说的消费者提交消费进度真正提交的是消费者组对于这个 Queue 的消费进度,而不是指具体的某个消费者对于 Queue 消费进度。
虽然说这里将前面提到的一些含义更深一步,但是并不妨碍前面的理解。
集群消费的实现就是将队列按照一定的算法分配给消费者,默认是按照平均分配的。
如图所示,假设某个 topic 有 4 个 Queue,有个消费者组订阅了这个 topic,这个消费者组有两个消费者 1 和消费者 2,此时每个消费者就可以被分配两个队列,这样就能保证消息正常情况下只会被消费一次。如果只有一个消费者,那么这个消费者就会消费所有队列,很好理解。
接着后面又启动了一个消费者 3,此时为了保证刚上线的消费者 3 能够消费消息,就要进行重平衡操作,重新分配每个消费者消费的队列。
在重平衡之后就可能会出现下面这种情况
如上图,原本被消费者 2 消费的 Queue4 被分配给消费者 3,此时消费者 3 就能消费到消息了,这就是重平衡。
除了新增消费者会导致重平衡之外,消费者数量减少,队列的数量增加或者减少都会触发重平衡。
在了解了重平衡概念之后,接下来分析一下为什么重平衡会导致消息的重复消费。
假设在进行重平衡时,还未重平衡完之前,消费者 2 此时还是会按照上面第二节提到的消费消息的逻辑来消费 Queue4 的消息
当消费者 2 已经重平衡完成了,发现 Queue4 自己已经不能消费了,那么此时就会把这个 Queue4 设置为 dropped,就是丢弃的意思
但是由于重平衡进行时消费者 2 仍然在消费 Queue4 的消息,但是当消费完之后,发现队列被设置成 dropped,那么此时被消费者 2 消费消息的 offset 就不会被提交,原因如下代码
这段代码前面已经出现过,一旦 dropped 被设置成 true,这个 if 条件就通不过,消费进度就不会被提交。
成功消费消息了,但是却不提交消费进度,这就非常坑了。。
于是当消费者 3 开始消费 Queue4 的消息的时候,他就会问问 RocketMQ 服务端,我消费者 3 所在的消费者组对于 Queue4 这个队列消费到哪了,我接着消费就行了。
此时由于没有提交消费进度,RocketMQ 服务端告诉消费者 3 的消费进度就会比实际的低,这就造成了消息重复消费的情况。
清理长时间消费的消息
在 RocketMQ 中有这么一个机制,会定时清理长时间正在消费的消息。
如图,假设有 5 条消息现在正在被消费者处理,这 5 条消息会被存在一个集合中,并且是按照 offset 的大小排序,消息 1 的 offset 最小,消息 5 的 offset 最大。
RocketMQ 消费者启动时会开启一个默认 15 分钟执行一次的定时任务
这个定时任务会去检查正在处理的消息的第一条消息,也就是图中的消息 1,一旦发现消息 1 已经处理了超过 15 分钟了,那么此时就会将消息 1 从集合中移除,之后会隔一定时间再次消费消息 1。
这也会有坑,虽然消息 1 从集合中被移除了,但是消息 1 并没有消失,仍然被消费者继续处理,但是消息 1 隔一定时间就会再次被消费,就会出现消息 1 被重复消费的情况。
这就是清理长时间消费的消息导致重复消费的原因。
但此时又会引出一个新的疑问,为什么要移除这个处理超过 15 分钟的消息呢?
这就又跟前面提到的消费进度提交有关!
前面说过消息被消费完成之后会提交消费进度,提交的消费进度实际会有两种情况:
第一种就是某个线程消费了所有的消息,当把所有的消息都消费完成之后,就会把消息从集合中全部移除,此时提交的消费进度 offset 就是图中消息 5 的 offset+1
加 1 的操作是为了保证如果发生重启,那么消费者下次消费的起始位置就是消息 5 后面的消息,保证消息 5 不被重复消费
第二种情况就不太一样了
假设现在有两个线程来处理这 5 条消息,线程 1 处理前 2 条,线程 2 处理后 3 条,如图
现在线程 1 出现了长时间处理消息的情况。
此时线程 2 处理完消息之后,移除后面三条消息,准备提交 offset 的时候发现集合中还有元素,就是线程 1 正在处理的前两条消息,此时线程 2 提交的 offset 并不是消息 5 对应的 offset,而是消息 1 的 offset,代码如下
这么做的主要原因就是保证消息 1 和消息 2 至少被消费一次。
因为一旦提交了消息 5 对应的 offset,如果消费者重启了,下次消费就会接着从消息 5 的后面开始消费,而对于消息 1 和消息 2 来说,并不知道有没有被消费成功,就有可能出现消息丢失的情况。
所以,一旦集合中最前面的消息长时间处理,那么就会导致后面被消费的消息进度无法提交,那么重启之后就会导致大量消息被重复消费。
为了解决这个问题,RocketMQ 引入了定时清理的机制,定时清理长时间消费的消息,这样消费进度就可以提交了。
最后
总得来说,RocketMQ 中还是存在很多种导致消息重读消费的情况,并且官方也说了,只是在大多数情况下消息不会重复
所以如果你的业务场景中需要保证消息不能重复消费,那么就需要根据业务场景合理的设计幂等技术方案。
原文:https://mp.weixin.qq.com/s/XtIZbObkDcDzcwttSDslZg
作者:三友
如果感觉本文对你有帮助,点赞关注支持一下,想要了解更多 Java 后端,大数据,算法领域最新资讯可以关注我公众号【架构师老毕】私信 666 还可获取更多 Java 后端,大数据,算法 PDF+大厂最新面试题整理+视频精讲
评论