写点什么

深度好文!RocketMQ 高级进阶知识精讲!

发布于: 2021 年 03 月 31 日

前言

大家好,我是 jack xu,本文是 RockeMQ 精讲系列的最后一篇,讲的是 RockeMQ 一些进阶高级的知识,在我们平时的面试中会用到,掌握了这些东西也是体现一个高手和 crud boy 的区别。

第一讲:《RocketMQ高可用架构及二主二从异步集群部署》

第二讲:《RocketMQ扫盲贴及Java API使用精讲》

本文使用到的源码:github.com/xuhaoj/rock…

官方文档的翻译:www.itmuch.com/books/rocke…

为了使大家能够清晰明了,有层次的掌握这些知识,我们从生产者、Broker、消费者三个维度来讲解。

生产者

消息发送规则

在 RocketMQ 中,是基于多个 Message Queue 来实现类似于 kafka 的分区效果。如果一个 Topic 要发送和接收的数据量非常大,需要能支持增加并行处理的机器来提高处理速度,这时候一个 Topic 可以根据需求设置一个或多个 Message Queue。Topic 有了多个 Message Queue 后,消息可以并行地向各个 Message Queue 发送,消费者也可以并行地从多个 Message Queue 读取消息并消费。

那么一个消息会发送到哪个 Message Queue 上呢,这个就需要我们的路由分发策略了。在 Send 的众多重载方法中,有这样一个参数 MessageQueueSelector。

RocketMQ 中已经帮我们实现了三个实现类:

  • SelectMessageQueueByHash(默认):它是一种不断自增、轮询的方式。

  • SelectMessageQueueByRandom:随机选择一个队列。

  • SelectMessageQueueByMachineRoom:返回空,没有实现。

如果上面这几个不能满足我们的需求,还可以自定义 MessageQueueSelector,作为参数传进去:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {               Integer id = (Integer) arg;               int index = id % mqs.size();               return mqs.get(index);       }}, orderId);复制代码
复制代码

源码在 example/ordermessage/Producer.java

顺序消息

一道很经典的面试题,如何保证消息的有序性?思路是,需要保证顺序的消息要发送到同一个 message queue 中。其次,一个 message queue 只能被一个消费者消费,这点是由消息队列的分配机制来保证的。最后,一个消费者内部对一个 mq 的消费要保证是有序的。我们要做到生产者 - message queue - 消费者之间是一对一对一的关系。

具体操作过程如下:

  1. 生产者发送消息的时候,到达 Broker 应该是有序的。所以对于生产者,不能使用多线程异步发送,而是顺序发送。

  2. 写入 Broker 的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个 Message Queue,而不是分散写入。

要达到这个效果很简单,只需要我们在发送的时候传入相同的 hashKey,就会选择同一个队列。


3. 消费者消费的时候只能有一个线程,否则由于消费的速率不同,有可能出现记录到数据库的时候无序。在 Spring Boot 中,consumeMode 设置为 ORDERLY,在 Java API 中,传入 MessageListenerOrderly 的实现类即可。

