写点什么

RocketMQ 高级使用

  • 2025-05-16
    福建
  • 本文字数:7784 字

    阅读完需:约 26 分钟

消息存储


分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。



  1. 消息生成者发送消息

  2. MQ 收到消息,将消息进行持久化,在存储中新增一条记录

  3. 返回 ACK 给生产者

  4. MQ push 消息给对应的消费者,然后等待消费者返回 ACK

  5. 如果消息消费者在指定时间内成功返回 ack,那么 MQ 认为消息消费成功,在存储中删除消息,即执行第 6 步;如果 MQ 在指定时间内没有收到 ACK,则认为消息消费失败,会尝试重新 push 消息,重复执行 4、5、6 步骤

  6. MQ 删除消息


存储介质


  • 关系型数据库 DB

Apache 下开源的另外一款 MQ—ActiveMQ(默认采用的 KahaDB 做消息存储)可选用 JDBC 的方式来做消息持久化,通过简单的 xml 配置信息即可实现 JDBC 消息存储。由于,普通关系型数据库(如 Mysql)在单表数据量达到千万级别的情况下,其 IO 读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖 DB,如果一旦 DB 出现故障,则 MQ 的消息就无法落盘存储会导致线上故障



  • 文件系统

目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署 MQ 机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。



性能对比


文件系统>关系型数据库 DB


消息的存储和发送


1)消息存储

磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差 6000 倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ 的消息用顺序写,保证了消息存储的速度。


2)消息发送

Linux 操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。


一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

  1. read;读取本地文件内容;

  2. write;将读取的内容通过网络发送出去。


这两个看似简单的操作,实际进行了 4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;

  2. 从内核态内存复 制到用户态内存;

  3. 然后从用户态 内存复制到网络驱动的内核态内存;

  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。


通过使用 mmap 的方式,可以省去向用户态的内存复制,提高速度。这种机制在 Java 中是通过 MappedByteBuffer 实现的


RocketMQ 充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。


这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了


消息存储结构


RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成 的,消息真正的物理存储文件是 CommitLog,ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。



  • CommitLog:存储消息的元数据

  • ConsumerQueue:存储消息在 CommitLog 的索引

  • IndexFile:为了消息查询提供了一种通过 key 或时间区间来查询消息的方法,这种通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程


刷盘机制


RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。



1)同步刷盘

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。


2)异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。


3)配置

同步刷盘还是异步刷盘,都是通过 Broker 配置文件里的 flushDiskType 参数设置的,这个参数被配置成 SYNC_FLUSH、ASYNC_FLUSH 中的 一个。


高可用性机制



RocketMQ 分布式集群是通过 Master 和 Slave 的配合达到高可用性的。


Master 和 Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为 0 表明这个 Broker 是 Master,大于 0 表明这个 Broker 是 Slave,同时 brokerRole 参数也会说明这个 Broker 是 Master 还是 Slave。


Master 角色的 Broker 支持读和写,Slave 角色的 Broker 仅支持读,也就是 Producer 只能和 Master 角色的 Broker 连接写入消息;Consumer 可以连接 Master 角色的 Broker,也可以连接 Slave 角色的 Broker 来读取消息。


消息消费高可用


在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候,Consumer 会被自动切换到从 Slave 读。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 程序。这就达到了消费端的高可用性。


消息发送高可用


在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 的机器组成一个 Broker 组),这样当一个 Broker 组的 Master 不可 用后,其他组的 Master 仍然可用,Producer 仍然可以发送消息。 RocketMQ 目前还不支持把 Slave 自动转成 Master,如果机器资源不足, 需要把 Slave 转成 Master,则要手动停止 Slave 角色的 Broker,更改配置文 件,用新的配置文件启动 Broker。



消息主从复制


如果一个 Broker 组有 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步和异步两种复制方式。


1)同步复制

同步复制方式是等 Master 和 Slave 均写 成功后才反馈给客户端写成功状态;

在同步复制方式下,如果 Master 出故障, Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。


2)异步复制

异步复制方式是只要 Master 写成功 即可反馈给客户端写成功状态。

在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果 Master 出了故障,有些数据因为没有被写 入 Slave,有可能会丢失;


