写点什么

RocketMQ 消息重试机制、死信队列

  • 2023-02-26
    湖南
  • 本文字数:4868 字

    阅读完需:约 16 分钟

消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了。

生产者消息重试

有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在 springboot 中只要在配置文件中配置一下就可以了。

# 异步消息发送失败重试次数,默认为2rocketmq.producer.retry-times-when-send-async-failed=2# 消息发送失败重试次数,默认为2rocketmq.producer.retry-times-when-send-failed=2
复制代码


也可以通过下面这种方式配置

DefaultMQProducer defaultMQProducer = new DefaultMQProducer();defaultMQProducer.setRetryTimesWhenSendFailed(2);defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);
复制代码

消费者消息重试

Apache RocketMQ 有两种消费模式:集群消费模式和广播消费模式。消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

同时 RocketMq Push 消费提供了两种消费方式:并发消费和顺序消费。

并发消费

在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照 FIFO 的顺序,也无法保证消息实际被顺序消费,所有并发消费也可以称之为无序消费。

顺序消费

顺序消费是消息生产者发送过来的消息会遵循 FIFO 队列的思想,先进先出有顺序的消费消息。 对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

并发消费和顺序消费区别

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。


并发消费失败后并不是投递回原 Topic,而是投递到一个特殊 Topic,其命名为 %RETRY%ConsumerGroupName,集群模式下并发消费每一个 ConsumerGroup 会对应一个特殊 Topic,并会订阅该 Topic。


并发消费在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照 FIFO 的顺序,也无法保证消息实际被顺序消费,所有并发消费也可以称之为无序消费。


两者参数差别如下:

并发消费重试间隔如下:

死信队列

当一条消息初次消费失败,RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。


死信队列是死信 Topic 下分区数唯一的单独队列。如果产生了死信消息,那对应的 ConsumerGroup 的死信 Topic 名称为 %DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用 RocketMQ Admin 工具或者 RocketMQ Dashboard 上查询到对应死信消息的信息。

实践出真知

Talk is cheap,show you the code.

公共部分创建

1.配置文件

rocketmq.name-server=localhost:9876# 消费者组rocketmq.producer.group=producer_group
rocketmq.consumer.topic=consumer_topicrocketmq.consumer.group=consumer_group
复制代码

2.创建消费者 RetryConsumerDemo

