写点什么

RocketMQ 实战—消息重复 + 乱序 + 延迟的处理

作者:EquatorCoco
  • 2025-02-07
    福建
  • 本文字数:14621 字

    阅读完需:约 48 分钟

1.根据 RocketMQ 原理分析为什么会重复发优惠券


(1)客服反馈有用户重复收到了多个优惠券


有用户在支付一个订单后,一下子收到了多个优惠券,本来按照规则只应该有一个优惠券的。也就是说,订单系统给用户重复发放了多个优惠券。

 

(2)问题定位为优惠券系统重复消费了消息


现在订单系统已和各个系统进行了解耦,当订单支付成功后,会发送一条消息到 MQ。然后红包系统会从 MQ 里获取消息进行红包派发,优惠券系统会从 MQ 里获取消息进行优惠券派发,其他系统也是同理。

 

但是现在出现了一个奇怪的问题:就是优惠券系统似乎对同一条消息重复处理了两次,导致给一个用户重复派发了两个优惠券。



优惠券重复派发两次的问题已经定位到了,就是:优惠券系统对同一条订单支付成功的消息处理了两次,导致给用户重复发放了优惠券。那么接下来的问题是,为什么优惠券会对同一条消息重复处理两次。

 

要明白为什么优惠券系统对同一条消息重复处理了两次,先来研究第一个问题:订单系统收到一个支付成功的通知后,它在发送消息到 MQ 时,是否会重复把一条消息发送两次。

 

(3)订单系统因处理超时被重复回调


首先考虑第一种情况:假设用户在支付成功后,订单系统收到了一个支付成功的通知,接着它就向 MQ 发送了一条订单支付成功的消息。但是偏偏可能因为某些原因,导致订单系统处理的速度有点慢。



然后可能因为订单系统处理的速度有点慢,导致支付系统跟订单系统之间的请求出现了超时。此时支付系统便有可能再次重试调用订单系统的接口,去通知这个订单已经支付成功了。然后订单系统这时可能又一次推送了一条消息到 MQ,相当于一条订单支付成功的消息,重复推送了两次到 MQ,于是 MQ 里就有两条同样的订单支付成功消息。



之后,优惠券系统便会消费到这两条重复的消息,重复派发两个优惠券给用户。由此可见,发送消息到 MQ 的订单系统,如果出现了接口超时等问题,可能会导致上游的支付系统重试调用订单系统的接口,从而导致订单系统对同一条订单支付成功的消息重复发送两次到 MQ。



(4)订单系统因发送 RocketMQ 异常重试发送


接着考虑第二种情况:假设订单系统为了保证消息一定能投递到 MQ,而采用了重试的代码,如下所示。这种重试的方式是一把双刃剑,正是因为这个重试可能导致消息重复发送。


