写点什么

RocketMQ 如何保证消息可靠性

用户头像
废材姑娘
关注
发布于: 2021 年 01 月 10 日
RocketMQ如何保证消息可靠性

开篇

作为消息中间件, 提供消息的可靠性保证是非常重要的事情, 比如对于一个订单系统, 如果关于一个订单发送出去的消息都丢失了, 那就会出现用户下单了, 但是商品还在购物车中。

就会出现数据不一致的问题。那么消息系统是如何保证消息的可靠性的呢?我们先来看一下下面这张消息从生产到消费的过程图:


Screen Shot 2021-01-08 at 8.30.18 AM.png


可以看到, 消息从生产到消费, 一共需要三个阶段:消息的发送, 消息的存储和消息的消费,只有保证这三个阶段都可靠,才能最终保证消息的可靠性。下面我们就从这三个方面分析 RocketMQ 是如何保证消息的可靠性的。

 

消息发送阶段

消息的发送阶段是指消息从生产者(Producer) 创建, 并成功发送给 Broker 端. 在介绍 RocketMQ 的消息模型时我们提到了请求-确认机制, 没错, 在生产者端就是通过这个机制来保证消息的可靠性传输. 即当生产者发送消息后,  消息队列的客户端会将消息发送给 Broker,  只有当 Broker 收到消息, 并返回确认信息, 然后客户端也接收到响应了, 才表示消息成功发送.  所以在编写发送端代码时, 要特别注意正确的处理返回值和异常的情况. 在返回值不是成功或者是抛出异常时意味着消息发送失败, 需要发送端做特别的处理.  下面根据 RokcetMQ 的三种不同的发送方式来说明如何编写保证可靠性的代码.

RocketMQ 提供了三种发送方式:  同步, 异步, 和单向(oneWay)

同步发送

同步发送的方式, 需要同步等待发送的结果,  需要注意捕捉异常, 以及根据返回的结果做自己的业务处理: 

        try {            Boolean success = extRocketMQTemplate.sendAndReceive(userTopic + ":reply", message, Boolean.class);            log.info("消息发送成功");        } catch (Exception e) {            // 保证消息可靠性, 需要catch 异常            log.info("消息发送失败");        }java
复制代码

异步发送

异步发送, 不需要同步等待接口, 但是需要在回调中判断消息是否发送成功: 

    extRocketMQTemplate.asyncSend(userTopic+":reliability",  message, sendCallbackFunc());    // 异步发送时, 要使用回调来判断消息是否发送成功    private SendCallback sendCallbackFunc() {        return new SendCallback() {            @Override            public void onSuccess(SendResult var1) {                log.info("async onSucess SendResult= {}", var1);            }            @Override            public void onException(Throwable var1) {                // 如果发送失败, 则可以根据业务的需要进行重试, 或记入数据库等操作                log.error("async onException Throwable={}", var1);            }        };    }
复制代码

单向发送

单向发送方式, 只管发送, 不关心是否可以发送成功. 适用于那些对可靠性要求不高的场景, 比如日志记录.

RocketMQ 的最佳实践

在 RocketMQ 的官方文档中对于消息发送失败的处理也提供了最佳实践方式, 我摘抄到这里供大家参考:

Producer 的 send 方法本身支持内部重试,重试逻辑如下:

  • 至多重试 2 次。

  • 如果同步模式发送失败,则轮转到下一个 Broker,如果异步模式发送失败,则只会在当前 Broker 进行重试。这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。

  • 如果本身向 broker 发送消息产生超时异常,就不会再重试。

以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用 send 同步方法发送失败时,则尝试将消息存储到 db,然后由后台线程定时重试,确保消息一定到达 Broker。

存储阶段

当生产者将消息发送给 Broker 后,Broker 会存储消息,通常情况下只要 Broker 正常运行,就不会出现消息丢失的问题,但是如果出现故障还是会丢失消息的。在 RocketMQ 的官网就听到如下几种影响消息可靠性的情况:

  1. Broker 非正常关闭

  2. Broker 异常 Crash

  3. OS Crash

  4. 机器掉电,但是能立即恢复供电情况

  5. 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)

  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。


5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。


所以在实际使用中我们可以通过配置 Broker 的参数来保证消息的可靠性。针对单点部署和集群部署两种不同方式, Broker 一般是通过持久化复制的方式来保证消息的可靠性. 对于单点的 Broker,需要将消息写入磁盘后在给 Producer 返回确认的响应。即将刷盘方式 flushDiskType 设置为 SYNC_FLUSH 同步刷盘。如果 Broker 是集群部署, 则至少要保证将消息发送到来个以上的节点在向 Producer 返回确认消息。

Broker 集群如何进行消息复制

考虑到我们在生产环境一般都是集群方式进行部署的, 我们重点学习下 Broker 在集群部署的情况下是如何进行消息的复制的以保证消息的可靠性.

RocketMQ 通过引入 Deldger 来保证消息的可靠性, Deldger 支持通过选举来动态切换主节点, 且在写入消息时要求至少要将消息复制到半数以上的节点之后, 才会返回成功的响应.

