RocketMQ 实战—消息零丢失的方案
1.全链路分析为什么用户支付完成后却没有收到红包
(1)客服反馈用户支付后没收到红包
有用户反馈,按照规则,在支付成功后应该是可以拿到一个现金红包的,但是他在支付完一个订单后,却没有收到这个现金红包,于是就反馈给了客服。
技术团队经过排查,找了系统中打印的很多日志,发现一个奇怪的现象。正常情况下,订单系统在完成支付后,会推送一条消息到 RocketMQ,然后红包系统会从 RocketMQ 中消费到这条消息去给用户发送现金红包,如下图示。

但是从订单系统和红包系统当天那个时间段的日志来看,居然只看到订单系统推送消息到 RocketMQ 的日志,却没看到红包系统从 RocketMQ 中消费消息以及发现金红包的日志。问题可能就出在这里,订单支付消息在传输过程中丢失了,导致现金红包没有派发出去。所以接下来需要分析,在使用 RocketMQ 的过程中,到底有哪些地方会导致消息丢失。
(2)订单系统推送消息到 RocketMQ 可能会丢失消息
订单系统在接收到订单支付成功的回调后,会推送一条订单支付成功的消息到 RocketMQ。在这个过程中,是有可能会出现丢失消息的情况的。因为订单系统在推送消息到 RocketMQ 时是通过网络去进行传输的,如果网络发生了抖动,就会导致网络通信失败,于是可能某条消息就没有成功投递给 RocketMQ。所以订单系统投递消息到 RocketMQ 可能会因为网络问题而导致失败。
除此之外,还有其他的原因可能会导致订单系统推送消息到 RocketMQ 失败。比如 RocketMQ 确实收到消息了,但是它网络通信模块的代码出现了异常,导致消息没成功处理。比如生产者在写消息到 RocketMQ 的过程中,刚好遇到了某个 Leader Broker 故障,其他的 Follower Broker 正在尝试切换为 Leader Broker,这个过程中也可能会有异常。
所以我们在使用任何一个 MQ 时,无论是 RocketMQ、还是 RabbitMQ、或者是 Kafka,首先都要明确一点:不一定发送消息出去就一定会成功、也有可能会失败,此时代码里可能会抛出异常、也有可能不会抛异常。
(3)消息到达 RocketMQ 后也可能会丢失消息
即使订单系统成功把消息写入了 RocketMQ,那么消息也有可能出现丢失。因为根据 RocketMQ 的底层原理可以知道:消息写入 RocketMQ 后,RocketMQ 可能仅仅是把这个消息写入到 PageCache 里,也就是 OS 管理的一个缓冲区,这本质也属于内存。
当生产者认为已经向 RocketMQ 成功写入了一条消息,但实际上该消息可能只是仅仅进入 OS 的 PageCache,还没刷入磁盘。此时如果 Broker 机器宕机,那么 OS 的 PageCache 中的数据也就丢失了。

所以根据 RocketMQ 底层原理,消息数据在进入 OS 的 PageCache 时,如果碰上机器宕机,那么内存里的数据必然会丢失。此后机器即便重启了,并启动好 Broker 进程,那么这条消息数据也没了。

(4)就算消息进入磁盘了也不是万无一失
当 Broker 把消息写入了 OS 的 PageCache,操作系统会在一段时间后把消息从内存中刷入磁盘文件里。
但是即便写入 RocketMQ 的一条消息已经进入 Broker 机器的磁盘文件里了,那么这条消息也是有可能会丢失的。因为如果 Broker 机器的磁盘出现了故障,比如磁盘坏了,那么上面存储的数据就可能会丢失。

(5)即使红包系统获取到消息也可能会丢失
即使红包系统顺利从 RocketMQ 里获取到一条消息,那么它也不一定能把现金红包发出去。要解释这种情况,需要了解消息 offset 的概念。
offset 表示的是消息的位置,代表了消息的标识。如下图示,假设有两条消息,offset 分别为 1 和 2。现在红包系统已经获取到了消息 1,然后消息 1 此时就在红包系统的内存里,正准备运行代码去派发现金红包(还没派发)。
由于默认情况下,RocketMQ 的消费者会自动提交已经消费的 offset。如果此时红包系统在还没处理完根据消息 1 派发红包的情况下,提交了消息 1 的 offset 到 Broker,标识已成功处理了消息 1。接着恰好红包系统突然重启或者宕机、或者在派发红包时更新数据库失败了。那么此时内存里的消息 1 必然会丢失,而且红包也还没发出去。

(6)用户支付完后红包没发送出去的原因汇总
原因一:订单系统推送消息到 RocketMQ 失败了,消息没有被成功发送到 RocketMQ 上
原因二:消息确实推送到 RocketMQ 了,但是结果 Broker 机器故障,把消息弄丢了
原因三:红包系统拿到了消息,但是在红包还没发完的时候把消息弄丢了
如果订单系统推送了消息,结果红包系统连消息都没收到,那么可能消息根本就没发到 RocketMQ,或者 RocketMQ 弄丢了消息。如果红包系统收到了消息,结果红包没派发,那么就是红包系统弄丢了消息。
2.RocketMQ 的事务消息机制实现发送消息零丢失
(1)解决消息丢失的第一个问题:推送消息时丢失
首先要解决的第一个问题,就是在订单系统推送消息到 RocketMQ 的过程中,可能会因为常见的网络故障等问题导致推送消息失败。