@Componentpublic class RetryConsumerDemo {
@Value("${rocketmq.name-server}") private String namesrvAddr;
@Value("${rocketmq.consumer.topic}") private String topic;
@Value("${rocketmq.consumer.group}") private String consumerGroup;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
@PostConstruct public void start() { try { consumer.setNamesrvAddr(namesrvAddr); //设置集群消费模式 consumer.setMessageModel(MessageModel.CLUSTERING);
//设置消费超时时间(分钟) consumer.setConsumeTimeout(1);
//订阅主题 consumer.subscribe(topic , "*");
//注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
//最大重试次数 consumer.setMaxReconsumeTimes(2);
//启动消费端 consumer.start(); System.out.println("Retry Consumer Start..."); } catch (MQClientException e) { e.printStackTrace(); } }}
复制代码

测试并发消费

1.创建并发消费监听类

并发消费监听类要实现 MessageListenerConcurrently 类

public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
MessageExt message = msgs.get(0); try { final LocalDateTime now = LocalDateTime.now(); //逐条消费 String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("当前时间:"+now+", messageId: " + message.getMsgId() + ",topic: " + message.getTopic() + ",messageBody: " + messageBody);
//模拟消费失败 if ("Concurrently_test".equals(messageBody)) { int a = 1 / 0; }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
复制代码

2.注册监听类 在消费者类 RetryConsumerDemo 中注册监听类

//注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
复制代码

3.测试

@RunWith(SpringRunner.class)@SpringBootTest(classes = RocketmqApplication.class)class RocketmqApplicationTests {
@Value("${rocketmq.consumer.topic}") private String topic; @Autowired private RocketMQTemplate rocketMQTemplate;
@Test public void testProducer(){ String msg = "Concurrently_test"; rocketMQTemplate.convertAndSend(topic , msg); }}
复制代码

4.测试结果:

后面重试时间太长就不做测试了,可以看到并发消费的消息时间都是按照上面那张时间间隔表来。


然后通过 RocketMq Dashboard Topic 一栏可以看到有一个重试消费者组 %RETRY%consumer_group,这个消费者组内存放的就是 consumer_group 消费者组消费失败重试的消息。


并发消费的重试次数是可以修改的,重试次数对应参数 DefaultMQPushConsumer 类的 maxReconsumeTimes 属性,maxReconsumeTimes 默认是-1,也就是默认会重试 16 次;0 代表不重试,只要失败就会放入死信队列;1-16 重试次数对应着上面时间间隔表中对应次数。配置的最大重试次数超过 16 就按 16 处理。

并发消费状态

并发消费有两个状态 CONSUME_SUCCESS 和 RECONSUME_LATER。返回 CONSUME_SUCCESS 代表着消费成功,返回 RECONSUME_LATER 代表进行消息重试。

public enum ConsumeConcurrentlyStatus {    /**     * Success consumption     */    CONSUME_SUCCESS,    /**     * Failure consumption,later try to consume     */    RECONSUME_LATER;}
复制代码

当 MessageListenerConcurrently 接口的 consumeMessage 方法返回 ConsumeConcurrentlyStatus#RECONSUME_LATER、null 或者方法抛异常了,都会进行消息重试。当然还是推荐返回 ConsumeConcurrentlyStatus#RECONSUME_LATER。

测试顺序消费

顺序消费和并行消费其实都差不多的,只不过顺序消费实现的是 MessageListenerOrderly 接口

1.创建顺序消费监听类

public class MessageListenerOrderlyImpl implements MessageListenerOrderly {
@Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { if (CollectionUtils.isEmpty(msgs)) { return ConsumeOrderlyStatus.SUCCESS; }
MessageExt message = msgs.get(0); try { final LocalDateTime now = LocalDateTime.now(); //逐条消费 String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("当前时间:"+now+", messageId: " + message.getMsgId() + ",topic: " + message.getTopic() + ",messageBody: " + messageBody);
//模拟消费失败 if ("Orderly_test".equals(messageBody)) { int a = 1 / 0; }
return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } }}
复制代码

2.注册监听类

//最大重试次数consumer.setMaxReconsumeTimes(2);//顺序消费 重试时间间隔consumer.setSuspendCurrentQueueTimeMillis(2000);
复制代码

SuspendCurrentQueueTimeMillis 表示重试的时间间隔,默认是 1s,这里修改成 2s

3.测试

@RunWith(SpringRunner.class)@SpringBootTest(classes = RocketmqApplication.class)class RocketmqApplicationTests {
@Value("${rocketmq.consumer.topic}") private String topic; @Autowired private RocketMQTemplate rocketMQTemplate;
@Test public void testProducer(){ String msg = "Orderly_test"; rocketMQTemplate.convertAndSend(topic , msg); }}
复制代码

测试结果:


可以看到三条结果,第一条是第一次消费的,其余两条是隔了 2s 重试的。重试 2 次之后这条数据就进入了死信队列。

顺序消费状态

顺序消费目前也是两个状态:SUCCESS 和 SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT 意思是先暂停消费一下,过 SuspendCurrentQueueTimeMillis 时间间隔后再重试一下,而不是放到重试队列里。

public enum ConsumeOrderlyStatus {    /**     * Success consumption     */    SUCCESS,    /**     * Rollback consumption(only for binlog consumption)     */    @Deprecated    ROLLBACK,    /**     * Commit offset(only for binlog consumption)     */    @Deprecated    COMMIT,    /**     * Suspend current queue a moment     */    SUSPEND_CURRENT_QUEUE_A_MOMENT;}
复制代码

测试死信队列

并发消费和顺序消费达到了最大重试次数之后就会放到死信队列。死信队列在一开始是不会被创建的,只有需要的时候才会被创建。就拿上面测试结果来看,进入到的死信队列就是 %DLQ%consumer_group,进入死信队列的消息要收到处理。


死信队列特性

  • 不会再被消费者正常消费。

  • 一个死信队列对应一个分组, 而不是对应单个消费者实例。

  • 如果一个消费者组未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。

  • 一个死信队列包含了对应 分组产生的所有死信消息,不论该消息属于哪个 Topic。

  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理


作者:索码理

链接:https://juejin.cn/post/7151571345199874084

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
RocketMQ 消息重试机制、死信队列_Java_做梦都在改BUG_InfoQ写作社区