3)配置

同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、 SYNC_MASTER、SLAVE 三个值中的一个。


4)总结



实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是 SYNC_FLUSH 方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把 Master 和 Save 配置成 ASYNC_FLUSH 的刷盘 方式,主从之间配置成 SYNC_MASTER 的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。


集群 Master 选举机制


基于 raft 协议的过半写入机制



在这里考虑有三个 Broker 节点的情况,即 Broker01 作为主节点,以及 Broker02 和 Broker03 作为从节点。

当 Producer 将消息写入 Broker01 主节点时,Broker01 只需将消息顺利写入 pageCache(页高速缓存)即视为写入成功,该节点会记入写入操作并在后台进行异步持久化。


紧接着,基于 RAFT 协议,Broker01 节点会将消息同步至从节点 Broker02 和 Broker03,并将消息写入他们的 pageCache。只要在 Broker02 和 Broker03 中有一个节点成功写入,整个写入操作即视为成功。这是因为在这三节点系统中,只需要有过半节点(即 2 个节点)写入成功,整个系统即认定消息已成功写入。


假设此时 Broker01 节点发生故障,系统会通过 Leader 选举机制,让 Broker02 或 Broker03 中的一个节点升级为主节点,保证消息能继续被写入。此时只有两个节点,只有当写入消息的操作在这两个节点中全部成功完成,才能被视为成功。这同样体现了"过半写入"的原则。


基于 raft 协议的 Leader 选举机制



这里要讨论 RocketMQ 中的领导选举机制,类似于设计模式中的状态机模式,为了便于理解,我们将其分为三种角色:Follower(跟随者)、Candidate(候选人)、以及 Leader(领导)。


首先关注 Follower 角色,它是节点初始状态,即 Broker 节点在一开始就处于 Follower 状态。在此状态中,节点设有一个随机倒计时,如果 Follower 收到了 Leader 的生命信号(心跳),这个倒计时将被重置,这意味着只要有心跳信息,Follower 状态将一直保持。然而,如果这个随机倒计时结束了,Follower 角色会升级为 Candidate 角色。


作为 Candidate,其主动行为是发起投票寻求成为 Leader,对于接收的投票,如果投票数大于或等于整个集群节点数的一半,它将升级为 Leader 状态,如果小于这个数量,它会降级为 Follower 状态。

Leader 角色则是系统的核心,它的主动行为是发送心跳信号保持其他 Follower 节点的跟随状态,避免他们试图抢夺领导地位。


举例来说,这里有三个 Broker 节点,即 Broker01,Broker02,和 Broker03。他们刚启动,目的是要选举出一个作为 Leader。假设 Broker01 节点首先完成了随机倒计时,它将首先变成 Candidate,并开始发起投票。如果其余两个节点,即 Broker02 和 Broker03 都把票投给了 Broker01,那么 Broker01 将会升级为 Leader,并发送心跳给 Broker02 和 Broker03。一旦 Broker02 和 Broker03 接收到心跳,他们会重置自己的倒计时和状态,一直保持在 Follower 状态。这就是一种状态转换的情况。



接下来,来讨论另一种领导选举的情形。假设在三个 Broker 节点刚启动时,Broker01 和 Broker02 都完成了随机倒计时并升级为 Candidate 状态,并同时发起投票。


在这次投票中,Broker01 投自己一票,Broker02 也投自己一票,而 Broker03 则选择投给 Broker01。那么由于 Broker01 的票数达到了过半(二分之一以上),它将成功升级为 Leader。在升级后,Broker01 将开始发出心跳信号。


另一方面,Broker02 由于未能得到过半的选票(只有自己的一票),将不得不降级为 Follower,然后重新开始新一轮的随机倒计时,等待下一次的领导选举机会。


在这种情况下,虽然 Broker01 和 Broker02 同时成为 Candidate 并发起投票,但由于得票数量的原因,Broker01 最终升级为 Leader,而 Broker02 则只能继续留在 Follower 状态。这也是 RocketMQ 中领导选举机制的一种典型应用。


负载均衡


Producer 负载均衡


