写点什么

Rocketmq 并发和顺序消费的失败重试机制

  • 2023-09-19
    江西
  • 本文字数:7861 字

    阅读完需:约 26 分钟

@

  • 问题

  • 并发消费触发时机客户端发起请求 CONSUMER_SEND_MSG_BACKBroker 处理 CONSUMER_SEND_MSG_BACK 请求

  • 顺序消费

  • Q&A 消费的时候是一批的消息, 如果其中某条消费失败了,是所有的消息都会被重试吗?用户可以自己控制重试次数、重试间隔时间吗?批量消费消息,能否自己控制重试的起始偏移量?比如 10 条消息,第 5 条失败了,那么只重试第 5 条和后面的所有。重试的消息是如何被重新消费的?如果关闭了 broker 的写权限,对消息消费的重试有没影响?


1 问题

  1. 消费的时候是一批的消息, 如果其中某条消费失败了,是所有的消息都会被重试吗?

  2. 用户可以自己控制重试次数、重试间隔时间吗

  3. 批量消费消息,能否自己控制重试的起始偏移量?比如 10 条消息,第 5 条失败了,那么只重试第 5 条和后面的所有。

  4. 重试的消息是如何被重新消费的?

  5. 如果关闭了 broker 的写权限,对消息消费的重试有没影响?

  6. 如果一个 Topic 被相同 ConsumerGroup 不同 consumer 顺序消费和并发消费会怎么样?

更详细请看:Rocketmq并发消费失败重试机制

2 并发消费

触发时机

消费者在消费完成之后, 需要处理消费的结果, 是成功或失败

ConsumeMessageConcurrentlyService#processConsumeResult

    /**    * 石臻臻的杂货铺    * vx: shiyanzu001    **/    public void processConsumeResult(        final ConsumeConcurrentlyStatus status,        final ConsumeConcurrentlyContext context,        final ConsumeRequest consumeRequest    ){      int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty())            return;
        switch (status) {            case CONSUME_SUCCESS:                if (ackIndex >= consumeRequest.getMsgs().size()) {                    ackIndex = consumeRequest.getMsgs().size() - 1;                }   // 这个意思是,就算你返回了消费成功,但是你还是可以通过设置ackIndex 来标记从哪个索引开始时消费失败了的;从而记录到 消费失败TPS的监控指标中;                int ok = ackIndex + 1;                int failed = consumeRequest.getMsgs().size() - ok;                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);                break;            case RECONSUME_LATER:                ackIndex = -1;                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),                    consumeRequest.getMsgs().size());                break;            default:                break;        }            List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {                    MessageExt msg = consumeRequest.getMsgs().get(i);                    // Maybe message is expired and cleaned, just ignore it.                    if (!consumeRequest.getProcessQueue().containsMessage(msg)) {                        log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "                                + "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),                            msg.getQueueId(), msg.getQueueOffset());                        continue;                    }                    boolean result = this.sendMessageBack(msg, context);                    if (!result) {                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);                        msgBackFailed.add(msg);                    }                }
                if (!msgBackFailed.isEmpty()) {                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());                }                      //..... 部分代码省略....    }
复制代码

上面省略了部分代码, 上面代码是主要的针对发送失败的消息 发送回 Broker 的情况;

光看代码理解的意思如下

  1. 如果处理结果为 CONSUME_SUCCESS,无需重试, 则记录一下监控指标, 消费成功 TPS 、和 消费失败 TPS ; 这里用户是可以自己通过设置context.setAckIndex()来设置 ACK 的索引值的; 比如你本批次消息量 10 条, 你这里设置为 4; 则表示前面 5 条成功,后面 5 条失败; 当然了,这里并不会给失败的做重试;

  2. 如果处理结果为 RECONSUME_LATER, 则表示需要重试, 将该批次的所有消息遍历同步发送回 Broker 中; 如果某个同步请求失败,则会记录下来; 一会在本地客户端重新消费 ;

  3. 将这些消息从 待消费消息 TreeMap 中移除掉(同步发回 Broker 请求失败除外),并获得当前 TreeMap 中最小的值;

  4. 更新本地缓存中的已消费偏移量的值; 以便可以提交消费 Offset

在这里插入图片描述

看图,再讲几个重点

  1. 需要重试的消息, 会优先被发回重试队列中,发送成功之后它会被当做消费成功, 这样做的目的是为了不要让某个消息消费失败就阻碍了整个消费 Offset 的提交;比如, 1、2、3、4 四条消息, 第 1 条消费失败,其他都成功, 那么就因为最小的 Offset 1 失败了导致后面的都不能标标记为成功去提交。所以让 1 也设置为成功,就不会成为阻塞点,当然要把它发送到重试队列中等待重试。

  2. 可提交的消费 Offset 的值永远是 TreeMap 中的最小值, 这个 TreeMap 存放的就是 pullMessage 获取到的所有待消费 Msg。消费成功就删除。比如, 1、2、3、4 四条消息。1、2 消费成功删除了,那么最小的就是 3 这个偏移量,那么它之前的都可以提交了;如果 2、3、4 都消费成功并且删除了,但是 1 还在,那么可提交的偏移量还是当前最小的值 1 ;