在 RocketMQ 中,有一个事务消息机制。凭借这个事务消息机制,就可以确保订单系统推送的消息一定会成功写入 RocketMQ,不会出现推送消息失败。
(2)事务消息机制之发送 half 消息试探是否正常
订单系统收到一个订单支付成功的回调后,需要在自己的订单数据库里做一些增删改操作,比如更新订单状态等。然后发一条消息到 RocketMQ,让其他关注这个订单支付成功消息的系统可以从 RocketMQ 获取消息做对应的处理。
在基于 RocketMQ 的事务消息机制中,首先会让订单系统发送一条 half 消息到 RocketMQ 中。这个 half 消息本质就是一个订单支付成功的消息,只不过该消息状态是 half 状态,此时红包系统是看不见该 half 消息的。然后订单系统会等待接收这个 half 消息写入成功的响应通知。
可以想象一下,假设订单系统在收到订单支付成功的通知后,直接进行本地的数据库操作,比如更新订单状态为已完成,然后再发送消息给 RocketMQ,结果才发现 RocketMQ 挂了报异常。这时就会导致没法通过消息通知到红包系统去派发红包,那用户一定会发现自己订单支付了,结果红包没收到。
所以订单系统在收到订单支付成功的通知后做的第一件事,不是先让订单系统做一些增删改操作,而是先发一个 half 消息给 RocketMQ 以及等待它的成功响应,也就是初步和 RocketMQ 建立联系和沟通,从而让订单系统确认 RocketMQ 还活着,RocketMQ 也会知道订单系统后续可能想发送一条很关键的不希望丢失的消息给它。

(3)事务消息机制之如果 half 消息写入失败
如果订单系统发送 half 消息给 RocketMQ 失败了,可能因为订单系统报错了、可能因为 RocketMQ 挂了、或者网络故障了等原因导致 half 消息都没发送成功,此时订单系统应该执行一系列的回滚操作,比如对订单状态做一个更新,让状态变成"关闭交易",同时通知支付系统自动进行退款。
因为订单虽然支付了,但是包括派发红包、发送优惠券之类的后续操作无法执行,所以此时需要把钱款退还给用户,表示交易失败了。
(4)事务消息机制之如果 half 消息写入成功
如果订单系统发送 half 消息给 RocketMQ 成功了,此时订单系统就应该在自己本地的数据库里执行一些增删改操作。因为一旦 half 消息写成功,就说明 RocketMQ 肯定已经收到了这条消息、RocketMQ 还活着而且目前生产者可以跟 RocketMQ 正常沟通,所以订单系统下一步应该执行自己的增删改操作。

(5)事务消息机制之如果本地事务执行失败
如果订单系统更新数据库失败了,比如数据库也出现网络异常、或者数据库挂了,那么订单系统就无法把订单更新为"已完成"这个状态。
此时应该让订单系统发送一个 rollback 请求给 RocketMQ,意思是 RocketMQ 可以把订单系统之前发过来的 half 消息删除掉。
订单系统发送 rollback 请求给 RocketMQ 删除 half 消息后,订单系统就必须执行回退流程,通知支付系统退款。当然回退流程中可能还要考虑订单系统自己的高可用降级机制:比如数据库无法更新时,订单系统需要在机器本地磁盘文件里写入订单支付失败的记录,然后订单系统会启动一个后台线程在 MySQL 数据库恢复后再把订单状态更新为"已关闭"。

(6)事务消息机制之如果本地事务执行成功
如果订单系统成功完成了本地事务的操作,比如把订单状态都更新为"已完成",那么订单系统就可以发送一个 commit 请求给 RocketMQ,要求 RocketMQ 对之前的 half 消息进行 commit 操作,让红包系统可以看见这个订单支付成功消息。
所谓的 half 消息实际就是订单支付成功的消息,只不过它的状态是 half。也就是当消息的状态是 half 状态时,红包系统是看不见该消息的,没法获取到该消息。必须等到订单系统执行 commit 请求,该 half 消息被 commit 后,红包系统才可以看到和获取该消息进行后续处理。

(7)事务消息机制之没收到 half 消息成功的响应
如果订单系统把 half 消息发送给 RocketMQ 了,RocketMQ 也把 half 消息保存下来了,但是订单系统却没能收到 RocketMQ 返回的响应,那么此时会发生什么事情?
订单系统没收到响应,可能是由于网络超时问题,也可能是由于其他的异常问题。如果订单系统没能收到 RocketMQ 返回 half 消息成功的响应,那么就会误以为发送 half 消息到 RocketMQ 失败了,从而执行退款流程,订单状态也会被标记为"已关闭"。

但此时 RocketMQ 已经存下来一条 half 消息了,那么对这条 half 消息应该怎么处理?其实 RocketMQ 里有一个补偿流程,它会扫描自己处于 half 状态的消息。如果订单系统一直没有对这个消息执行 commit/rollback 操作,超过一定时间,RocketMQ 就会回调订单系统的一个接口。
RocketMQ 会通过该接口询问订单系统:这个 half 消息到底怎么回事?到底是打算 commit 这个消息还是要 rollback 这个消息?这时订单系统的这个接口就会去查一下数据库,看看这个订单当前的状态,如果发现订单状态是“已关闭”,那么订单系统就要发送 rollback 请求给 RocketMQ 去删除那个 half 消息。