consumer.registerMessageListener(new MessageListenerOrderly() {复制代码
复制代码

当然顺序消费会带来一些问题:

  1. 遇到消息失败的消息,无法跳过,当前队列消费暂停

  2. 降低了消息处理的性能

事务消息

分布式事务有很多种解决方案,其中一种就是使用 RocketMQ 的事务消息来达到最终一致性。下面我们来看下 RocketMQ 是怎么实现的。下面是 RocketMQ 官网的一张流程图,我们对照着图来分析讲解一下。rocketmq.apache.org/rocketmq/th…



  1. 生产者向 RocketMQ 服务端发送半消息,什么叫半消息呢,就是暂不能投递消费者的消息,发送方已经将消息成功发送到了 MQ 服务端,此时消息被标记为暂不能投递状态,需要等待生产者对该消息的二次确认

  2. MQ 服务端给生产者发送 ack,告诉生产者半消息已经成功收到了。

  3. 发送方开始执行本地数据库事务的逻辑。

  4. 执行完成以后将结果告诉 MQ 服务端,本地事务执行成功就告诉 commint,MQ Server 收到 commit 后则将半消息状态置为可投递,consumer 最终将收到该消息;本地事务执行失败则发送 rollback,MQ Server 收到 rollback 以后则删除半消息,订阅费将不会收到该条消息。

  5. 未收到第 4 步的确认信息时,回查事务状态。消息回查: 因为网络闪断、生产者重启等原因,RocketMQ 的发送方会提供一个反查事务状态接口,如果一段时间内半消息没有收到任何操作请求,那么 Broker 会通过反查接口得知发送方事务是否执行成功。

  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 发送方根据检查本地事务的最终状态再次提交二次确认,发送 commit 或者 rollback。

上述就是整个事务消息的执行流程,下面我们来看下如何在代码中操作。RocketMQ 中提供了一个 TransactionListener 接口,我们需要实现它,然后在 executeLocalTransaction 方法中实现执行本地事务逻辑。

    @Override    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        //local transaction process,return rollback,commit or unknow        log.info("executeLocalTransaction:"+JSON.toJSONString(msg));        return LocalTransactionState.UNKNOW;    }复制代码
复制代码

这个方法必须返回一个状态,rollback,commit 或者 unknow,返回 unknow 之后,因为不确定到底事务有没有成功,Broker 会主动发起对事务执行结果的查询,所以还要再实现一个 checkLocalTransaction 回查方法。

    @Override    public LocalTransactionState checkLocalTransaction(MessageExt msg) {       log.info("checkLocalTransaction:"+JSON.toJSONString(msg));       return LocalTransactionState.COMMIT_MESSAGE;    }复制代码
复制代码

默认回查总次数是 15 次,第一次回查的间隔是 6s,后续每次间隔 60s。最后在生产者发送的时候指定下事务监听器即可。

源码在 example/transaction/TransactionProducer.java

延迟消息

很多时候,我们村会在这样的业务场景:在一段时间之后,完成一个工作任务的需求,例如:滴滴打车订单完成之后,如果用户一直不评价,48 小时会将自动评价为 5 星;外卖下单 30 分钟不支付自动取消等等。这种问题的解决方案有很多种,其中一种就是用 RocketMQ 的延迟队列来实现,但是开源版本功能被阉割了,只能支持特定等级的消息,商业版可以任意指定时间。

   msg.setDelayTimeLevel(2); // 5秒钟复制代码
复制代码

比如 leve=2 代表 5 秒,一共支持 18 个等级,延迟的级别配置在代码 MessageStoreConfig 中:

  private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";复制代码
复制代码

Spring Boot 中这样使用

  rocketMQTemplate.syncSend(topic,message,1000,2);// 5秒钟复制代码
复制代码

源码在 example/delay/DelayProducer.java

Broker

物理存储

我们进入到 RocketMQ 存储的文件夹看一下,这个目录是我们在安装的时候指定的。


下面依次介绍下这几个文件夹的作用:

  1. checkpoint:文件检查点,存储 commitlog、consumequeue、indexfile 最后一次刷盘时间或时间戳。

  2. commitlog:消息存储目录,一个文件集合,每个默认文件 1G 大小,当第一个文件写满了,第二个文件会以初始量命名。比如起始偏移量是 1073741824,第二个文件名为 00000000001073741824,以此类推。



  1. config:运行时的配置信息,包含主题消息过滤信息、集群消费模式消息消费进度、延迟消息队列拉取进度、消息消费组配置信息、topic 配置属性等。

  2. consumequeue:消息消费队列存储目录,我们可以看到在 consumequeue 文件夹下是按 topic 的名字建文件夹,在每一个 topic 下面又是按 message queue 的编号建文件夹,在每个 message queue 文件夹下就是存放消息在 commit log 的偏移量以及大小和 Tag 属性。


5. index:消息索引文件存储目录,在前面使用 java api 发送消息的时候,我们看到会传入一个 keys 的参数,它是用来检索消息的。所以如果出现 keys,服务端就会创建索引文件,以空格分割的每个关键字都会产生一个索引。单个 IndexFile 可以保存 2000W 个索引,文件固定大小约为 400M。索引使用的是哈希索引,所以 key 尽量设置为唯一不重复。

存储理念

我们来看下 RocketMQ 官网的说明,rocketmq.apache.org/rocketmq/ho… ,我们来导读一下,首先是说 kafka 为什么不能支持更多的分区,然后说在 RocketMQ 中我们是如何支持更多分区的。



  1. 每个分区存储整个消息数据。虽然每个分区被有序地写入磁盘,但随着并发写入分区数量的增加,从操作系统的角度来看,写入变得随机。

  2. 由于数据文件分散,难以使用 Linux IO Group Commit 机制。

所以 RocketMQ 干脆另辟蹊径,设计了一种新的文件文件存储方式,就是所有 topic 的所有消息全部写在同一个文件中,这样就能够保证绝对的顺序写。当然消费的时候就复杂了,要到一个巨大的 commitlog 中去查找消息,我们不可能遍历所有消息吧,这样效率太慢了。

那怎么办呢?这个就是上面提到的 consume queue,它把 consume group 消费的 topic 的最后消费到的 offset 存储在里面。当我们消费的时候,先从 consume queue 读取持久化消息的起始物理位置偏移量 offset、大小 size 和消息 tag 的 hashcode 值,随后再从 commitlog 中进行读取待拉取消费消息的真正实体内容部分。

consume queue 可以理解为消息的索引,它里面没有消息,当然这样的存储理念也不是十全十美,对于 commitlog 来说,写的时候虽然是顺序写,但是读的时候却变成了完全的随机读;读一条消息先会读 consume queue,再读 commit log,这样增加了开销。

文件清理策略

跟 kalka 一样,commit log 的内容在消费之后是不会删除,这样做有两个好处,一个是可以被多个 consumer group 重复消费,只要修改 consumer group,就可以从头开始消费,每个 consumer group 维护自己的 offset;另一个是支持消息回溯,随时可以搜索。

但是如果不清理文件的话,文件数量不断地增加,最终会导致磁盘可用空间越来越少,所以 RocketMQ 会将 commitLog、consume queue 这些过期文件进行删除,默认是超过 72 个小时的文件。这里会启动两个线程去跑。

    private void cleanFilesPeriodically() {        this.cleanCommitLogService.run();        this.cleanConsumeQueueService.run();    }复制代码
复制代码

过期文件选出来以后,什么时候去清理呢,有两种情况。一种是通过定时任务,每天凌晨四点去删除这些文件。第二种是磁盘使用空间超过 75% 了,这时候已经火烧眉毛了,我还等到你四点干嘛,立即马上就清理了。

如果情况更严重,如果磁盘空间使用率超过 85%,会开始批量清理文件,不管有没有过期,直到空间充足;如果磁盘使用率超过 90%,会拒绝消息写入。

零拷贝

大家都知道 RocketMQ 的消息是存储在磁盘上的,但是怎么还能做到这么低的延迟和这么高的吞吐量,其中的一个奥秘就是使用到了零拷贝技术。

首先和大家介绍一下 page Cache 的概念,这个是操作系统层面的,CPU 如果要读取或者操作磁盘上的数据,必须要把磁盘的数据加载到内存中,这个加载的大小有一个固定的单位,叫做 Page。x86 的 linux 中一个标准的页大小是 4kb。如果要提升磁盘的访问速度,或者说减少磁盘的 IO,可以把访问过的 Page 在内存中缓存起来,这个内存的区域就叫做 Page Cache。

下次处理 IO 请求的时候,先到 Page Cache 中查找,找到了就直接操作,没找到再到磁盘中去找。当然 Page Cache 本身也会对数据进行预读,对于每个文件的第一个读请求操作,系统也会将所请求的页的相邻后几个页一起读出来。但是这里还有个问题,我们知道虚拟内存分为内核空间和用户空间,Page Cache 属于内核空间,用户空间访问不了,还需要从内核空间拷贝到用户空间缓冲区,这个 copy 的过程就降低了数据访问的速度。

为了解决这个问题,就产生了零拷贝技术,干脆把 Page Cache 的数据在用户空间中做一个地址映射,这样用户进行就可以通过指针操作直接读写 Page Cache,不再需要系统调用(例如 read())和内存拷贝。RocketMQ 中具体的实现是使用 mmap(memory map,内存映射),而 kafka 用的是 sendfile。


消费者

消费端的负载均衡与 rebalance

和 kafka 一样,消费端也会针对 Message Queue 做负载均衡,使得每个消费者能够合理的消费多个分区的消息。消费者挂了,消费者增加,这时候就会用到我们的 rebalance。

在 RebalanceImpl.class 的 277 行有 rebalance 的策略

      AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e); return; }复制代码