Producer 端,每个实例在发消息的时候,默认会轮询所有的 message queue 发送,以达到让消息平均落在不同的 queue 上。而由于 queue 可以散落在不同的 broker,所以消息就发送到不同的 broker 下,如下图:



图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。


Consumer 负载均衡


1)集群模式


在集群消费模式下,每条消息只需要投递到订阅这个 topic 的 Consumer Group 下的一个实例即可。RocketMQ 采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条 message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照 queue 的数量和实例的数量平均分配 queue 给每个实例。


默认的分配算法是 AllocateMessageQueueAveragely,如下图:



还有另外一种平均的算法是 AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条 queue,只是以环状轮流分 queue 的形式,如下图:



需要注意的是,集群模式下,queue 都是只允许分配只一个实例,这是由于如果多个实例同时消费一个 queue 的消息,由于拉取哪些消息是 consumer 主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个 queue 只分给一个 consumer 实例,一个 consumer 实例可以允许同时分到不同的 queue。


通过增加 consumer 实例去分摊 queue 的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的 queue 将分配到其他实例上继续消费。


但是如果 consumer 实例的数量比 message queue 的总数量还多的话,多出来的 consumer 实例将无法分到 queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让 queue 的总数量大于等于 consumer 的数量。


2)广播模式


由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。


在实现上,其中一个不同就是在 consumer 分配 queue 的时候,所有 consumer 都分到所有的 queue。



消息重试


顺序消息的重试


对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。


无序消息的重试


对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。


无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。


1)重试次数


消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:



如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。


注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。


2)配置方式


消费失败后,重试配置方式


集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 返回 Action.ReconsumeLater (推荐)

  • 返回 Null

  • 抛出异常


public class MessageListenerImpl implements MessageListener {    @Override    public Action consume(Message message, ConsumeContext context) {        //处理消息        doConsumeMessage(message);        //方式1:返回 Action.ReconsumeLater,消息将重试        return Action.ReconsumeLater;        //方式2:返回 null,消息将重试        return null;        //方式3:直接抛出异常, 消息将重试        throw new RuntimeException("Consumer Message exceotion");    }}
复制代码


消费失败后,不重试配置方式


集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。


public class MessageListenerImpl implements MessageListener {    @Override    public Action consume(Message message, ConsumeContext context) {        try {            doConsumeMessage(message);        } catch (Throwable e) {            //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;            return Action.CommitMessage;        }        //消息处理正常,直接返回 Action.CommitMessage;        return Action.CommitMessage;    }}
复制代码


自定义消息最大重试次数


消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

  • 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。

  • 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。


Properties properties = new Properties();//配置对应 Group ID 的最大消息重试次数为 20 次properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");Consumer consumer =ONSFactory.createConsumer(properties);
复制代码


注意:

  • 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。

  • 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。

  • 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置


获取消息重试次数


消费者收到消息后,可按照如下方式获取消息的重试次数:


public class MessageListenerImpl implements MessageListener {    @Override    public Action consume(Message message, ConsumeContext context) {        //获取消息的重试次数        System.out.println(message.getReconsumeTimes());        return Action.CommitMessage;    }}
复制代码


死信队列


当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。


在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。


死信特性


死信消息具有以下特性

  • 不会再被消费者正常消费。

  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。


死信队列具有以下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。

  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。

  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。


查看死信信息


1、在控制台查询出现死信队列的主题信息



2、在消息界面根据主题查询死信消息



3、选择重新发送消息


一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。


消费幂等


消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。


消费幂等的必要性


在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。


  • 投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。


  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。


处理方式


因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:


Message message = new Message();message.setKey("ORDERID_100");SendResult sendResult = producer.send(message);
复制代码


订阅方收到消息时可以根据消息的 Key 进行幂等处理:


consumer.subscribe("ons_test", "*", new MessageListener() {    public Action consume(Message message, ConsumeContext context) {        String key = message.getKey()        // 根据业务唯一标识的 key 做幂等处理    }});
复制代码


文章转载自:Seven

原文链接:https://www.cnblogs.com/seven97-top/p/18868809

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ高级使用_RocketMQ_量贩潮汐·WholesaleTide_InfoQ写作社区