(8)事务消息机制之 rollback 或 commit 发送失败
场景一:
如果订单系统收到了 half 消息写入成功的响应,同时尝试对自己的数据库更新了。然后根据失败或者成功去执行了 rollback 或者 commit 请求,发送给 RocketMQ 了。结果因为网络故障,导致 rollback 或者 commit 请求发送失败了。
这时候因为 RocketMQ 里的消息一直是 half 状态,所以 RocketMQ 过了一定的超时时间就会发现这个 half 消息有问题,于是就会回调订单系统的接口。然后订单系统的接口就可以判断一下订单状态是否为"已完成",并决定执行 commit 请求还是 rollback 请求。
因此这个回调就是一个补偿机制:如果订单系统没收到 half 消息的响应,或者 rollback、commit 请求没发送成功,RocketMQ 都会来询问订单系统如何处理 half 消息。
场景二:
如果订单系统收到了 half 消息写入成功的响应,同时尝试对自己的数据库更新了。然后根据失败或者成功去执行了 rollback 或者 commit 请求,发送给 RocketMQ 了。但此时 RocketMQ 却挂掉了,导致 rollback 或者 commit 请求发送失败。
这时候就需要等 RocketMQ 自己重启,重启后它会扫描 half 状态的消息,然后还是通过补偿机制,回调订单系统的接口。
(9)RocketMQ 事务消息的全流程总结
情况一:如果 RocketMQ 有问题或者网络有问题,half 消息根本都发不出去。此时 half 消息肯定是失败的,那么订单系统就不会执行后续的流程。
情况二:如果 half 消息发送出去了,但是 half 消息的响应没有收到,然后执行了退款流程。那么 RocketMQ 会有补偿机制来回调订单系统询问要 commit 还是 rollback,此时订单系统选择 rollback 删除消息就可以了,不会执行后续流程。
情况三:如果订单系统收到 half 消息的响应了,结果订单系统自己更新数据库失败了。那么它也会进行回滚,不会执行后续流程。
情况四:如果订单系统收到 half 消息的响应了,然后还更新自己数据库成功了,订单状态是"已完成"。此时必然会发送 commit 请求给 RocketMQ,一旦消息 commit 了,那么必然保证红包系统可以收到这个消息。而且即使 commit 请求发送失败了,RocketMQ 也会有补偿机制,通过回调订单系统的接口来判断是否重新发送 commit 请求。
总之,只要订单系统本地事务成功了,那么必然会保证 RocketMQ 里的 half 消息被 commit,从而让红包系统看到该消息。
所以,通过 RocketMQ 的事务消息机制,可以保证订单系统一旦成功执行了数据库操作,就一定会通知到红包系统派发红包,至少订单系统到 RocketMQ 之间的消息发送不会出现消息丢失的问题。
3.RocketMQ 事务消息机制的底层实现原理
(1)half 消息是如何对消费者不可见的
前面介绍了 RocketMQ 事务消息的全流程,在这个流程中,第一步会由订单系统发送一个 half 消息给 RocketMQ。对于这个 half 消息,红包系统刚开始是看不到它的,没法消费这条消息进行处理。那么这个 half 消息是如何做到不让红包系统看到的呢?这就涉及到 RocketMQ 底层采取的一个巧妙的设计了。
假设订单系统发送了一个 half 状态的订单支付消息到 OrderPaySuccessTopic 里,然后红包系统也订阅了这个 OrderPaySuccessTopic 从里面获取消息。

根据前面 RocketMQ 的底层原理可知:向一个 Topic 写入消息,首先会定位这个 Topic 的某个 MessageQueue,然后会定位一台 Broker 机器,接着会将消息写入到这个 Broker 机器的 CommitLog 文件,同时将消息偏移量写入到该 Broker 机器上的 MessageQueue 对应的 ConsumeQueue 文件。

通过上图可知:如果要写入一条 half 消息到 OrderPaySuccessTopic 里,需要先定位到这个 Topic 的一个 MessageQueue,然后定位到 RocketMQ 的一台 Broker 机器上,接着将 half 消息写入到该 Broker 机器上的 CommitLog 文件,同时消息的 offset 会写入该到 Broker 机器上的 MessageQueue 对应的 ConsumeQueue,该 ConsumeQueue 是属于 OrderPaySuccuessTopic 的,最后红包系统才能从这个 ConsumeQueue 里获取到写入的这个 half 消息。
但实际上红包系统却没法看到这条 half 消息,原因是 RocketMQ 一旦发现生产者发送的是一个 half 消息,那么它就不会把这个 half 消息的 offset 写入 OrderPaySuccessTopic 的 ConsumeQueue 里,而是会把这条 half 消息写入到自己内部的 RMQ_SYS_TRANS_HALF_TOPIC 这个 Topic 对应的一个 ConsumeQueue 里。
所以对于事务消息机制下的 half 消息:RocketMQ 是写入内部 Topic 的 ConsumeQueue 的,不是写入生产者指定的 OrderPaySuccessTopic 的 ConsumeQueue 的。因此红包系统自然就无法从 OrderPaySuccessTopic 的 ConsumeQueue 中看到这条 half 消息了。

