RocketMQ 如何保证消息可靠性
开篇
作为消息中间件, 提供消息的可靠性保证是非常重要的事情, 比如对于一个订单系统, 如果关于一个订单发送出去的消息都丢失了, 那就会出现用户下单了, 但是商品还在购物车中。
就会出现数据不一致的问题。那么消息系统是如何保证消息的可靠性的呢?我们先来看一下下面这张消息从生产到消费的过程图:
可以看到, 消息从生产到消费, 一共需要三个阶段:消息的发送, 消息的存储和消息的消费,只有保证这三个阶段都可靠,才能最终保证消息的可靠性。下面我们就从这三个方面分析 RocketMQ 是如何保证消息的可靠性的。
消息发送阶段
消息的发送阶段是指消息从生产者(Producer) 创建, 并成功发送给 Broker 端. 在介绍 RocketMQ 的消息模型时我们提到了请求-确认机制, 没错, 在生产者端就是通过这个机制来保证消息的可靠性传输. 即当生产者发送消息后, 消息队列的客户端会将消息发送给 Broker, 只有当 Broker 收到消息, 并返回确认信息, 然后客户端也接收到响应了, 才表示消息成功发送. 所以在编写发送端代码时, 要特别注意正确的处理返回值和异常的情况. 在返回值不是成功或者是抛出异常时意味着消息发送失败, 需要发送端做特别的处理. 下面根据 RokcetMQ 的三种不同的发送方式来说明如何编写保证可靠性的代码.
RocketMQ 提供了三种发送方式: 同步, 异步, 和单向(oneWay)
同步发送
同步发送的方式, 需要同步等待发送的结果, 需要注意捕捉异常, 以及根据返回的结果做自己的业务处理:
异步发送
异步发送, 不需要同步等待接口, 但是需要在回调中判断消息是否发送成功:
单向发送
单向发送方式, 只管发送, 不关心是否可以发送成功. 适用于那些对可靠性要求不高的场景, 比如日志记录.
RocketMQ 的最佳实践
在 RocketMQ 的官方文档中对于消息发送失败的处理也提供了最佳实践方式, 我摘抄到这里供大家参考:
Producer 的 send 方法本身支持内部重试,重试逻辑如下:
至多重试 2 次。
如果同步模式发送失败,则轮转到下一个 Broker,如果异步模式发送失败,则只会在当前 Broker 进行重试。这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。
如果本身向 broker 发送消息产生超时异常,就不会再重试。
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用 send 同步方法发送失败时,则尝试将消息存储到 db,然后由后台线程定时重试,确保消息一定到达 Broker。
存储阶段
当生产者将消息发送给 Broker 后,Broker 会存储消息,通常情况下只要 Broker 正常运行,就不会出现消息丢失的问题,但是如果出现故障还是会丢失消息的。在 RocketMQ 的官网就听到如下几种影响消息可靠性的情况:
Broker 非正常关闭
Broker 异常 Crash
OS Crash
机器掉电,但是能立即恢复供电情况
机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
磁盘设备损坏
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。
所以在实际使用中我们可以通过配置 Broker 的参数来保证消息的可靠性。针对单点部署和集群部署两种不同方式, Broker 一般是通过持久化复制的方式来保证消息的可靠性. 对于单点的 Broker,需要将消息写入磁盘后在给 Producer 返回确认的响应。即将刷盘方式 flushDiskType 设置为 SYNC_FLUSH 同步刷盘。如果 Broker 是集群部署, 则至少要保证将消息发送到来个以上的节点在向 Producer 返回确认消息。
Broker 集群如何进行消息复制
考虑到我们在生产环境一般都是集群方式进行部署的, 我们重点学习下 Broker 在集群部署的情况下是如何进行消息的复制的以保证消息的可靠性.
RocketMQ 通过引入 Deldger 来保证消息的可靠性, Deldger 支持通过选举来动态切换主节点, 且在写入消息时要求至少要将消息复制到半数以上的节点之后, 才会返回成功的响应.
举个例子: 具有 3 个节点的集群部署, 当主节点宕机了,其余两个节点可以通过投票选出新的主节点继续服务,解决了主从模式的可用性的问题。用由于消息至少需要复制到两个以上的节点, 所以即使主节点宕机了,也不会丢消息,而且 Deldger 会保证选出与主节点数据一样的节点作为新的主节点,这样就保证了数据的一致性和顺序性。
Deldger 也不是完美的, 它的使用还是有一些限制的:
Deldger 的部署至少需要 3 个节点。
如果只有三个节点, 有两个宕机了, 就无法提供服务了
投票选举的过程中是无法提供服务的。
因为要求至少要复制到半数以上的节点才能返回成功的确认响应, 性能上也是不如主从模式快。
消费阶段
消息的消费阶段和生产阶段相同也是使用的请求确认机制. 即客户端从 Broker 拉取消息, 然后执行消息消费的业务逻辑, 只有当自身业务逻辑执行成功后才可以向 Broker 返回确认消息. 如果 Broker 没有收到确认消息, 下次拉取时还会收到这条消息.
这里有一个小 tips: 不知道大家是不是也使用 rocketmq-spring-boot-starter 进行消息的发送和接收, 在消费端我们的代码可能时这样的:
不知道大家是不是跟我一样有个困惑, 就是这个 onMessage() 方法明明返回的是 void 类型, 如果我的消息消费过程中不是预期的结果, 是如何通知 Broker 消息消费失败呢?
这个时候就需要去看一下这个 Starter 是如何封装的消息处理过程, 在类: DefaultRocketMQListenerContainer 可以看到这里帮我们实现了 MessageListenerConcurrently 和 MessageListenerOrderly. 这两个是 RocketMQ 提供的两个消费端监听接口(你不使用 starter 时就需要实现它们).
从上面的源码中可以看到, 当 handleMessage 抛出异常时, 会给 Broker 返回失败的响应. 所以在我们上面实现的消费端的代码就可能会出现问题, 我执行了业务逻辑, 但是没有抛出异常, 但是执行的结果不是我预期的, 就会导致 Broker 认为我成功消费了消息, 所以我对消费端做了一些改动, 只有当我业务逻辑返回的状态事 true 时才认为成功消费了消息.改后的代码如下:
所以说上来就使用人家封装好的 Starter, 真的是不利于学习人家的设计精髓, 建议小伙伴们还是看 RocketMQ 自己的 Client 来学习吧.
总结
好了今天的文章就写到这,大家不妨回去自己动手写个简单的 demo 试一下,把源码找出来自己熟悉一下,希望大家都不丢消息。
版权声明: 本文为 InfoQ 作者【废材姑娘】的原创文章。
原文链接:【http://xie.infoq.cn/article/1b53acf215b0d241d1c3cfa7a】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论