写点什么

RocketMQ 消息丢失场景及解决办法 (1)

作者:Java高工P7
  • 2021 年 11 月 11 日
  • 本文字数:1608 字

    阅读完需:约 5 分钟

②如果消息已经被刷入了磁盘中,但是数据没有做任何备份,一旦磁盘损坏,那么消息也会丢失 3. 消费者成功从 RocketMQ 中获取到了消息,还没有将消息完全消费完的时候,就通知 RocketMQ 我已经将消息消费了,然后消费者宕机,但是 RocketMQ 认为消费者已经成功消费了数据,所以数据依旧丢失了


那么如何保证消息的零丢失呢?



  1. 场景 1 中保证消息不丢失的方案是使用 RocketMQ 自带的事务机制来发送消息,大致流程为①首先生产者发送 half 消息到 RocketMQ 中,此时消费者是无法消费 half 消息的,若 half 消息就发送失败了,则执行相应的回滚逻辑②half 消息发送成功之后,且 RocketMQ 返回成功响应,则执行生产者的核心链路③如果生产者自己的核心链路执行失败,则回滚,并通知 RocketMQ 删除 half 消息④如果生产者的核心链路执行成功,则通知 RocketMQ commit half 消息,让消费者可以消费这条数据其中还有一些 RocketMQ 长时间没有收到生产者是要 commit/rollback 操作的响应,回调生产者接口的细节在使用了 RocketMQ 事务将生产者的消息成功发送给 RocketMQ,就可以保证在这个阶段消息不会丢失

  2. 在场景 2 中要保证消息不丢失,首先需要将 os cache 的异步刷盘策略改为同步刷盘,这一步需要修改 Broker 的配置文件,将 flushDiskType 改为 SYNC_FLUSH 同步刷盘策略,默认的是 ASYNC_FLUSH 异步刷盘。一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了;为了保证磁盘损坏不会丢失数据,我们需要对 RocketMQ 采用主从机构,集群部署,Leader 中的数据在多个 Follower 中都存有备份,防止单点故障。

  3. 在场景 3 中,消息到达了消费者,RocketMQ 在代码中就能保证消息不会丢失


//注册消息监听器处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){


//对消息进行处理 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});


上面这段代码中,RocketMQ 在消费者中注册了一个监听器,当消费者获取到了消息,就会去回调这个监听器函数,去处理里面的消息当你的消息处理完毕之后,才会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 只有返回了 CONSUME_SUCCESS,消费者才会告诉 RocketMQ 我已经消费完了,此时如果消费者宕机,消息已经处理完了,也就不会丢失消息了如果消费者还没有返回 CONSUME_SUCCESS 时就宕机了,那么 RocketMQ 就会认为你这个消费者节点挂掉了,会自动故障转移,将消息交给消费者组的其他消费者去消费这个消息,保证消息不会丢失


为了保证消息不会丢失,在 consumeMessage 方法中就直接写消息消费的业务逻辑就可以了,如果非要搞一些骚操作,比如下面的代码


//注册消息监听器处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//开启子线程异步处理消息 new Thread() {public void run() {//对消息进行处理}}.start();


return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});


如果新开子线程异步处理消息的话,就有可能出现消息还没有被消费完,消费者告诉 RocketMQ 消息已经被消费了,结果宕机丢失消息的情况。


使用上面一整套的方案就可以在使用 RocketMQ 时保证消息零丢失,但是性能和吞吐量也将大幅下降


  1. 使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能

  2. 同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级

  3. 主从


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


架构的话,需要 Leader 将数据同步给 Follower4. 消费时无法异步消费,只能等待消费完成再通知 RocketMQ 消费完成


消息零丢失是一把双刃剑,要想用好,还是要视具体的业务场景而定,选择合适的方案才是最好的

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ消息丢失场景及解决办法(1)