(2)订单系统何时会收到 half 消息成功的响应
在什么情况下订单系统会收到 half 消息成功的响应呢?简单来说,必须要 half 消息进入到 RocketMQ 内部的 RMQ_SYS_TRANS_HALF_TOPIC 的 ConsumeQueue 文件了,此时才会认为 half 消息写入成功,然后才会返回响应给订单系统。
所以一旦订单系统收到 half 消息写入成功的响应,那就代表着这个 half 消息已经在 RocketMQ 内部了。
(3)如果没有执行 rollback 或 commit 会怎样
如果因为网络故障,订单系统没收到 half 消息的响应,或者发送的 rollback/commit 请求失败了,那么 RocketMQ 会怎么处理呢?
其实 RocketMQ 会有一个定时任务,定时扫描 RMQ_SYS_TRANS_HALF_TOPIC 中的 half 消息。如果这些消息超过一定时间还是 half 消息,就会回调订单系统的接口来判断这个 half 消息是要 rollback 还是 commit。

(4)处理 rollback 请求时如何标记消息回滚
假设订单系统发送了 rollback 请求,那么 RocketMQ 就需要对消息进行回滚。RocketMQ 会删除对应的 half 消息,但并不是在磁盘文件里删除。
RocketMQ 内部有一个 OP_TOPIC,在处理 half 消息的 rollback 请求时,会向这个 Topic 写入一条 OP 记录,标记这个 half 消息为 rollback 状态。
如果订单系统一直没有执行 commit/rollback,RocketMQ 会回调订单系统的接口去判断 half 消息的状态。但是 RocketMQ 最多回调 15 次,如果 15 次之后订单系统都没法告知 half 消息的状态,就自动把 half 消息标记为 rollback 状态。

(5)处理 commit 请求时如何让消息可见
假设订单系统发送了 commit 请求,那么 RocketMQ 需要让消息可见。
RocketMQ 在处理 half 消息的 commit 请求时,首先会在 OP_TOPIC 里写入一条 OP 记录,然后标记这条 half 消息为 commit 状态,接着会把 RMQ_SYS_TRANS_HALF_TOPIC 中的 half 消息写入到 OrderPaySuccessTopic 的 ConsumeQueue 里,这样红包系统才可以看到这条消息并进行消费。

4.是否可以通过同步重试方案来代替事务消息方案来实现发送消息零丢失
(1)是否有简单方法确保消息可以到达 RocketMQ
生产者在发送消息时,可能会存在消息丢失的情况,也就是可能消息根本就没有进入到 RocketMQ 就丢了,如下图示:

如果生产者使用事务消息机制去发送消息到 RocketMQ,那么一定可以保证消息发送到 RocketMQ。但根据事务消息机制的原理,发现其流程有点复杂:需要先发送 half 消息,之后还得发送 rollback 或 commit 请求,要是中间有点什么问题,RocketMQ 还得回调生产者的接口。
那么是否真的有必要使用这么复杂的机制去确保消息到达 RocketMQ 且不会丢失呢?毕竟这么复杂的机制完全有可能导致整体性能比较差,而且吞吐量比较低。是否有更加简单的方法来确保消息一定可以到达 RocketMQ 呢?

(2)能不能基于重试机制来确保消息到达 RocketMQ
生产者发送消息给 RocketMQ 时,是可以等待 RocketMQ 返回响应给生产者的。那么在什么样的情况下,RocketMQ 会返回响应给生产者呢?事实上,只要 RocketMQ 将消息写入了自己的本地存储,就可以返回响应给生产者。

所以只要生产者在发送消息到 RocketMQ 后,同步等待 RocketMQ 返回的响应,也可以确保消息一定会到达 RocketMQ。如果期间有网络异常或者 RocketMQ 内部异常,生产者肯定会收到异常响应,比如网络错误或者请求超时等。如果生产者收到了异常响应,那么就可以认为消息发送到 RocketMQ 失败了,然后再次尝试重新发送消息,再次同步等待 RocketMQ 返回的响应。通过反复重试,最终也可以确保消息一定会到达 RocketMQ。