复制代码

AllocateMessageQueueStrategy 有6种实现的策略,也可以自定义实现,在消费者端指定即可。

consumer.setAllocateMessageQueueStrategy();复制代码
复制代码
  • AllocateMessageQueueAveragely:平均分配算法(默认)



  • AllocateMessageQueueAveragelyByCircle:环状分配消息队列



  • AllocateMessageQueueByConfig:按照配置来分配队列,根据用户指定的配置来进行负载

  • AllocateMessageQueueByMachineRoom:按照指定机房来配置队列

  • AllocateMachineRoomNearby:按照就近机房来配置队列

  • AllocateMessageQueueConsistentHash:一致性 hash,根据消费者的 cid 进行

队列的数量尽量要大于消费者的数量。

重试与死信队列

在消费者端如果出现异常,比如数据库不可用、网络出现问题、中途断电等等,这时候返回给 Broker 的是 RECONSUME_LATER,表示稍后重试。这个时候消息会发回到 Broker,进入到 RocketMQ 的重试队列中。服务端会为 consumer group 创建一个名字为 %RETRY%开头的重试队列。


重试队列过一段时间后再次投递到这个 ConsumerGroup,如果还是异常,会再次进入到重试队列。重试的时间间隔会不断衰减,从 10 秒开始直到 2 个小时:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,最多重试 16 次。

而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投递到 DLQ 死信队列。Broker 会创建一个死信队列,死信队列的名字是%DLQ%+ConsumerGroupName,应用可以监控死信队列来做人工干预。一般情况下我们在实际生产中是不需要重试 16 次,这样既浪费时间又浪费性能,理论上当尝试重复次数达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试。

源码在 jackxu/SimpleConsumer.java

MQ 选型分析

下面列出市面上常见的三种 MQ 的分析对比,供大家在项目中实际使用的时候参考对比:

好,RocketMQ 系列到这里就结束了,感谢大家的观看~

作者:jack_xu

链接:https://juejin.cn/post/6944894142652612638

来源:掘金


用户头像

还未添加个人签名 2021.03.15 加入

还未添加个人简介

评论

发布
暂无评论
深度好文!RocketMQ高级进阶知识精讲!