用户可自己决定从哪条消息开始重试

上面其实已经说了, 用户可以通过入参ConsumeConcurrentlyContext来设置 ackIndex 控制重试的起始索引;

        /**        * 石臻臻的杂货铺        * vx: shiyanzu001        **/        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {            System.out.printf(" ----- %s 消费消息: %s  本批次大小: %s   ------ ", Thread.currentThread().getName(), msg, msg.size());
            for (int i = 0; i < msg.size(); i++) {                System.out.println("第 " + i + " 条消息, MSG: " + msg.get(i));                try{                 // 消费逻辑                }catch(Exception e){                  // 这条消息失败, 从这条消息以及其后的消息都需要重试                  context.setAckIndex(i-1);                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                }                            }             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });
复制代码

PS: 目前所看为版本(5.1.3), 笔者始终觉得 ackIndex 这个设置有点问题;

  1. 消费成功的时候,设置 ackIndex 才会生效,既然用户返回的都是成功,则表示它并不需要重试; 设置这个值总感觉很别扭。

  2. 消费失败的时候,ackIndex 被强制设置为了-1,表示所有的都要重试, 正常情况来说,批量消费的时候,碰到其中一条失败,那么就应该从这条的索引开始往后的消息都需要重试,前面已经消费的并且成功的并不需要重试;

关于这一点,我更倾向于这是一个 Bug; 或者设计缺陷

优化建议:

  1. 为了兼容之前的逻辑,成功的状态的逻辑就不去修改了

  2. 失败的情况,没有必要强制设置为-1,导致全部重试, 让用户自己也能够通过 ackIndex 来设置重试的部分消息,而不用全部重试

客户端发起请求 CONSUMER_SEND_MSG_BACK

如果该批次的消息消费失败, 则会尝试重试,重试会尝试一条一条的把 Message 发回去

DefaultMQPushConsumerImpl#sendMessageBack

请求头 ConsumerSendMsgBackRequestHeader

目标地址

Message 所在 Broker 的地址

msg.getStoreHost()
复制代码

请求方式

同步请求

请求流程

    /**    * 石臻臻的杂货铺    * wx: szzdzhp001    **/    private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {        boolean needRetry = true;        try {              // 部分代码忽略....                String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());                this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,                    this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());                    } catch (Throwable t) {            log.error("Failed to send message back, consumerGroup={}, brokerName={}, mq={}, message={}",                this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t);            if (needRetry) {            //以发送普通消息的形式发送重试消息                sendMessageBackAsNormalMessage(msg);            }        } finally {            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));        }    }
复制代码
  1. 首次发送的超时时间为 5000ms;请求是RequestCode.CONSUMER_SEND_MSG_BACK

  2. 如果上面的请求发送失败, 则兜底策略为,直接发送普通消息;但是 Topic 为 %RETRY%{consumerGroup};延迟等级为 3 + msg.getReconsumeTimes(); 这里发送消息的 Producer 客户端是 Consumer 在构建实例的时候创建的内置的 Producer 客户端,客户端实例名是:CLIENT_INNER_PRODUCER; 这个发送也是同步发送;超时时间是 3000

  3. 如果上面都失败了,抛出异常了,才会进行本地客户端重试消费(延迟 5 秒);

本地客户端重试是一直重试还是有次数限制?

如果一直失败,并且都是客户端重试,没有次数限制,并且每次都是延迟 5 秒消费;它会成为消费 Offset 的阻塞点;后续的消息都有被重新消费的可能性(比如客户端重启)

在这里插入图片描述

Broker 处理 CONSUMER_SEND_MSG_BACK 请求

AbstractSendMessageProcessor#consumerSendMsgBack



复制代码
  1. 如果当前 Broker 不是 Master 则返回系统异常错误码

  2. 如果消费 Group 订阅关系不存在则返回错误码

  3. 如果brokerPermission权限不可写则返回无权限错误码

  4. 如果当前 Group 的重试队列数量retryQueueNums<=0 返回无权限错误码

  5. 如果该 Group 的重新 Topic 不存在则创建一个,TopicName:%RETRY%GroupName;读写权限

  6. 根据入参offset查找该 Message; 如果没有查询到则返回系统异常错误码

  7. 如果该消息重试次数已经超过了最大次数,或者重试策略为不重试的话,则将消息发送到死信队列里面;死信队列 Topic: %DLQ%GroupName

  8. 如果还没有超过重试次数, 则将消息发送到重试 Topic 里面:%RETRY%GroupName

  9. 如果有 ConsumeMessageHook 列表的话,则执行一下 consumeMessageAfter方法

  10. 返回 Response。

在这里插入图片描述

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。所以就需要我们消费者端做好消费幂等操作。

3 顺序消费

顺序消费完毕执行处理结果的流程

ConsumeMessageOrderlyService#processConsumeResult

在这里插入图片描述

几个重要点

  1. 顺序消费针对同一个 ProcessQueue 只会有一个消费任务 ConsumeRequest 在执行

  2. 用户返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 则会重试, 重试流程会根据是否超过最大重试次数来决定要不要讲消息发回重试队列中。

  3. 这个发回重试队列是直接使用的 Consumer 内置 Producer 实例直接向重试 Topic %RETRY%{consumerGroup}发送的;

  4. 这个最大重试次数一般是 INTEGER.MAXVALUE;所以一般不会超过,那么就会一直在本地重试,每次重试的时候都是延迟 1s; 这个过程并不会将消息写回到 Broker 中。

  5. 如果某个消息一直消费失败, 那么整个队列消费都会被阻塞。

4Q&A

消费的时候是一批的消息, 如果其中某条消费失败了,是所有的消息都会被重试吗?

如果在消费的时候,你返回的是 ConsumeConcurrentlyStatus#RECONSUME_LATER, 则表示本次消费失败,需要重试,则本次分配到的 Msgs 都会被重试;本次分配的 Msgs 数量是由consumer.setConsumeMessageBatchMaxSize(1)决定的;默认就是 1;表示一次消费一条消息;

用户可以自己控制重试次数、重试间隔时间吗?

可以。控制重试次数:3.4.9 之前是使用 subscriptionGroupConfig 消费组配置retryMaxTimes3.4.9 之后是客户端指定(requestHeader.getMaxReconsumeTimes())这里可以通过Consumer#setMaxReconsumeTimes(最大次数)来设置值并发模式默认 16 次

重试的间隔时间:默认情况下,都是 Broker 端来控制的重试间隔时间,间隔时间是用延迟消息来实现的,比如 Broker 端的延迟级别为 3+重试次数; 默认情况下第一次重试对应的等级 3 的时间间隔为:10s;

想要自定义重试的间隔时间的话,那么就需要自己在消费的时候来处理了,比如

        /**        * 石臻臻的杂货铺        * vx: shiyanzu001        **/        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {            System.out.printf(" ----- %s 消费消息: %s  本批次大小: %s   ------ ", Thread.currentThread().getName(), msg, msg.size());
            for (int i = 0; i < msg.size(); i++) {                System.out.println("第 " + i + " 条消息, MSG: " + msg.get(i));                if(消费失败){                   // 延迟等级5 = 延迟1分钟;                    context.setDelayLevelWhenNextConsume(5);
                  // 或者你也可以根据重试的次数来递增延迟级别                  context.setDelayLevelWhenNextConsume(3 + msg.get(i).getReconsumeTimes());                }                // 需要重试                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                           }            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });
复制代码

批量消费消息,能否自己控制重试的起始偏移量?比如 10 条消息,第 5 条失败了,那么只重试第 5 条和后面的所有。

可以但是目前仅限于返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 的情况。如果是返回的 ConsumeConcurrentlyStatus.RECONSUME_LATER,则整批的消息都会重试。具体,请看下面的代码

        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {            System.out.printf(" ----- %s 消费消息: %s  本批次大小: %s   ------ ", Thread.currentThread().getName(), msg, msg.size());
            for (int i = 0; i < msg.size(); i++) {                System.out.println("第 " + i + " 条消息, MSG: " + msg.get(i));                try{                 // 消费逻辑                }catch(Exception e){                  // 这条消息失败, 从这条消息以及其后的消息都需要重试                  context.setAckIndex(i-1);                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                }                            }             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });
复制代码

笔者认为,这里应该同样支持 消费失败(RECONSUME_LATER)的情况,来允许用户控制从哪个消息开始才需要重试。

重试的消息是如何被重新消费的?

需要重试的消息,会将消息写入到 %RETRY%{consumerGroup} 重试队列中,等延迟时间一到,客户端会重新消费这些消息。如果超出重试次数,则会放入到死信队列 %DLQ%{consumerGroup}中。不会再重试

在这里插入图片描述

如果关闭了 broker 的写权限,对消息消费的重试有没影响?

答: 有影响。

消费重试的机制是,先往 Broker 发回重试消息,如果你把写权限关闭了,那么这个流程就阻塞了,就会在本地客户端一直重试, 无限次数的延迟 5s 进行消费。当然,如果一直本地重试的话,这个 Msg 就会成功消费的一个阻塞点,所有它后面的 Offset 就算被消费了,也提交不了。所以关闭 Broker 写权限还是需要慎重。



更详细请看:Rocketmq并发消费失败重试机制

发布于: 刚刚阅读数: 3
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
Rocketmq并发和顺序消费的失败重试机制_RocketMQ_石臻臻的杂货铺_InfoQ写作社区