理论上在一些短暂的网络异常场景下,生产者是可以通过不停的重试去保证消息到达 RocketMQ 的。因为如果短时间网络异常了消息一直没法发送,只要不停重试,那么当网络恢复后,消息就可以发送到 RocketMQ。
如果反复重试多次都没法把消息投递到 RocketMQ,此时就可以直接让订单系统回滚之前的流程。比如发起退款流程,判定本次订单支付交易失败。
所以这个简单的同步发送消息 + 反复重试的方案,也可以保证消息成功投递到 RocketMQ 中。
在基于 Kafka 作为消息中间件的发送消息零丢失方案中,因为 Kafka 本身不具备 RocketMQ 这种事务消息的高级功能,所以一般都会采用同步发送消息 + 反复重试的方案,保证消息成功投递到 Kafka 中。
但是在类似这个较为复杂的订单业务场景中,仅仅采用同步发送消息 + 反复重试的方案,来确保消息成功投递到 RocketMQ 中,似乎还是不够。下面分析一下在复杂业务场景下,这种方案会有什么问题。
(3)先执行订单本地事务还是先发消息到 RocketMQ
如果订单系统先执行订单本地事务,接着再发送消息到 RocketMQ,那么伪代码如下所示:
上述代码看着天衣无缝,先执行订单本地事务,接着发送消息到 RocketMQ。如果订单本地事务执行失败了,则不会继续发送消息到 RocketMQ。如果订单事务执行成功,但发送消息失败了,则自动进行几次重试,如果重试一直失败,就回滚订单事务。
但是有一个问题:假设订单系统刚执行完成订单本地事务,结果还没等订单系统发送消息到 RocketMQ,订单系统却突然崩溃了。这就会导致订单状态可能已经修改为"已完成",但是消息却没发送到 RocketMQ,这就是这个方案最大的隐患。
如果出现这种场景,那么多次重试发送消息的代码根本没机会执行。而且订单本地事务已经执行成功了,但消息没发送出去,红包系统没机会派发红包。必然导致用户支付成功了,结果看不到自己的红包。

(4)如果把订单本地事务代码和重试发送 RocketMQ 消息的代码放到一个事务中
伪代码如下所示:
上述代码看起来解决了面临的问题,就是在这个方法上加入事务。在这个事务方法中:哪怕执行了 orderService.finishOrderPay(),但其实也只是执行一些增删改 SQL 语句,还没提交订单本地事务。
如果发送消息到 RocketMQ 失败了,而且多次重试还不行,则在抛出异常后会自动回滚订单本地事务。如果刚执行 orderService.finishOrderPay(),结果订单系统直接崩溃,此时订单本地事务也会被回滚,因为根本没提交过。
但是这个方案还是非常不理想,原因就出在多次重试的地方。如果用户支付成功了,然后支付系统回调通知订单系统,有一笔订单已经支付成功。这时订单系统卡在多次重试的代码里,可能耗时好几秒种,此时发起回调通知的支付系统早就等不及可能都超时异常了。而且把重试的代码放在这个逻辑里,会导致订单系统的这个接口性能很差。

(5)订单系统就一定可以依靠本地事务回滚吗
如果将订单事务和发送消息到 RocketMQ 包裹在一个事务代码中,依靠本地事务回滚,那么除了多次重试导致的超时异常问题外,还会有其他问题。
伪代码如下所示:
上述代码中,虽然在方法上加了事务注解,但是代码里还有更新 Redis 缓存和 Elasticsearch 数据的代码逻辑。如果已经完成了订单数据库更新、Redis 缓存更新、ES 数据更新,结果没法送 MQ 订单系统就崩溃了。虽然此时订单数据库的操作会回滚,但是 Redis、Elasticsearch 中的数据更新就不会自动回滚了。而且它们也根本没法自动回滚,此时数据还是会不一致。所以,完全寄希望于本地事务自动回滚是不现实的。
(6)保证业务系统一致性的最佳方案是使用 RocketMQ 的事务消息机制
所以分析完这个同步发送消息 + 反复重试的方案后,会发现该方案落会存在一些问题。
问题一:订单事务执行成功,但消息没发送出去
问题二:订单事务执行成功,但反复重试发送消息到 RocketMQ 极为耗时,导致调用该接口的系统超时
问题三:利用本地事务回滚,发生异常时只能自动回滚数据库部分的操作
所以真正要保证消息一定投递到 RocketMQ,同时保证业务系统之间的数据完全一致,业内最佳方案还是用基于 RocketMQ 的事务消息机制。因为这个方案落地后,就能保证订单系统的本地事务一旦成功,那么必然会投递消息到 RocketMQ。而且整个流程中,订单系统也不会进行长时间的阻塞和重试。
如果 half 消息发送失败,就直接回滚整个流程。如果 half 消息发送成功,后续的 rollback 或者 commit 发送失败了,订单系统也不需要阻塞在那里反复重试,直接让代码结束即可,因为之后 RocketMQ 会回调订单系统的接口来判断是 rollback 还是 commit。
此外,我们也可以借鉴 RocketMQ 事务消息机制的实现原理,来实现同时向多个不同业务系统写同一数据时的数据一致性。
5.使用 RocketMQ 事务消息的代码案例细节
(1)生产者发送 half 事务消息出去
(2)half 消息发送失败或没收到 half 消息的响应
如果发送 half 消息失败了,此时会在执行 Producer 的 sendMessageInTransaction()方法时,收到一个异常,表示消息发送失败了。所以可以用下面的代码去关注 half 消息发送失败的问题。
如果 Producer 一直没有收到 half 消息发送成功的响应,那么针对这个问题,可以把发送出去的 half 消息放在内存里,或者写入本地磁盘文件,然后后台开启一个线程去检查。如果一个 half 消息超过比如 10 分钟都没有收到响应,那么就自动触发回滚逻辑。
(3)half 消息发送成功时如何执行本地事务
上面代码里有一个 TransactionListener,这个类也是需要自己定义的,如下所示:
(4)没有发送 commit 或者 rollback 请求的回调
6.同步刷盘+Raft 协议同步实现发送消息零丢失
(1)使用事务消息机制就一定不会丢消息吗
RocketMQ 的事务消息机制是 RocketMQ 非常核心以及重要的一个功能,该功能可以实现在生产消息的环节不丢失数据,而且最重要的是,可以保证两个业务系统的数据一致性。但是即使在生产消息时用了事务消息机制,也未必就真的可以保证数据不丢失。
假设现在订单系统已经通过事务消息的机制,通过 half 消息 + commit 的方式,在 RocketMQ 里提交了消息。也就是对于 RocketMQ 而言,那条消息已经进入到它的存储层了,可以被红包系统看到了。