try {    //执行订单本地事务    orderService.finishOrderPay();    //发送消息到MQ去    producer.sendMessage();} catch (Exception e) {    //如果发送消息失败了,进行重试    for (int i=0; i<3; i++) {        //重试发送消息    }    //如果多次重试发送消息后,还是不行,就回滚本地订单事务    orderService.rollbackOrderPay();}
复制代码


假设生产者发送了一条消息到 MQ,其实 MQ 已经接收到这条消息了。但是 MQ 返回响应给生产者时,网络有问题超时了,导致生产者没能及时收到 MQ 返回来的响应。此时 MQ 里已经有生产者发送过来的消息了,只不过它返回给生产者的响应没能及时给到生产者而已。



这时订单系统的代码里可能会有一个网络超时的异常,然后订单系统就会进行重试,再次发送这条消息到 MQ 里去。于是 MQ 也就会收到一条一模一样的消息,从而导致消息重复发送了。所以这种重试代码在使用时一定要小心,因为它还是有一定的概率会导致生产者重发消息的。



(5)优惠券系统重复消费一条消息


即使生产者没有重复发送消息到 MQ,哪怕 MQ 里就一条消息,优惠券系统也有可能会重复消费,这是为什么呢?

 

假设优惠券系统拿到了一条订单成功支付的消息,然后都已经进行了处理,都已经对这个订单的用户发放了一张优惠券。根据前面介绍,这时优惠券系统应该返回一个 CONSUME_SUCCESS 的状态,然后提交消费进度 offset 到 Broker。

 

但是碰巧的是,优惠券系统刚刚发放完优惠券,还没来得及提交消息 offset 到 Broker,优惠券系统就进行了一次重启。比如可能优惠券系统的代码更新了,需要重启进行重新部署。



这时候因为优惠券系统没提交这条消息的 offset 给 Broker,Broker 并不知道优惠券系统已经处理完了这条消息。然后优惠券系统重启后,Broker 就会再次把这条消息交给它进行处理。于是优惠券系统便会再次发送一张优惠券,从而导致重复发送了两次优惠券。这就是对同一条消息,优惠券系统重复处理两次的原因。



(6)消息重复问题是一种常见的问题


实际上类似优惠券系统这样的业务系统,肯定是会频繁更新代码的,可能每隔几天就需要重启一次系统进行代码更新。所以,重启优惠券系统时,可能有一批消息刚处理完还没来得及提交 offset 给 Broker,然后重启后会再一次重复处理这批消息,这种情况发生的概率比较大。

 

另外对于系统之间的调用,出现超时和重试的情况也是很常见的。所以负责发消息到 MQ 的系统,很可能时不时出现一次超时,然后被其他系统重试调用其接口,于是生产者可能就会重复发送一条消息到 MQ。

 

所以,消息重复问题是一种常见的问题。

 

2.引入幂等性机制来保证数据不会重复


(1)什么是幂等性机制


幂等性机制,就是避免对同一请求或同一消息进行重复处理的机制。幂等性指的是:比如有一个接口,如果调用方对该接口的一次请求重试了多次,那么该接口需要保证自己系统的数据是正常的,不能多出一些重复的数据。

 

幂等对于 RocketMQ 而言,就是从 RocketMQ 里获取消息时,要保证对同一条消息只能处理一次,不能重复处理多次,导致出现重复的数据。因此要解决消息重复问题,关键就是要引入幂等性机制。

 

(2)发送消息到 RocketMQ 时如何保证幂等性


订单系统发送消息到 RocketMQ,需要保证幂等性吗?由于订单系统的接口可能会被重复调用导致发送重复的消息到 RocketMQ,也可能有自己的重试机制导致发送重复的消息到 RocketMQ。如下图示:



那么如果想要让订单系统别发送重复的消息到 RocketMQ 去,应该怎么做呢?大体上来说,常见的方案有两种。第一个方案是业务判断法,第二个方案是状态判断法。

 

(3)基于查询 RocketMQ 的业务判断法


也就是说,订单系统必须要知道是否已成功发送消息到 RocketMQ,消息是否已在 RocketMQ 里。

 

举个例子,当支付系统重试调用订单系统的接口时,订单系统可以发送一个请求到 RocketMQ,查询一下当前 RocketMQ 里是否存在针对这个订单的支付消息。如果 RocketMQ 响应订单系统之前已经写入过这条消息了,那么订单系统就可以不用发送这条消息到 RocketMQ 了。



这个业务判断法的核心就在于:只有 RocketMQ 才知道消息是否发送过。如果没发送过这条消息,RocketMQ 里肯定没有这条消息。如果发送过这条消息,RocketMQ 里肯定有这条消息。

 

所以当订单系统的接口被支付系统重试调用时,应该先发送请求到 RocketMQ 里查询消息是否已存在。

 

(4)基于 Redis 缓存的状态判断法


这个状态判断法的核心在于,需要引入一个 Redis 缓存来存储消息是否已被订单系统发送过。如果订单系统成功发送了一条消息到 RocketMQ,需要在 Redis 缓存里写一条数据,标记该消息已经发送过。



那么当订单系统的接口被支付系统重复调用时,就可以根据订单 ID 去 Redis 缓存里查询一下这个订单的支付消息是否已经发送给 RocketMQ。如果已经发送过,就不用再发送了。



上述两种幂等性机制都是很常用的,但是需要注意的是,基于 Redis 的状态判断法也有可能没办法完全做到幂等性。

 

举个例子,支付系统发送请求给订单系统,然后已经发送消息到 RocketMQ 去了,但此时订单系统突然崩溃了,没来得及把消息发送的状态写入 Redis。



这时候如果订单系统在其他机器上部署了,或者他重启了。那么当订单系统被重试调用时,它去找 Redis 查询消息发送状态,会以为消息没发送过,然后会再次发送重复消息到 RocketMQ。



所以这种方案一般情况下是可以做到幂等性的,但是如果有时刚发送了消息到 RocketMQ,还没来得及写 Redis,系统就挂了,之后接口被重试调用时,接口去查 Redis 还以为消息没发过,就会发送重复的消息到 RocketMQ 去。

 

(5)没必要保证生产者不重复发送消息


在这个场景中,如果在订单系统环节要保证消息不重复发送,要么是直接通过查询 RocketMQ 来判断消息是否发过,要么是通过引入 Redis 来保存消息发送状态。但其实这两种方案都不是太好。

 

因为 RocketMQ 虽然支持查询某个消息是否存在,但是在生产者环节直接从 RocketMQ 查询消息是没必要的,而且它的性能也不是太好,会影响的生产者接口的性能。

 

另外基于 Redis 的消息发送状态的方案,在极端情况下还是没法 100%保证幂等性,所以也不是特别好的一个方案。

 

所以,建议不在生产者发送消息的环节保证幂等性,也就是可以默许生产者可能会发送重复的消息到 RocketMQ。

 

(6)如何保证消费者处理消息的幂等性


假设优惠券系统拿到了重复的消息,那么如何保证消息处理的幂等性?这就比较简单了,直接基于业务判断法即可。因为优惠券系统每次拿到一条消息后给用户发一张优惠券,实际上就是在数据库里给用户插入一条优惠券记录。

 

所以,如果优惠券系统从 RocketMQ 那里拿到一个订单的两条重复的支付成功消息,这时候只要去优惠券数据库中查询一下,比如查询 id=1100 的订单是否已经发放过优惠券、是否有优惠券的记录,如果有的话,就不要重复发券了。通过这个业务判断法,就可以简单高效地避免消息的重复处理。

 

(7)RocketMQ 消息幂等性的方案总结


一般来说,可以往 RocketMQ 里重复发送一样的消息。因为 RocketMQ 里有多条重复消息,不会对系统的核心数据直接造成影响。但关键要保证的是,消费者从 RocketMQ 获取消息进行处理时,消息不能被重复处理。

 

为了保证消息的幂等性,优先推荐的还是业务判断法。直接根据数据库存储中的记录来判断这个消息是否处理过,如果处理过了,那就别再次处理了。因为已经知道,基于 Redis 的消息发送状态的方案,在一些极端情况下还是没法 100%保证幂等性的。

 

3.如何用死信队列处理优惠券系统数据库宕机


(1)如果优惠券系统的数据库宕机会怎样


前面分析和解决了 RocketMQ 在使用过程中可能存在的消息丢失问题和消息重复问题,现在假设基本可以确保 RocketMQ 的消息不丢失,同时不会对消息进行重复处理。在正常流程下,基本没什么问题了。但是优惠券系统的数据库宕机了,那又该怎么处理呢?

 

如果优惠券系统的数据库宕机了,必然会导致优惠券系统没办法处理从 RocketMQ 获取到的消息。



所以,针对这样的异常场景应该怎么处理?优惠券系统应该怎么对消息进行重试?重试多少次才行?万一反复重试都没法成功,这时候消息应该放哪儿去,是否直接丢弃?

 

(2)数据库宕机时监听器回调函数还可以返回 CONSUME_SUCCESS 吗


在下面的代码片段中可以看到,优惠券系统是如何使用 RocketMQ 的 Consumer 从 RocketMQ 中获取消息的:注册一个监听器回调函数,当 Consumer 获取到消息后,就会交给该函数来处理。


consumer.registerMessageListener(    new MessageListenerConcurrently() {        @Override        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {            //在这里对获取到的msgs订单消息进行处理            //比如增加积分、发送优惠券、通知发货等            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }    });
复制代码


我们可以在这个回调函数中对消息进行处理,比如发红包、发优惠券之类的。处理完成之后,就可以返回一个状态告诉 Consumer 和 RocketMQ 关于这批消息的处理结果。

 

比如,如果返回的是 CONSUME_SUCCESS,那么 Consumer 就知道这批消息处理完成了,就会提交这批消息的 offset 到 Broker,然后下一次就会继续从 Broker 获取下一批消息来处理。

 

但是如果在上面的回调函数中,对一批消息发放优惠券时,由于数据库宕机了,导致优惠券发放逻辑无法完成,此时监听器回调函数还能返回 CONSUME_SUCCESS 状态吗?如果返回该状态的话,下一次就会处理下一批消息,但是这批消息其实还没处理成功,此时必然会导致这批消息丢失,从而导致有一批用户没法收到优惠券。

 

(3)当消息处理异常时监听器回调函数可返回 RECONSUME_LATER 状态


如果因为数据库宕机等问题,对这批消息的处理出现异常,那么监听器回调函数就应该返回一个 RECONSUME_LATER 状态。

 

这个状态的意思是,现在没法完成这批消息的处理,麻烦消费者稍后再次获取这批消息进行重新消费,所以代码会改成如下的方式:


consumer.registerMessageListener(    new MessageListenerConcurrently() {        @Override        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {            try {                //在这里对获取到的msgs订单消息进行处理                //比如增加积分、发送优惠券、通知发货等                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            } catch (Exception e) {                //如果因为数据库宕机等问题,对消息处理失败了                //此时返回一个稍后重试消费的状态                return ConsumeConcurrentlyStatus.RECONSUME_LATER;            }        }    });
复制代码


在上面的代码中看到,如果消息处理失败了,就返回 RECONSUME_LATER 状态,让 RocketMQ 稍后再重新把这批消息发过来,让消费者可以对这批消息进行重试处理。

 

(4)RocketMQ 如何让消费者进行消费重试


RocketMQ 在收到返回的 RECONSUME_LATER 状态后,是如何让消费者进行消费重试的?

 

简单来说,RocketMQ 会有一个针对这个 ConsumerGroup 的重试队列。如果返回了 RECONSUME_LATER 状态,RocketMQ 就会把这批消息放到这个消费组的重试队列中。

 

比如消费组的名称是 VoucherConsumers,意思是优惠券系统的消费组,那么 RocketMQ 会有一个名为 %RETRY%VoucherConsumers 的重试队列。



然后过一段时间后,重试队列中的消息会再次给到消费者,让消费者进行处理。如果再次失败,又返回了 RECONSUME_LATER,那么会再过一段时间让消费者继续进行重试处理,默认最多重试 16 次。每次重试之间的间隔时间是不一样的,这个间隔时间可以如下进行配置:


messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
复制代码


上面这段配置的意思是:第一次重试是 1 秒后,第二次重试是 5 秒后,第三次重试是 10 秒后,第四次重试是 30 秒后,第五次重试是 1 分钟后。以此类推,最多重试 16 次。

 

(5)如果连续重试 16 次还是无法处理消息


在 16 次的重试范围内,如果处理消息成功了,那么自然就没问题了。但是如果消费者对一批消息重试了 16 次还是无法处理成功,这时就需要另外一个队列了,该队列叫做死信队列。所谓死信队列,就是死掉的消息就会放这个队列里。

 

所谓死掉的消息,其实就是一批消息交给消费者处理,消费者重试了 16 次都没法处理成功,那么 RocketMQ 就认为它们死掉了,然后会把这批消息放入死信队列中。

 

死信队列的名字是 %DLQ%VoucherConsumers,这可以在 RocketMQ 的管理后台上看到。



那么死信队列中的消息应该怎么处理呢,这个就看使用场景了。比如可以专门启动一个后台线程订阅 %DLQ%VoucherConsumers 这个死信队列,然后对死信队列中的消息一直不停地进行重试处理。

 

(6)消息处理失败场景下的方案总结


这里介绍了另外一个生产环境下的问题,就是消费者依赖的一些系统可能有故障,比如数据库宕机、缓存宕机等。此时消费者没办法完成消息的处理,那么可以通过一些返回状态去让消息进入 RocketMQ 自带的重试队列。同时如果反复重试还是不行,可以让消息进入 RocketMQ 自带的死信队列,后续再单独针对死信队列中的消息进行处理。

 

4.基于 RocketMQ 的订单库同步为什么会消息乱序


在介绍完常规场景下使用 MQ 过程中可能遇到的三大问题:消息丢失、消息重复、处理失败问题及其对应的解决方案后,下面来看特殊场景下的 MQ 使用问题:消息乱序问题。

 

(1)大数据团队同步订单数据库的技术方案


如果让大数据系统在订单系统的数据库上直接跑复杂的大 SQL 来得出一些数据报表,那么是会严重影响订单系统性能的。所以通常会优化成:大数据团队获取订单数据库中的全部数据,然后将订单数据保存一份在自己的存储系统中,比如 HDFS、Hive、HBase 等,接着再基于大数据技术对这些数据进行计算得出数据报表,如下图示:



具体的做法是:基于 Canal 这样的中间件去监听订单数据库的 binlog,然后把这些 binlog 发送到 MQ,接着大数据系统从 MQ 里获取这些 binlog 落地到自己的存储中,最后基于自己存储中的数据进行计算来得出数据报表。


 

(2)大数据团队遇到了数据指标错误的问题


这个技术方案原本以为会运行的很良好,结果没想到在上线这个技术方案一段时间后,遇到了一些奇怪的问题。大数据团队通过这个方案计算出来的数据报表,被发现很多数据指标都是错误的。

 

于是展开排查:在对大数据存储中的订单数据与订单数据库中的订单数据进行对比后,发现其中一些订单数据是不对的。比如在订单数据库中一个订单的字段 A 的值是 100,而在大数据存储中的一个订单的字段 A 的值是 0。可见,大数据存储中的订单数据有误。

 

(3)原因是订单数据库的 binlog 消息乱序了


接着,通过打印系统日志,然后观察发现:订单数据库的 binlog 在基于 RocketMQ 同步的过程中,出现了消息乱序的问题。比如订单系统在更新订单数据库的时候,有两条 SQL 语句:


insert into order values(xx, 0)update order set xxvalue=100 where id=xxx
复制代码


也就是先插入一条订单数据,刚开始某个字段的值是 0,接着更新该字段的值是 100。这样这两条 SQL 语句便会对应两条 binlog:一条 binlog 是 insert 语句的,另一条 binlog 是 update 语句的,这两条 binlog 会进入到 MQ 中。

 

当大数据系统从 RocketMQ 获取 binlog 时,居然先获取出 update 语句的 binlog,再获取出 insert 语句的 binlog。所以会先执行更新操作,但是此时数据根本不存在,没法进行更新。然后执行插入操作,插入一条字段值为 0 的订单数据,最后在大数据存储中该订单记录的字段值便是 0 了。正是这个消息乱序的原因,导致大数据存储中的数据有误。


 

(4)为什么基于 MQ 传输数据会出现消息乱序


由于可以给每个 Topic 指定多个 MessageQueue,所以生产者写入消息时,其实是会把消息均匀分发给不同的 MessageQueue 的。

 

比如在将 binlog 写入到 RocketMQ 时,可能会把 insert binlog 写入到 MessageQueue01,update binlog 写入到 MessageQueue02。



接着大数据系统在获取 binlog 时,可能会部署多台机器组成一个 Consumer Group。这样,Consumer Group 中的每台机器都会负责消费一部分 MessageQueue 的消息。所以可能一台机器从 ConsumeQueue01 中获取到 insert binlog,一台机器从 ConsumeQueue02 中获取到 update binlog。



由于是两台机器上的大数据系统并行去获取 binlog,所以完全有可能出现其中一个大数据系统先获取到 update binlog 去执行更新操作,而此时存储中是没有这条数据的,自然就没法更新。然后另一个大数据系统再获取到 insert binlog 去执行插入操作,最终导致该订单数据的某字段值为 0。

 

(5)消息乱序是必须要正视的一个问题


可见在使用 RocketMQ 时,出现消息乱序是非常正常的一个现象。因为原本有顺序的消息,完全可能被分发到不同的 MessageQueue 中,然后不同机器上部署的 Consumer 从不同的 MessageQueue 中获取这些原来有序的消息时,消息间的顺序便被打乱了。因此,在使用 RocketMQ 时需要考虑消息乱序问题。

 

5.如何解决 RocketMQ 的消息乱序问题


(1)RocketMQ 中消息乱序的原因


在订单数据库的同步过程中,产生消息乱序问题的根本原因是:属于同一个订单的 binlog 进入了不同的 MessageQueue,从而导致一个订单的 binlog 被不同机器上的 Consumer 获取来处理,最终导致这一个订单的 binlog 被乱序执行。所以这个消息乱序的原因可如下图示:


 

(2)让同一订单的 binlog 进入同一 MessageQueue


所以要解决消息乱序的问题,方法就是让同一个订单的 binlog 进入同一个 MessageQueue 里。具体的做法就是往 RocketMQ 发送 binlog 时根据订单 ID 对 MessageQueue 的数量进行取模来选择一个 MessageQueue。

 

比如有一个订单 ID 是 10,它有 2 条 binlog,对这两条 binlog,用订单 ID=10 对 MessageQueue 的数量进行取模。如果 MessageQueue 一共有 8 个,那么此时订单 ID=10 对 8 取模就是 2。也就是说,凡是订单 ID=10 的 binlog,都应该进入位置为 2 的 MessageQueue 中。

 

通过这个方法,就可以让一个订单的 binlog 都按顺序进入同一个 MessageQueue 中。



(3)消费者获取 binlog 时也需要有序


是否只要同一个订单的 binlog 都进入到同一个 MessageQueue,就能解决消息乱序的问题?

 

显然不是。

 

首先,MySQL 数据库中存储的 binlog 一定都是有序的。比如订单系统对订单数据库执行了两条 SQL,先是 insert 语句,然后是 update 语句。那么 MySQL 数据库在磁盘文件里会按顺序先写入 insert 语句的 binlog,然后再写入 update 语句的 binlog。

 

然后,从 MySQL 数据库中获取 binlog 时,此时必须按 binlog 的顺序来获取。也就是 Canal 作为一个中间件从 MySQL 中获取 binlog 时,需要按 binlog 的顺序来获取。

 

接着,Canal 将 binlog 发送给 RocketMQ 时,必须要将同一个订单的 binlog 都发送到同一个 MessageQueue 里,而且发送时也必须严格按照顺序来发送。

 

只有这样,才能让同一个订单的 binlog 有序地进入同一个 MessageQueue。之后消费者从这个 MessageQueue 中获取这个订单的 binlog 时,才能有序地获取。

 

由于一个 Consumer 可以处理多个 MessageQueue 的消息,但是一个 MessageQueue 只能交给同一个 Consumer 来处理,所以同一个订单的 binlog 有序地进入同一个 MessageQueue 后,会有序地交给同一个 Consumer 来处理。

 

这样一个大数据系统就可以获取到同一个订单的有序的 binlog,然后根据 binlog 有序地把数据还原到自己的存储中去。当然,大数据系统在处理消息时,需要注意是否使用了多线程来打乱消息处理的顺序。



(4)如果消息处理失败了不能进入重试队列


消费者处理消息时,可能会因为底层存储挂了导致消息处理失败,此时可以返回消息的 RECONSUME_LATER 状态,这样 Broker 过一会儿会自动让消费者进行重试。

 

但是为了保证消息有序,不能让处理失败的消息进入重试队列。因为如果消费者获取到一个订单的一条 insert binlog,结果处理失败了,此时返回了 RECONSUME_LATER。那么这条消息就会进入重试队列,过一会儿才交给消费者重试。但此时 Broker 会直接把下一条消息,也就是这个订单的 update binlog 交给消费者来处理。此时如果消费者执行成功了,发现根本没有数据可以更新,又会出现消息乱序的问题。

 

所以为了保证消息有序,如果消息处理失败,就必须返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 状态,该状态的意思是先等一会儿再继续处理这批消息。而不能把这批消息放入重试队列,然后直接处理下一批消息。

 

(5)有序消息方案与其他消息方案的结合


如果消费者一定要求消息是有序的,那么必须要让:同一个订单的 binlog 都进入同一个 MessageQueue 中 + Cannel 获取和发送 binlog 时要有序 + binlog 写入 MessageQueue 时要有序 + 消费者处理消息时要有序 + 消息处理失败时不能进入重试队列而要暂停等待继续处理。

 

如果在这个方案的基础上还要确保消息不丢失,那么可以和消息零丢失方案结合起来。如果还要避免消息重复处理,那么还需要在消费者处理消息时,判断消息是否已经处理过,已经处理过就不能重复处理了。

 

6.RocketMQ 的顺序消息机制的代码实现


(1)如何让同一个订单的 binlog 进入同一个 MessageQueue


要实现消息顺序,首先需要让同一个订单的 binlog 都进入同一个 MessageQueue 中,此时可以通过如下代码实现:


SendResult sendResult = producer.send(    message,    new MessageQueueSelector() {        @Override        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {            //根据订单ID选择发送queue            Long orderId = (Long) arg;            long index = orderId % mqs.size();//用订单ID对MessageQueue数量取模            return mqs.get((int) index);//返回一个MessageQueue        }    },    orderId//这里传入订单ID);
复制代码


在上面的代码片段中可以看到,关键的地方有两处:

 

一.发送消息时传入 MessageQueueSelector。在 MessageQueueSelector 里需要根据订单 ID 和 MessageQueue 的数量去选择这个订单 ID 的数据要进入哪个 MessageQueue 中。

 

二.发送消息时除了带上消息自己外,还要带上订单 ID。然后 MessageQueueSelector 就会根据订单 ID 去选择一个 MessageQueue 来进行发送,这样就可以保证一个订单的多个 binlog 都会进入一个 MessageQueue 中。

 

(2)消费者如何保证按照顺序来获取一个 MessageQueue 中的消息


要实现消息顺序,然后还需要让消费者按照顺序来获取一个 MessageQueue 中的消息。


consumer.registerMessageListener(    new MessageListenerOrderly() {        @Override        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {            context.setAutoCommit(true);            try {                for (MessageExt msg : msgs) {                    //对有序的消息进行处理                }                return ConsumeOrderlyStatus.SUCCESS;            } catch (Exception e) {                //如果消息处理有问题,返回一个状态,让Broker暂停一会儿再继续处理这批消息                return SUSPEND_CURRENT_QUEUE_A_MOMENT;            }        }    });
复制代码


上述代码使用了 MessageListenerOrderly 这个类,这个类的类名中有 Orderly,表示 Consumer 对每一个 ConsumeQueue 都只用一个线程来处理消息。

 

比如对 ConsumeQueue01 中订单 ID=10 的多个 binlog,会交给一个线程来按照 binlog 的顺序依次处理。否则若 ConsumeQueue01 中的订单 ID=10 的多个 binlog 交给 Consumer 中的多个线程来处理,还是可能会出现消息乱序的问题。

 

7.基于 RocketMQ 的数据过滤机制提升处理效率


(1)混杂在一起的订单数据库的 binlog


首先一个数据库中可能会包含很多表的数据。比如订单数据库,它里面除了订单信息表以外,可能还包含很多其他的表。所以在进行数据库 binlog 同步的时候,很可能是把一个数据库里所有表的 binlog 都推送到 RocketMQ 里去的。因此在 RocketMQ 的某个 Topic 中,可能混杂了订单数据库里几个甚至十几个表的 binlog 数据,不一定只包含大数据系统想要的表的 binlog 数据。



(2)处理不关注的表的 binlog 很浪费时间


假设大数据系统仅仅关注订单数据库中的表 A 的 binlog,并不关注其他表的 binlog。那么大数据系统可能需要在获取到所有表的 binlog 后,对每条 binlog 都判断一下是否是表 A 的 binlog。

 

如果不是表 A 的 binlog,那么就直接丢弃不要处理。如果是表 A 的 binlog,才会去进行处理。但是这样的话,必然会导致大数据系统处理很多不关注的表的 binlog,从而浪费时间并降低处理消息的效率。



(3)在发送消息时给消息设置 tag 和属性


针对这个问题,可以采用 RocketMQ 的数据过滤机制,让大数据系统只关注它要的表的 binlog 数据。

 

首先在发送消息时,给每条消息设置 Tag 和属性:


Message msg = new Message(    "TopicOrderDbData",//这是订单数据库写入的Topic    "TableA",//这是这条数据的tag, 可以是表的名字    ("binlog").getBytes(RemotingHelper.DEFAULT_CHARSET)//这是一条binlog数据);//我们可以给一条消息设置一些属性msg.putUserProperty("a", 10);msg.putUserProperty("b", "abc");
复制代码


(4)在消费数据时根据 tag 和属性进行过滤


然后在消费消息时,根据每条消息的 Tag 值来过滤:


//只需要tag=TableA和tag=TableB的数据consumer.subscribe("TopicOrderDbData", "TableA || TableB");
复制代码


或者根据每条消息的属性值来过滤:


consumer.subscribe(    "TopicOrderDbData",    MessageSelector.bySql("a > 5 AND b = 'abc'")//只要a大于5, b等于abc的数据 );
复制代码


RocketMQ 支持丰富的数据过滤语法,比如:


一.数值比较,比如:>,>=,<,<=,BETWEEN,=;二.字符比较,比如:=,<>,IN;三.IS NULL 或者 IS NOT NULL;四.逻辑符号 AND,OR,NOT;五.数值,比如:123,3.1415;六.字符,比如:'abc',必须用单引号包裹起来;七.NULL,特殊的常量;八.布尔值,TRUE 或 FALSE;
复制代码


(5)基于数据过滤机制减轻 Consumer 负担


在使用 RocketMQ 时,如果 RocketMQ 里混杂了大量的数据,可能 Consumer 只对其中一部分数据感兴趣,那么就可以在 Consumer 端使用 Tag 等数据过滤语法,过滤出消费者自己感兴趣的数据来消费。

 

8.基于延迟消息机制优化订单的定时退款扫描问题


(1)订单创建后被自动关闭的规则


用户在电商 APP 上选择一些商品加入购物车,然后对购物车里选择的一些商品提交订单,此时后台的订单系统会在订单数据库中创建一个订单。

 

用户提交了一个订单后,虽然订单数据库里会有一个订单,订单的状态却是"待支付"状态。因为用户还没有支付这个订单,订单系统也在等待用户完成这个订单的支付。

 

这里就有两种可能:一是用户下单后马上就支付了,二是用户下单后一直在犹豫而迟迟没有下订单。

 

在实际情况中,电商 APP 每天都会有大量用户提交的很多订单,但不少订单都是一直没有进行支付的,可能是由于用户下单后犹豫了、或者忘了支付。



所以订单系统一般会设置一个规则:当一个订单创建后,如果超过 30 分钟没有被支付,那么订单系统就会自动关闭这个订单。

 

(2)定时扫描未支付订单的问题


为此,订单系统可能需要有一个后台线程,不停地扫描订单数据库里所有未支付的订单,然后判断这些订单是否是已经超过 30 分钟都还没支付,如果是则需要把订单状态更新为"已关闭"。



但是通过后台线程不停扫描各种未支付订单的处理方式并不是很好,原因如下:

 

原因一:未支付的订单可能比较多,需要不停进行扫描。可能每个未支付状态的订单要被扫描很多遍,才能发现已经超过 30 分钟没支付了。

 

原因二:很难实现分布式并行扫描这些订单。因为如果订单数量特别多,并且希望用多台机器一起扫描,那么就会面临每台机器应该扫描哪些订单、怎么扫描、什么时候扫描等一系列问题。

 

(3)通过 RocketMQ 的延迟消息解决不停扫描的问题


针对这种需求场景,可以使用 RocketMQ 的延迟消息来解决。所谓延迟消息,就是订单系统在创建一个订单后,可以发送一条消息到 RocketMQ 里,然后指定这条消息是延迟消息,比如要等 30 分钟之后,才能被订单的扫描服务给消费到。



这样当订单扫描服务在 30 分钟后消费到这条消息,就可以根据这条消息去订单数据库里查询订单,判断该订单在完成创建已超 30 分钟了是否还是未支付状态。如果是,则关闭它,否则就不用处理。



这种方式就比用后台线程扫描订单的方式好得多:首先每个订单只会在它创建的 30 分钟后才被查询,不会出现反复被查询多次的情况。然后如果订单数量很多,那么可以让订单扫描服务部署在几台机器上,并且给 Topic 多指定一些 MessageQueue,从而让每台订单扫描服务机器作为一个 Consumer 都可以处理其中一部分订单的查询任务。

 

9.RocketMQ 的延迟消息的代码实现


(1)生产者发送延迟消息的代码示例


public class ScheduledMessageProducer {    public static void main(String[] args) throws Exception {        //这是订单系统的生产者        DefaaultMQProducer producer = new DefaultMQProducer("OrderSystemProducerGroup");        //启动生产者        producer.start();              Message message = new Message(            "CreateorderInformTopic",//这是创建订单通知Topic            orderInfoJSON.getBytes()//这是订单信息的JSON串        );        //这里设置了消息为延迟消息,延迟级别为3        message.setDelayTimeLevel(3);               //发送消息        producer.send(message);    }}
复制代码


上述代码中,发送延迟消息的核心就是设置消息的 delayTimeLevel,也就是延迟级别。RocketMQ 默认支持如下的延迟级别:


1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
复制代码


上述代码中置的延迟级别为 3,意思就是延迟 10s,生产者发送到 MQ 的消息会过 10s 才会被消费者获取到。因此如果是订单延迟扫描的场景,可以设置延迟级别为 16,对应 30 分钟后才能被获取到。

 

(2)消费者消费延迟消息的代码示例


订单扫描服务会在 30 分钟后才获取到每个订单创建的消息,然后会查询该订单的状态,判断订单是否未支付,如果是则关闭订单。


public class ScheduledMessageConsumer {    public static void main(String[] args) throws Exception {        //订单扫描服务的消费者        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderScanServiceConsumer");        //订阅订单创建通知Topic        consumer.subscribe("CreateOrderInformTopic", "*");        //注册消息监听者        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {                for (MessageExt message : messages) {                    //这里打印一下消息的存储时间到消费时间的差值,大概就是在生产者设置的延迟级别的时间                    System.out.println("Receive message[msgId=" + message.getMsgId() + "]" + (System.currentTimeMills() - message.getStoreTimestamp()) + "ms later");                }                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        //启动消费者        consumer.start();    }}
复制代码


10.RocketMQ 的生产实践经验总结

 

(1)灵活运用 Tags 来过滤数据


在生产项目中,建议合理地规划 Topic 和里面的 Tags。一个 Topic 代表了一类业务数据,对于这类业务数据,如果希望继续划分一些类别,那么可以在发送消息时设置 Tags。

 

比如常见的外卖平台有 A、B、C 几种,现在某系统要发送外卖订单消息到 MQ,那么不同类型的外卖订单消息可以设置不同的 Tags。然后消费外卖订单消息的系统,如果只需要某一种外卖类型的订单消息,就可以根据 Tags 来进行筛选。

 

(2)基于消息 Key 来定位消息是否丢失


在消息零丢失方案中,需要解决的是消息是否丢失的问题。那么如果消息真的丢失了,应该如何进行排查?可不可以在 RocketMQ 里查一下:某条消息是否真的丢失了?

 

其实可以基于消息 Key 来实现,比如通过下面的方式设置一个消息的 Key 为订单 ID,这样这个消息就具备一个 Key 了。


message.setKeys(orderId);
复制代码


接着这个消息被发送到 Broker 时,会基于 Key 构建 Hash 索引,这个 Hash 索引就存放在 IndexFile 索引文件里。然后后续就可以通过 RocketMQ 提供的命令根据 Key 来查询这个消息,类似如下:


mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANRECORD -k orderId
复制代码


(3)消息零丢失方案的补充


在消息零丢失的方案中其实还有一个问题:就是 RocketMQ 集群彻底故障了,此时集群都不可用了,那么应该怎么办?

 

对于一些金融级的系统,或者与钱相关的支付系统、广告系统等,都必须要有高可用保障机制。如果 RocketMQ 集群彻底崩溃了,那么生产者就应该把消息写入到本地磁盘文件或者数据库进行持久化,等 RocketMQ 集群恢复后再将持久化的消息投递到 RocketMQ 里。

 

(4)提高消费者的吞吐量


如果消费者消费消息比较慢,那么可能会造成 RocketMQ 中的消息积压,甚至导致 Broker 读消息从内存读变为磁盘读,严重影响读性能。所以需要注意提高消费者的吞吐量,尽量避免 RocketMQ 中的消息积压。常见的做法如下:

 

一.部署更多的 Consumer 机器


注意:部署更多的 Consumer 机器时,Topic 的 MessageQueue 也要对应增加。因为如果 Consumer 机器有 5 台,而 MessageQueue 只有 4 个,那意味着有一个 Consumer 机器是获取不到消息的,一个 MessageQueue 只会给一台 Consumer 机器去消费。

 

二.增加 Consumer 的线程数量


可以设置 Consumer 的参数 consumeThreadMin 和 consumeThreadMax,这样一台 Consumer 机器上的消费线程越多,消费的速度就越快。

 

三.开启消费者的批量消费功能


也就是设置 consumeMessageBatchMaxSize 参数,该参数默认是 1。可以设置该参数的值多一些,这样 RocketMQ 一次就会把一批消息交给消费者的回调函数进行处理。通过批量处理消息的方式,也可以大幅度提升消息消费的速度。

 

(5)要不要消费历史消息


Consumer 支持设置从哪里开始消费消息,常见的设置有两种:一是从 Topic 的第一条数据开始消费:CONSUME_FROM_LAST_OFFSET,二是从最后一次消费过的消息之后开始消费:CONSUME_FROM_FIRST_OFFSET。

 

一般会选择 CONSUME_FROM_FIRST_OFFSET,这样消费者就会从 Topic 的第一条消息开始消费。但以后每次重启,消费者都会从上一次消费到的位置继续往后进行消费。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18701914

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ实战—消息重复+乱序+延迟的处理_数据库_EquatorCoco_InfoQ写作社区