写点什么

🏆【Alibaba 中间件技术系列】「RocketMQ 技术专题」帮你梳理 RocketMQ 相关的消费问题以及原理分析总结

作者:浩宇天尚
  • 2021 年 11 月 13 日
  • 本文字数:2749 字

    阅读完需:约 9 分钟

🏆【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ相关的消费问题以及原理分析总结

消息重复消费的问题

消息重复消费是各个 MQ 都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。

消息重复消费场景及解决办法

在什么情况下会发生 RocketMQ 的消息重复消费呢?

生产者重复发送场景

当系统的调用链路比较长的时候,比如,系统 A 调用系统 B,系统 B 再把消息发送到 RocketMQ 中,在系统 A 调用系统 B 的时候。


如果系统 B 处理成功,但是迟迟没有将调用成功的结果返回给系统 A 的时候,系统 A 就会尝试重新发起请求给系统 B,造成系统 B 重复处理,发起多条消息给 RocketMQ 造成重复消费。

消费者重复发送场景

在系统 B 发送消息给 RocketMQ 的时候,也有可能会发生和上面一样的问题,消息发送超时,结果系统 B 重试,导致 RocketMQ 接收到了重复的消息。

消费者重复发送场景

当 RocketMQ 成功接收到消息,并将消息交给消费者处理,如果消费者消费完成后还没来得及提交 offset 给 RocketMQ,自己宕机或者重启了,那么 RocketMQ 没有接收到 offset,就会认为消费失败了,会重发消息给消费者再次消费。

消费者没有立刻返回成功

重复消费的问题的一个可能的问题:消费者消费消息时产生了异常,并没有返回 CONSUME_SUCCESS 标志。


因为消息处理异常导致的消息重新消费,RocketMQ 可以很好的保持消息,一定要消费成功才可以!

官方对 comsumerMessage 方法
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
复制代码


无论如何,都不要抛出异常,如果需要重新消费,可以返回 RECONSUME_LATER 主动要求重新消费。


catch Exception 根异常来捕获业务处理的异常:


consumer.registerMessageListener(new MessageListenerConcurrently() {                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,                    ConsumeConcurrentlyContext context) {                    logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");                    MessagePack msgpack = new MessagePack();                    for (MessageExt msg : msgs){                        byte[] data = msg.getBody();                        try {                            RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class);                            logger.debug("Receive a message:" + rtmsg);                            anlysisRTMsgPack(rtmsg, engine);                        } catch (IOException e) {                            logger.error("Unpack RTMsg:", e);                        } catch (Exception e1){                            logger.warn("Unexcepted exception.", e1);                        }                    }                    logger.debug("RETURN CONSUME SUCCESS.");                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                } });
复制代码

设置 CONSUME_FROM_LAST_OFFSET 的问题

Consumer 在消费时,会设置从哪里开始消费。默认是 CONSUME_FROM_LAST_OFFSET,设置的值如代码所示。


public enum ConsumeFromWhere {    /**     * 一个新的订阅组第一次启动从队列的最后位置开始消费<br>     * 后续再启动接着上次消费的进度开始消费     */    CONSUME_FROM_LAST_OFFSET,    @Deprecated   CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,    @Deprecated    CONSUME_FROM_MIN_OFFSET,    @Deprecated    CONSUME_FROM_MAX_OFFSET,    /**     * 一个新的订阅组第一次启动从队列的最前位置开始消费<br>     * 后续再启动接着上次消费的进度开始消费     */    CONSUME_FROM_FIRST_OFFSET,    /**     * 一个新的订阅组第一次启动从指定时间点开始消费<br>     * 后续再启动接着上次消费的进度开始消费<br>     * 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数     */    CONSUME_FROM_TIMESTAMP,}
复制代码


  • CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费,是从该消费者上次消费到的位置开始消费。

  • 如果是一个新的消费者,就要根据这个 client 所属的消费组的情况来判断。

  • 如果所属的消费者组是新上线的,订阅的消息,最早的消息都没有过,RocketMQ 的设计者认为,你这是一个新上线的业务,会强制从第一条消息开始消费。

  • 如果订阅的消息,已经产生了过期消息,那么才会从我们这个 client 启动的时间点开始消费。


ConsumeFromWhere 这个参数只对一个新的消费者第一次启动时有效


  • CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费,

  • CONSUME_FROM_TIMESTAMP:从某个时间开始消费。

  • 而判断是不是一个新的 ConsumerGroup 是在 broker 端判断。

  • 消费到哪个 offset 最先是存在 Consumer 本地的,定时和 broker 同步自己的消费 offset。

  • broker 在判断是不是一个新的 consumergroup,就是查 broker 端有没有这个 consumergroup 的 offset 记录。

偏移量无效化

对于一个新的 queue,这个参数也是没用的,都是从 0 开始消费。


所以,这就有了一个问题我已经设置了 CONSUME_FROM_LAST_OFFSET,为什么还是重复消费了,可能你这不是新的 consumergroup,也可能是个新的 Queue。

重试队列和死信队列

  • 消费端,一直不回传消费的结果。RocketMQ 认为消息没收到,consumer 下一次拉取,broker 依然会发送该消息。

  • 任何异常都要捕获返回:ConsumeConcurrentlyStatus.RECONSUME_LATER


RocketMQ 会放到重试队列,TOPIC 是:%RETRY%+COnsumerGroup 的名字


  • 重试的消息在延迟的某个时间点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。

  • 而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投递到 DLQ 死信队列,此时需要人工干预了。


/**Batch consumption size*/
private int consumeMessageBatchMaxSize = 1;
/**
Batch pull size*/
private int pullBatchSize = 32;
复制代码


  • consumeMessageBatchMaxSize 是批量消费的最大条数

  • pullBatchSize 是每次拉取的最大条数

broker 端的

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
复制代码


参数是设置重试的时间,即第一次 1s 之后,第二次 5s 之后

生产环境不要改

messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s
复制代码


16 次之后,多了一个 topic 名为:%DLQ%+consumergroup



这个默认的 16 次,可以改,但是使用 DefaultMQPullConsumer 才可以修改。


DefaultMQPushConsumer 不能修改此值。


consumeMessageBatchMaxSize 这个 size 是消费者注册的回调 listener 一次处理的消息数,默认是 1,不是每次拉取的消息数(默认是 32),这个不要搞混。

消息消费进度的更新

未来的文章会进行介绍相关进度更新的功能和分析

发布于: 5 小时前阅读数: 5
用户头像

浩宇天尚

关注

🏆 InfoQ写作平台-签约作者 🏆 2020.03.25 加入

【个人简介】酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“ 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、APM专题及微服务/分布式体系等

评论

发布
暂无评论
🏆【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ相关的消费问题以及原理分析总结