由上图可见,生产者生产的这条消息在 commit 之后,会从内部的 TRANS_HALF_TOPIC 进入生产者的 OrderPaySuccessTopic 中。但是这条消息此时仅仅是进入生产者指定的 Topic 而已,仅仅是可以被红包系统看到而已,此时红包系统可能还没来得及去获取这条消息。
然而恰好在此时,这条消息还停留在 OS 的 PageCache 中,还没进入到 ConsumeQueue 磁盘文件里,然后这台 Broker 机器突然宕机了,OS 的 PageCache 中的数据全部丢失。从而导致这条消息也会丢失,红包系统再也没机会读到这条消息了。

(2)消息进了磁盘就不会丢了吗
即使这条消息已经进入了 OrderPaySuccessTopic 的 ConsumeQueue 磁盘文件了,不只是停留在 OS 的 PageCache 里了,此时消息也未必一定不会丢失。
即使消息已经进入磁盘文件,但是这个时候红包系统还没来得及消费这条消息,然后此时这台机器的 PageCache 已经没有了这条消息,同时磁盘突然坏了。这样也一样会导致消息丢失,而且这条消息可能再也找不回来了。

(3)保证消息写入 MQ 不代表不丢失
所以需要明确一个前提:无论是通过比较简单的同步发送消息 + 反复重试的方案,还是事务消息机制的方案,哪怕已经确保消息成功写入了 RocketMQ,此时消息也未必就不会丢失。
因为即使写入 RocketMQ 成功,这条消息也大概率是还停留在 OS 的 PageCache 中,一旦 RocketMQ 机器宕机,其内存里的数据也都会丢失。甚至哪怕消息已经进入了 RocketMQ 机器的磁盘文件,一旦磁盘坏了,消息也同样可能会丢失。
如果消息丢失了,消费者还没来得及消费,那么该消息就永远没机会被消费了。
(4)异步刷盘 vs 同步刷盘
所以到底怎么去确保消息写入 RocketMQ 后,RocketMQ 自己不会丢失数据呢?解决这个问题的第一个关键点,就是将异步刷盘调整为同步刷盘。
所谓的异步刷盘指的是:消息即使成功写入了 RocketMQ,它也只是在机器的 OS PageCache 中,还没有进入磁盘里,要过一会儿等操作系统自己把 PageCache 里的数据刷入磁盘文件中。

所以在异步刷盘的模式下,消息写入的吞吐量肯定是极高的,毕竟消息只要进入 OS 的 PageCache 这个内存就可以了。写消息的性能就是写内存的性能,但是这个情况下可能就会有数据丢失的风险。
因此如果一定要确保数据零丢失,可以调整 RocketMQ 的刷盘策略为同步刷盘。需要调整 Broker 的配置文件,将 flushDiskType 参数的值设置为 SYNC_FLUSH,flushDiskType 参数的默认值是 ASYNC_FLUSH,即异步刷盘。
如果调整为同步刷盘后,写入 RocketMQ 的每条消息,只要 RocketMQ 返回写入成功,那么消息就一定是已进入磁盘文件。比如发送 half 消息时,只要 RocketMQ 返回响应就是 half 消息发送成功了,那么就说明消息已经进入磁盘文件。
所以如果使用同步刷盘的策略,是可以确保写入 RocketMQ 的消息一定是已经进入磁盘文件的。
(5)通过主从架构避免磁盘故障导致数据丢失
如何避免磁盘故障导致的数据丢失?其实解决方法很简单,就是对 Broker 使用主从架构的模式。
也就一个 Master Broker 必须要有一个 Slave Broker 去同步它的数据。而且 Master Broker 中的一条消息写入成功,Slave Broker 也必须是写入成功,保证数据有多个副本的冗余。
这样一来,一条消息只要写入成功了,那么主从两个 Broker 上就会都有这条数据。此时如果 Master Broker 的磁盘坏了,但是 Slave Broker 上至少还是有数据的,数据不会因为磁盘故障而丢失。
Broker 的主从同步架构,一般是基于 DLedger 技术和 Raft 协议实现的。所以如果采用了基于 DLedger 技术和 Raft 协议的主从同步架构,那么对于所有的消息写入,只要写入成功,就一定会通过 Raft 协议同步给其他的 Broker 机器。

(6)RocketMQ 确保数据零丢失的方案总结
根据以上分析可知:只要把 Broker 的刷盘策略调整为同步刷盘,那么就绝对不会因为机器宕机而丢失数据。只要采用了基于 DLedger 技术和 Raft 协议的主从架构的 Broker 集群,那么一条消息写入成功,就意味着多个 Broker 机器都写入了,此时任何一台机器的磁盘故障,数据也是不会丢失的。
这样,只要 Broker 层面保证写入的数据不丢失,后续就一定可以让消费者消费到这条消息。
7.手动提交 offset+自动故障转移的消费消息零丢失
(1)红包系统拿到了消息就一定会派发红包吗
根据前面介绍,现在已经知道:如何确保订单系统发送出去的消息一定会到达 RocketMQ,如何确保到达 RocketMQ 的消息一定不会丢失。如下图示:

只要能做到上图红色标记的几点:对 half 消息 rollback 或 commit + OS 的 PageCache 同步刷盘 + Broker 主从数据同步,那么必然可以保证红包系统可以获取到一条订单支付成功的消息,然后一定可以去尝试把红包派发出去。
但现在的问题在于:即使红包系统拿到了消息,也未必一定可以成功派发红包。因为如果红包系统已经拿到了一条消息,但消息目前还在它的内存里,还没执行派发红包的逻辑。此时它就直接提交了这条消息的 offset 到 Broker 上,告知 Broker 自己已经处理过该消息了。接着红包系统此时才突然崩溃,内存里的消息于是就没了,红包也没派发出去。但 Broker 已经收到它提交的消息 offset 了,认为它已经处理完这条消息了。等红包系统重启时,就不会再次消费到这条消息了。
因此需要明确的就是:即使可以保证发送消息到 RocketMQ 时绝对不会丢失,而且 RocketMQ 收到消息后一定不会发生丢失,但是红包系统在获取到消息后还是可能会发生丢失。

(2)Kafka 消费者的数据丢失问题
RocketMQ 中的消费者数据丢失问题,也完全可以套用到 Kafka 中。Kafka 的消费者采用的消费的方式跟 RocketMQ 是有些不一样的,如果按照 Kafka 的消费模式,就是会存在数据丢失的风险。
也就是说 Kafka 消费者可能会出现上图说的,拿到一批消息,还没来得及处理,结果就提交 offset 到 Broker 去了。之后消费者系统就宕机了,这批消息就再也没机会处理了,因为它重启之后已经不会再次获取提交过 offset 的消息了。
(3)RocketMQ 消费者的与众不同的地方
对于 RocketMQ 的消费者而言,它有一些与众不同的地方,至少跟 Kafka 的消费者是有较大不同的。
先来看 RocketMQ 消费者的代码,如下所示:
再来看上述代码中的一小块代码:
可以看见,RocketMQ 的消费者会注册一个监听器,这个监听器名为 MessageListenerConcurrently。
情况一:当消费者获取到一批消息后,就会回调这个监听器函数,让它来处理这一批消息。然后处理完毕后,才会向 RocketMQ 返回 CONSUME_SUCCESS 表示消费成功。也就是告诉 RocketMQ,这批消息已经处理完毕了。
所以对于 RocketMQ 而言,红包系统首先会在这个监听器的函数中处理一批消息,然后返回消费成功的状态,接着才会去提交这批消息的 offset 到 Broker,此时即使消费者系统崩溃了,消息也不会丢失,因为都已经消费完了。

情况二:如果红包系统获取到一批消息后,还没处理完。也就是还没返回 CONSUME_SUCCESS 这个状态,还没提交这批消息的 offset 给 Broker。此时红包系统突然挂了,会怎么样?

在这种情况下,由于消费者还没有提交这批消息的 offset 给 Broker,所以 Broker 是不会认为消费者已经处理完这批消息的。
此时红包系统的一台机器宕机,Broker 会感知到红包系统的一台机器作为一个 Consumer 挂了。接着 Broker 就会把宕机机器没处理完的那批消息交给红包系统的其他机器去进行处理。因此在这种情况下,消息也是不会丢失的。

(4)需要注意的地方是不能异步消费消息
所以在默认的消费模式下:必须是处理完一批消息了,才能返回 CONSUME_SUCCESS 状态,标识消息处理结束,接着才能提交 offset 到 Broker。
在这种情况下,是不会丢失消息的。即使一个 Consumer 宕机,Broker 也会把没处理完的消息交给其他 Consumer 处理。
但是需要注意的一点就是:不能在代码中对消息进行异步处理。如下便是错误的示范,开启了一个子线程去处理这批消息,然后启动线程后就直接返回 CONSUME_SUCCESS 状态了。
如果使用上述这种方式来处理消息的话,那么可能就会出现开启的子线程还没处理完消息,消费者已经返回 CONSUME_SUCCESS 状态了。于是就可能提交这批消息的 offset 给 Broker 了,让 Broker 认为消费者已经处理结束了。如果此时红包系统突然宕机,必然会导致消息丢失。
因此如果要保证消费过程中不丢消息,那么就需要在回调函数里同步处理消息,处理完再让消费者返回 CONSUME_SUCCESS 状态表明消息已处理完毕。
如果同步处理消息比较耗时,可能会造成消息积压,导致 RocketMQ 的消费性能大幅下降(消息积压导致读内存变为读磁盘)。那么可以先将消息存入数据库,后续异步处理消息成功后,再手动提交消息的 offset 给 Broker。
8.基于 RocketMQ 全链路的消息零丢失方案总结
(1)对全链路消息零丢失方案进行总结
一.发送消息到 RocketMQ 的零丢失
方案一:同步发送消息 + 反复重试
方案二:事务消息机制
两者都有保证消息发送零丢失的效果,但是经过分析,事务消息方案整体会更好一些。
二.RocketMQ 收到消息之后的零丢失
开启同步刷盘策略 + 主从架构同步机制
只要让一个 Broker 收到消息后同步写入磁盘,同时同步复制给其他 Broker,然后再返回响应给生产者表示写入成功,那么就可以保证 RocketMQ 自己不会丢失消息。
三.消费消息的零丢失
RocketMQ 的消费者可以保证处理完消息后,才会提交消息的 offset 给 Broker,所以只要注意避免采用多线程异步处理消息时提前提交 offset 即可。
如果想要保证在一条消息基于 RocketMQ 流转时绝对不会丢失,那么可以采取上述一整套方案。
(2)消息零丢失方案的优势与劣势
优势就是:
可以让系统的数据都是正确的,不会丢失数据。
劣势就是:
会让消息在流转链路中的性能大幅度下降,让消息生产和消费的吞吐量大幅度下降。
(3)消息零丢失方案会导致吞吐量大幅度下降
一.在发送消息到 RocketMQ 的环节中,如果生产者仅仅只是简单的把消息发送到 RocketMQ。那么不过就是一次普通的网络请求罢了,生产者发送请求到 RocketMQ 然后接收返回的响应。这个性能自然很高,吞吐量也是很高的。如下图示:

二.如果生产者改成了基于事务消息的机制之后,那么此时实现原理如下图示,会涉及到 half 消息、commit or rollback、写入内部 Topic、回调机制等诸多复杂的环节。生产者光是成功发送一条消息,至少要 half + commit 两次请求。

所以当生产者一旦上了如此复杂的方案之后,势必会导致生产者发送消息的性能大幅度下降,从而导致发送消息到 RocketMQ 的吞吐量大幅度下降。
三.当 Broker 收到消息后,一样会让性能大幅度下降。首先 RocketMQ 的一台 Broker 机器收到消息后,会直接把消息刷入磁盘,这个性能就远远低于直接写入 OS PageCache 的性能。写入 OS 的 PageCache 相当于是写内存,可能仅仅需要 0.1ms,但是写入磁盘文件可能就需要 10ms。

四.接着这台 Broker 还需要把消息复制给其他 Broker 完成多副本的冗余。这个过程涉及到两台 Broker 机器之间的网络通信 + 另外一台 Broker 机器需要写数据到自己本地磁盘,同样会比较慢。

在 Broker 完成了上述两个步骤后,接着才能返回响应告诉生产者这次消息写入已经成功。
由此可见,写入一条消息需要强制同步刷磁盘,而且还需要同步复制消息给其他 Broker 机器。这两个步骤可能就让原本只要 10ms 完成的变成 100ms 完成了。所以也势必会导致性能和吞吐量大幅下降。
五.当消费者拿到消息之后,比如开启一个子线程去处理这批消息,然后就直接返回 CONSUME_SUCCESS 状态,接着就可以去处理下一批消息了。如果这样的话,该消费者消费消息的速度会很快,吞吐量也会很高。
但为了保证数据不丢失,消费者必须在处理完一批消息后再返回 CONSUME_SUCCESS 状态。那么消费者处理消息的速度就会降低,吞吐量自然会下降。

(4)消息零丢失方案到底适合什么场景
所以如果系统一定要使用消息零丢失方案,那么必然导致从头到尾的性能下降以及吞吐量下降,因此一般不要轻易在一个业务里使用如此重的一套方案。
一般来说,与金钱、交易以及核心数据相关的系统和核心链路,可以使用这套消息零丢失方案。
比如支付系统,它是绝对不能丢失任何一条消息的,性能可以低一些,但是不能有任何一笔支付记录丢失。比如订单系统,公司一般是不能轻易丢失一个订单的,毕竟一个订单就对应一笔交易。如果订单丢失,用户还支付成功了,轻则要给用户赔付损失,重则弄不好要经受官司。特别是一些 B2B 领域的电商,一笔线上交易可能多大几万几十万。
所以,对于这种非常核心的场景和少数核心链路的系统,才会建议使用这套复杂的消息零丢失方案。
而对于其他大部分非核心的场景和系统,其实即使丢失一些数据,也不会导致太大的问题。此时可以不采取这套方案,或者可以在某些地方做一些简化。
比如可以把事务消息方案退化成同步发送消息 + 反复重试的方案。如果发送消息失败,就重试几次,但是大部分时候可能不需要重试,那么也不会轻易的丢失消息的。最多在该方案里,可能会出现一些数据不一致的问题,因为生产者可能在发送消息前宕机导致没法重试但本地事务已执行。
比如可以把 Broker 的刷盘策略改为异步刷盘 + 但使用一套主从架构。这样即使一台机器挂了,OS PageCache 里的数据丢失了,其他机器上还有数据。不过大部分时候 Broker 不会随便宕机,那么异步刷盘策略下性能还是很高的。
所以,对于非核心的链路,非金钱交易的链路,可以适当简化这套方案,用一些方法避免数据轻易丢失。同时整体性能也很高,即使有极个别的数据丢失,对非核心的场景,也不会有太大的影响。
文章转载自:东阳马生架构
评论