举个例子: 具有 3 个节点的集群部署, 当主节点宕机了,其余两个节点可以通过投票选出新的主节点继续服务,解决了主从模式的可用性的问题。用由于消息至少需要复制到两个以上的节点, 所以即使主节点宕机了,也不会丢消息,而且 Deldger 会保证选出与主节点数据一样的节点作为新的主节点,这样就保证了数据的一致性和顺序性。

Deldger 也不是完美的, 它的使用还是有一些限制的:

  • Deldger 的部署至少需要 3 个节点。

  • 如果只有三个节点, 有两个宕机了, 就无法提供服务了

  • 投票选举的过程中是无法提供服务的。

  • 因为要求至少要复制到半数以上的节点才能返回成功的确认响应, 性能上也是不如主从模式快。

消费阶段

消息的消费阶段和生产阶段相同也是使用的请求确认机制. 即客户端从 Broker 拉取消息,  然后执行消息消费的业务逻辑,  只有当自身业务逻辑执行成功后才可以向 Broker 返回确认消息. 如果 Broker 没有收到确认消息, 下次拉取时还会收到这条消息.

这里有一个小 tips:  不知道大家是不是也使用 rocketmq-spring-boot-starter 进行消息的发送和接收,  在消费端我们的代码可能时这样的: 

@Service@RocketMQMessageListener(nameServer = "${rocketmq.name-server}",        topic = "${demo.rocketmq.topic}",        consumerGroup = "user_consumer_user",        selectorExpression = "user")public class SysUserInfoConsumer implements RocketMQListener<SyncUserRequest> {    @Resource    private ISysUserInfoService sysUserInfoService;    @Override    public void onMessage(SyncUserRequest request) {        // TODO 这里的返回值是void, 消费者消费是否成功如何通知的Borker        sysUserInfoService.saveOrUpdateUserInfo(request);    }}
复制代码

​不知道大家是不是跟我一样有个困惑, 就是这个 onMessage() 方法明明返回的是 void 类型,  如果我的消息消费过程中不是预期的结果, 是如何通知 Broker 消息消费失败呢?

这个时候就需要去看一下这个 Starter 是如何封装的消息处理过程, 在类: DefaultRocketMQListenerContainer 可以看到这里帮我们实现了 MessageListenerConcurrently 和 MessageListenerOrderly. 这两个是 RocketMQ 提供的两个消费端监听接口(你不使用 starter 时就需要实现它们). 


    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {        @SuppressWarnings("unchecked")        @Override        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {            for (MessageExt messageExt : msgs) {                log.debug("received msg: {}", messageExt);                try {                    long now = System.currentTimeMillis();                    // 实际去处理消息的地方                    handleMessage(messageExt);                    long costTime = System.currentTimeMillis() - now;                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                } catch (Exception e) {                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;                }            }            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }    }    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {        @SuppressWarnings("unchecked")        @Override        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {            for (MessageExt messageExt : msgs) {                log.debug("received msg: {}", messageExt);                try {                    long now = System.currentTimeMillis();                    // 实际去处理消息的地方                    handleMessage(messageExt);                    long costTime = System.currentTimeMillis() - now;                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                } catch (Exception e) {                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                }            }            return ConsumeOrderlyStatus.SUCCESS;        }    }
复制代码

从上面的源码中可以看到, 当 handleMessage 抛出异常时, 会给 Broker 返回失败的响应. 所以在我们上面实现的消费端的代码就可能会出现问题, 我执行了业务逻辑, 但是没有抛出异常, 但是执行的结果不是我预期的, 就会导致 Broker 认为我成功消费了消息, 所以我对消费端做了一些改动, 只有当我业务逻辑返回的状态事 true 时才认为成功消费了消息.改后的代码如下:

所以说上来就使用人家封装好的 Starter, 真的是不利于学习人家的设计精髓, 建议小伙伴们还是看 RocketMQ 自己的 Client 来学习吧.

@Slf4j@Service@RocketMQMessageListener(nameServer = "${rocketmq.name-server}",        topic = "${demo.rocketmq.topic}",        consumerGroup = "user_consumer_reliability",        selectorExpression = "reliability")public class ReliabilityConsumer implements RocketMQListener<SyncUserRequest>  {    @Resource    private ISysUserInfoService sysUserInfoService;    @Override    public void onMessage(SyncUserRequest request) {        // 通过阅读源码 DefaultRocketMQListenerContainer        // 可以看到当这个方法抛出异常时, 会通知Broker消息消费失败, 需要重试        boolean success = sysUserInfoService.saveOrUpdateUserInfo(request);        if(!success) {            log.error("用户信息同步失败");            throw new BusinessException("用户信息同步失败");        }    }}
复制代码

​总结

好了今天的文章就写到这,大家不妨回去自己动手写个简单的 demo 试一下,把源码找出来自己熟悉一下,希望大家都不丢消息。





发布于: 2021 年 01 月 10 日阅读数: 34
用户头像

废材姑娘

关注

废材姑娘 2018.01.24 加入

大家叫我双儿,梦想着成为韦小宝的老婆 欢迎关注我的个人公众号----废材姑娘,回复“双儿”加我微信,让我们一起探索多彩的世界。

评论

发布
暂无评论
RocketMQ如何保证消息可靠性