RabbitMQ 补偿机制、消息幂等性解决方案
1. 场景
先看这么几个面试题:
如何保证消息的可靠性投递?即如何确定消息是否发送成功?
如果失败如何处理(补偿机制)?
如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?
2. 消息的可靠性投递
消息确认
消息确认包括主要生产者发送确认和消费者接收确认,因为发送消息的过程中我们是无法确认消息是否能路由等,一旦消息丢失我们就无法处理,所以需要确认消息,避免消息丢失。
2.1 生产者确认
我们知道生产者与消费者完全隔离的,不做任何配置的情况下,生产者是不知道消息是否真正到达 RabbitMQ,也就是说消息发布操作不返回任何消息给生产者。
那么怎么保证我们消息发布的可靠性投递?有以下几种常用机制。
由于之前的文章对上面都有过介绍,所以这里不一一介绍,而一般采用的方式就是发布者确认模式(生产者确认模式)。
原理:生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),由这个 id 在生产者和 RabbitMQ 之间进行消息的确认。
这里的唯一 ID 能够唯一标识消息,在消息不可达的时候触发回调时可以获取该值,进行对应的错误处理,即对应的消息补偿机制。(记住这个唯一 ID,且是全局唯一,分布式系统中可采用雪花算法等方式)
confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息决定下一步的处理。
注:这里描述的场景都是能正确路由到队列中的,也就是不考虑失败通知(ReturnCallback)的情况。
由于现在大家开发基本都是通过 Spring Boot 的方式进行开发,所以,这里直接提供其基本配置类参考,如下:
2.2 消费者确认
说了生产者如何保证消息的确认,而对于消费者来说,同样需要确认。
前面也说了目前编码都是基于 Spring 的,那么对于消费者来说,同样也有一个接收消息的配置类,如下:
对于上面接收消息的配置并没有做任何配置,当我们发送消息的时候,消费者接收消息并进行对应的逻辑处理,并且 Spring 的处理是自动 ack 的,但其实它也有配置,如下:
也就是上面的acknowledge-mode
,他有三个值,如下:
当为
NONE
的时候,即默认值,即autoAck=true
,消费者接收消息后自动确认,此时,MQ 队列中的消息会移除;当为
MANUAL
的时候,需要我们手动确认,即channel.basicAck
,如下:当为
AUTO
的时候,经测试发现和NONE
没什么不同 。
此时,有人就会说那么如果在默认情况下(自动确认),我们的业务代码抛出异常了怎么办?
Spring 的做法是会抛出异常,并且消息不会被 ack,如下面这种情况:
1/0
肯定会抛出异常,此时会一直打印日志,如下:
再去看 MQ 客户端,状态如下:
此时,可能会造成重复消费,怎么理解?
假如我的业务代码没有事务,或者在参数的传递过程中某个方法没有事务的控制,当异常业务代码之前入库了,那么这条消息实际上是没有被确认的,还在队列中,因此,当下次程序启动,则会再次消费这条消息,尽管业务代码出现了异常。
自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端业务代码抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务代码会进行回滚,这也同样造成了实际意义的消息丢失。
3. 补偿机制
在回到前面的问题,如何确定消息是否发送成功?生产者确认机制确实能帮我们解决这个问题,但如果生产者就是接收不到 ack 这个指令怎么办,比如消费者处理时间太长或者网络超时,等等情况,导致生产者一直接收不到这个 ack ,此时怎么办?
生产者与消费者之间应该约定一个超时时间,比如 5 分钟,对于超出这个时间没有得到响应的消息,可以设置一个定时重发的补偿机制:通过消息落库 + 定时任务来实现。
怎么做?这里讲讲思路,如下:
发送消息之前,先把消息入库,我这里的表设计如下:
入库之后在发送消息;
如果在规定时间不能
ack
或者ack=false
,即confirmCallback
回调的ack=false
,则按照定时规则重新发送消息;然后对于发布成功的消息,如果业务操作完成,实际上它的作用已经发挥完成,一段时间对数据库做清理即可,根据业务的具体情况。
4. 消息幂等性
首先我们要知道什么是幂等性,比如一个转账系统,A 要转给 B 100 元,当 A 发出消息后,B 接收成功,然后给 MQ 确认的时候出现网络波动,MQ 并没有接收到 ack 确认,那 MQ 为了保证消息被消费,就会继续给消费者投递之前的消息,如果再重复投递 5 次,则 B 在处理 5 次,加上之前的一次,B 的余额增加了 600 元,很明显是不合理的。
所以幂等性简单来说就是:重复调用多次产生的业务结果与调用一次产生的业务结果相同;
为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ 服务端是没有这种控制的,因为它不知道你是不是就要把一条消息发送两次,所以只能在消费端控制。
回到前面生产者确认模式中讲到了一个全局唯一 ID,我们可以通过他来保证消息的幂等性,如下:
消费者获取到消息后先根据这个全局唯一 ID 去查询 redis/db 是否存在该消息;
如果不存在,则正常消费,消费完毕后写入 redis/db;
如果存在,则证明消息被消费过,直接丢弃,不做处理。
版权声明: 本文为 InfoQ 作者【Ayue、】的原创文章。
原文链接:【http://xie.infoq.cn/article/abe7691508f57c91906d9a160】。文章转载请联系作者。
评论