Rocketmq 并发和顺序消费的失败重试机制
@
问题
并发消费触发时机客户端发起请求 CONSUMER_SEND_MSG_BACKBroker 处理 CONSUMER_SEND_MSG_BACK 请求
顺序消费
Q&A 消费的时候是一批的消息, 如果其中某条消费失败了,是所有的消息都会被重试吗?用户可以自己控制重试次数、重试间隔时间吗?批量消费消息,能否自己控制重试的起始偏移量?比如 10 条消息,第 5 条失败了,那么只重试第 5 条和后面的所有。重试的消息是如何被重新消费的?如果关闭了 broker 的写权限,对消息消费的重试有没影响?
1 问题
消费的时候是一批的消息, 如果其中某条消费失败了,是所有的消息都会被重试吗?
用户可以自己控制重试次数、重试间隔时间吗
批量消费消息,能否自己控制重试的起始偏移量?比如 10 条消息,第 5 条失败了,那么只重试第 5 条和后面的所有。
重试的消息是如何被重新消费的?
如果关闭了 broker 的写权限,对消息消费的重试有没影响?
如果一个 Topic 被相同 ConsumerGroup 不同 consumer 顺序消费和并发消费会怎么样?
更详细请看:Rocketmq并发消费失败重试机制
2 并发消费
触发时机
消费者在消费完成之后, 需要处理消费的结果, 是成功或失败
ConsumeMessageConcurrentlyService#processConsumeResult
上面省略了部分代码, 上面代码是主要的针对发送失败的消息 发送回 Broker 的情况;
光看代码理解的意思如下
如果处理结果为 CONSUME_SUCCESS,无需重试, 则记录一下监控指标, 消费成功 TPS 、和 消费失败 TPS ; 这里用户是可以自己通过设置
context.setAckIndex()
来设置 ACK 的索引值的; 比如你本批次消息量 10 条, 你这里设置为 4; 则表示前面 5 条成功,后面 5 条失败; 当然了,这里并不会给失败的做重试;如果处理结果为 RECONSUME_LATER, 则表示需要重试, 将该批次的所有消息遍历同步发送回 Broker 中; 如果某个同步请求失败,则会记录下来; 一会在本地客户端重新消费 ;
将这些消息从 待消费消息 TreeMap 中移除掉(同步发回 Broker 请求失败除外),并获得当前 TreeMap 中最小的值;
更新本地缓存中的已消费偏移量的值; 以便可以提交消费 Offset
在这里插入图片描述
看图,再讲几个重点
需要重试的消息, 会优先被发回重试队列中,发送成功之后它会被当做消费成功, 这样做的目的是为了不要让某个消息消费失败就阻碍了整个消费 Offset 的提交;比如, 1、2、3、4 四条消息, 第 1 条消费失败,其他都成功, 那么就因为最小的 Offset 1 失败了导致后面的都不能标标记为成功去提交。所以让 1 也设置为成功,就不会成为阻塞点,当然要把它发送到重试队列中等待重试。
可提交的消费 Offset 的值永远是 TreeMap 中的最小值, 这个 TreeMap 存放的就是 pullMessage 获取到的所有待消费 Msg。消费成功就删除。比如, 1、2、3、4 四条消息。1、2 消费成功删除了,那么最小的就是 3 这个偏移量,那么它之前的都可以提交了;如果 2、3、4 都消费成功并且删除了,但是 1 还在,那么可提交的偏移量还是当前最小的值 1 ;
用户可自己决定从哪条消息开始重试
上面其实已经说了, 用户可以通过入参ConsumeConcurrentlyContext
来设置 ackIndex 控制重试的起始索引;
PS: 目前所看为版本(5.1.3), 笔者始终觉得 ackIndex 这个设置有点问题;
消费成功的时候,设置 ackIndex 才会生效,既然用户返回的都是成功,则表示它并不需要重试; 设置这个值总感觉很别扭。
消费失败的时候,ackIndex 被强制设置为了-1,表示所有的都要重试, 正常情况来说,批量消费的时候,碰到其中一条失败,那么就应该从这条的索引开始往后的消息都需要重试,前面已经消费的并且成功的并不需要重试;
关于这一点,我更倾向于这是一个 Bug; 或者设计缺陷
优化建议:
为了兼容之前的逻辑,成功的状态的逻辑就不去修改了
失败的情况,没有必要强制设置为-1,导致全部重试, 让用户自己也能够通过 ackIndex 来设置重试的部分消息,而不用全部重试
客户端发起请求 CONSUMER_SEND_MSG_BACK
如果该批次的消息消费失败, 则会尝试重试,重试会尝试一条一条的把 Message 发回去
DefaultMQPushConsumerImpl#sendMessageBack
请求头 ConsumerSendMsgBackRequestHeader
目标地址
Message 所在 Broker 的地址
请求方式
同步请求
请求流程
首次发送的超时时间为 5000ms;请求是
RequestCode.CONSUMER_SEND_MSG_BACK
如果上面的请求发送失败, 则兜底策略为,直接发送普通消息;但是 Topic 为 %RETRY%{consumerGroup};延迟等级为
3 + msg.getReconsumeTimes()
; 这里发送消息的 Producer 客户端是 Consumer 在构建实例的时候创建的内置的 Producer 客户端,客户端实例名是:CLIENT_INNER_PRODUCER; 这个发送也是同步发送;超时时间是3000
如果上面都失败了,抛出异常了,才会进行本地客户端重试消费(延迟 5 秒);
本地客户端重试是一直重试还是有次数限制?
如果一直失败,并且都是客户端重试,没有次数限制,并且每次都是延迟 5 秒消费;它会成为消费 Offset 的阻塞点;后续的消息都有被重新消费的可能性(比如客户端重启)
在这里插入图片描述
Broker 处理 CONSUMER_SEND_MSG_BACK 请求
AbstractSendMessageProcessor#consumerSendMsgBack
如果当前 Broker 不是 Master 则返回系统异常错误码
如果消费 Group 订阅关系不存在则返回错误码
如果
brokerPermission
权限不可写则返回无权限错误码如果当前 Group 的重试队列数量
retryQueueNums
<=0 返回无权限错误码如果该 Group 的重新 Topic 不存在则创建一个,TopicName:%RETRY%GroupName;读写权限
根据入参
offset
查找该 Message; 如果没有查询到则返回系统异常错误码如果该消息重试次数已经超过了最大次数,或者重试策略为不重试的话,则将消息发送到死信队列里面;死信队列 Topic: %DLQ%GroupName
如果还没有超过重试次数, 则将消息发送到重试 Topic 里面:%RETRY%GroupName
如果有 ConsumeMessageHook 列表的话,则执行一下
consumeMessageAfter
方法返回 Response。
在这里插入图片描述
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。所以就需要我们消费者端做好消费幂等操作。
3 顺序消费
顺序消费完毕执行处理结果的流程
ConsumeMessageOrderlyService#processConsumeResult
在这里插入图片描述
几个重要点
顺序消费针对同一个 ProcessQueue 只会有一个消费任务 ConsumeRequest 在执行
用户返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 则会重试, 重试流程会根据是否超过最大重试次数来决定要不要讲消息发回重试队列中。
这个发回重试队列是直接使用的 Consumer 内置 Producer 实例直接向重试 Topic %RETRY%{consumerGroup}发送的;
这个最大重试次数一般是 INTEGER.MAXVALUE;所以一般不会超过,那么就会一直在本地重试,每次重试的时候都是延迟 1s; 这个过程并不会将消息写回到 Broker 中。
如果某个消息一直消费失败, 那么整个队列消费都会被阻塞。
4Q&A
消费的时候是一批的消息, 如果其中某条消费失败了,是所有的消息都会被重试吗?
如果在消费的时候,你返回的是 ConsumeConcurrentlyStatus#RECONSUME_LATER, 则表示本次消费失败,需要重试,则本次分配到的 Msgs 都会被重试;本次分配的 Msgs 数量是由
consumer.setConsumeMessageBatchMaxSize(1)
决定的;默认就是 1;表示一次消费一条消息;
用户可以自己控制重试次数、重试间隔时间吗?
可以。控制重试次数:3.4.9 之前是使用 subscriptionGroupConfig 消费组配置
retryMaxTimes
3.4.9 之后是客户端指定(requestHeader.getMaxReconsumeTimes())这里可以通过Consumer#setMaxReconsumeTimes(最大次数)
来设置值并发模式默认 16 次重试的间隔时间:默认情况下,都是 Broker 端来控制的重试间隔时间,间隔时间是用延迟消息来实现的,比如 Broker 端的延迟级别为
3+重试次数
; 默认情况下第一次重试对应的等级 3 的时间间隔为:10s;
想要自定义重试的间隔时间的话,那么就需要自己在消费的时候来处理了,比如
批量消费消息,能否自己控制重试的起始偏移量?比如 10 条消息,第 5 条失败了,那么只重试第 5 条和后面的所有。
可以但是目前仅限于返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 的情况。如果是返回的 ConsumeConcurrentlyStatus.RECONSUME_LATER,则整批的消息都会重试。具体,请看下面的代码
笔者认为,这里应该同样支持 消费失败(RECONSUME_LATER)的情况,来允许用户控制从哪个消息开始才需要重试。
重试的消息是如何被重新消费的?
需要重试的消息,会将消息写入到 %RETRY%{consumerGroup} 重试队列中,等延迟时间一到,客户端会重新消费这些消息。如果超出重试次数,则会放入到死信队列 %DLQ%{consumerGroup}中。不会再重试
在这里插入图片描述
如果关闭了 broker 的写权限,对消息消费的重试有没影响?
答: 有影响。
消费重试的机制是,先往 Broker 发回重试消息,如果你把写权限关闭了,那么这个流程就阻塞了,就会在本地客户端一直重试, 无限次数的延迟 5s 进行消费。当然,如果一直本地重试的话,这个 Msg 就会成功消费的一个阻塞点,所有它后面的 Offset 就算被消费了,也提交不了。所以关闭 Broker 写权限还是需要慎重。
更详细请看:Rocketmq并发消费失败重试机制
版权声明: 本文为 InfoQ 作者【石臻臻的杂货铺】的原创文章。
原文链接:【http://xie.infoq.cn/article/14ccd09bd66b7aee7ae1011c7】。未经作者许可,禁止转载。
评论