RocketMQ 原理—源码设计简单分析下
1.Producer 作为生产者是如何创建出来的
(1)NameServer 的启动
NameServer 启动后的核心架构,如下图示:
NameServer 启动后,会有一个 NamesrvController 组件管理控制 NameServer 的所有行为,包括内部会启动一个 Netty 服务器去监听一个 9876 端口号,然后接收处理 Broker 和客户端发送过来的请求。
(2)Broker 的启动
Broker 启动后的核心架构,如下图示:
Broker 启动后,也会有一个 BrokerController 组件管理控制 Broker 的整体行为,包括初始化 Netty 服务器用于接收客户端的网络请求、启动处理请求的线程池、执行定时任务的线程池、初始化核心功能组件,同时还会发送注册请求到 NameServer 去注册自己。
(3)Broker 的注册和心跳
Broker 启动后,会向 NameServer 进行注册和定时发送注册请求作为心跳。NameServer 会有一个后台进程定时检查每个 Broker 的最近一次心跳时间,如果长时间没心跳就认为 Broker 已经故障。如下图示:
(4)通过 Producer 发送消息
假设 RocketMQ 集群已经启动好了 NameServer,而且还启动了一批 Broker,同时 Broker 都已经把自己注册到 NameServer 里去了,NameServer 也会定时检查这批 Broker 是否存活。那么就可以让开发好的业务系统去发送消息到 RocketMQ 集群里,于是需要创建一个 Producer 实例。
实际上我们开发好的系统,最终都需要创建一个 Producer 实例,然后通过 Producer 实例发送消息到 RocketMQ 的 Broker 上去。
下面是使用 Producer 实例发送消息到 RocketMQ 的代码,可以看到 Producer 是如何构造出来的。
构造 Producer 的过程很简单:也就是创建一个 DefaultMQProducer 对象实例。在构造方法中,首先会传入所属的 Producer 分组,然后设置一下 NameServer 的地址,最后调用它的 start()方法启动这个 Producer 即可。
创建 DefaultMQProducer 对象实例是一个非常简单的过程:就是创建出一个对象,然后保存它的 Producer 分组。设置 NameServer 地址也是一个很简单的过程,就是保存一下 NameServer 地址。
所以,最关键的还是调用 DefaultMQProducer 的 start()方法去启动 Producer 这个消息生产者。
2.Producer 启动时是如何准备好相关资源的
(1)DefaultMQProducer 的 start()方法
接下来分析 Producer 在启动时是如何准备好相关资源的。Producer 内部必须要有独立的线程资源,以及需要和 Broker 已经建立好网络连接,这样才能把消息发送出去。
在构造 Producer 时,它内部便会构造一个真正用于执行消息发送逻辑的 DefaultMQProducerImpl 组件。所以,真正的 Producer 生产者其实是这个 DefaultMQProducerImpl 组件。那么这个组件在启动的时都干了什么呢?
其实,上述 start()方法的具体逻辑暂时不需要深入分析,因为其中的逻辑并没有直接与 Producer 发送消息相关联。比如拉取 Topic 的路由数据、选择 MessageQueue、跟 Broker 建立长连接、发送消息到 Broker 等这些核心逻辑,其实都封装在发送消息的方法中。
(2)Producer 在第一次向 Topic 发送消息时才拉取 Topic 的路由数据
假设后续 Producer 要发送消息,那么就要指定往哪个 Topic 发送消息。因此 Producer 需要知道 Topic 的路由数据,比如 Topic 有哪些 MessageQueue,每个 MessageQueue 在哪些 Broker 上。如下图示:
从 start()方法源码可知,在 Producer 启动时,并不会去拉取 Topic 的路由数据。实际上,Producer 在第一次向 Topic 发送消息时,才会去拉取 Topic 的路由数据。包括这个 Topic 有几个 MessageQueue、每个 MessageQueue 在哪个 Broker 上。然后从中选择一个 MessageQueue,接着与对应的 Broker 建立网络连接,最后才把消息发送过去。
(3)Producer 在第一次向 Broker 发送消息时才与 Broker 建立网络连接
从 start()方法源码可知,在 Producer 启动时,并不会和所有 Broker 建立网络连接。很多核心的逻辑,包括拉取 Topic 路由数据、选择 MessageQueue、和 Broker 建立网络连接等,都是在 Producer 第一次发送消息时才进行处理的。
3.Producer 是如何从拉取 Topic 元数据的
(1)Producer 发送消息的方法
当调用 Producer 的 send()方法发送消息时,最终会调用到 DefaultMQProducerImpl 的 sendDefaultImpl()方法。
在 sendDefaultImpl()方法里,开始会有一行非常关键的代码,如下所示:
该行代码的意思是,每次 Producer 发送消息时,都会先检查一下要发送消息的那个 Topic 的路由数据是否在本地。如果不在,才会发送请求到 NameServer 去拉取 Topic 的路由数据,然后缓存在本地。
(2)Producer 拉取 Topic 路由数据的过程
进入 tryToFindTopicPublishInfo()方法,会发现其逻辑非常简单:就是会先检查一下自己本地是否有这个 Topic 的路由数据的缓存,如果没有就发送网络请求到 NameServer 去拉取,如果有就直接返回本地 Topic 路由数据缓存,如下图示:
那么 Producer 是如何发送网络请求到 NameServer 去拉取 Topic 路由数据的呢?这其实就对应了 tryToFindTopicPublishInfo()方法内的一行代码,如下所示:
通过以下这行代码,Producer 就可以从 NameServer 拉取某个 Topic 的路由数据,然后更新到自己本地的缓存里去。
Producer 发送请求到 NameServer 的拉取 Topic 路由数据的过程:首先封装一个 Request 请求对象,然后通过 Netty 客户端发送请求到 NameServer,接着会接收到 NameServer 返回的一个 Response 响应对象,于是就可以从 Response 响应对象里取出所需的 Topic 路由数据并更新到自己本地缓存里。更新时会做一些判断,比如 Topic 路由数据是否有改变过等,然后把 Topic 路由数据放入本地缓存。如下图示:
4.Producer 是如何选择 MessageQueue 的
(1)Topic 是由多个 MessageQueue 组成的
Producer 发送消息时,会先检查一下要发送消息的 Topic 的路由数据是否在本地缓存。如果不在,就会通过底层的 Netty 网络通信模块发送一个请求到 NameServer 拉取 Topic 路由数据,然后缓存在 Producer 本地。当 Producer 拿到一个 Topic 的路由数据后,就应该选择要发送消息到这个 Topic 的哪一个 MessageQueue 上了。
因为 Topic 是一个逻辑上的概念,一个 Topic 的数据往往会分布式存储在多台 Broker 机器上,所以 Topic 本质是由多个 MessageQueue 组成的。
每个 MessageQueue 都可以在不同的 Broker 机器上,当然也可能一个 Topic 的多个 MessageQueue 在一个 Broker 机器上。如下图示:
只要 Producer 知道了要发送消息到哪个 MessageQueue 上去,其实就已经知道了这个 MessageQueue 在哪台 Broker 机器上,接着和该 Broker 机器建立连接,发送消息过去即可。
(2)选择 MessageQueue 的源码和算法
发送消息的源码在 DefaultMQProducerImpl 的 sendDefaultImpl()方法中。该方法里只要 Producer 获取到 Topic 的路由数据,不管从本地缓存获取还是从 NameServer 拉取,就会执行下面的代码:
selectOneMessageQueue()方法其实就是在选择 Topic 中的一个 MessageQueue,然后发送消息到这个 MessageQueue。下面是选择 MessageQueue 的算法:
这是一种简单的负载均衡算法:首先获取一个自增长的 Index,接着就用这个 Index 对 Topic 的 MessageQueue 列表进行取模运算,从而获取到一个 MessageQueue 列表的位置,最后返回这个位置的 MessageQueue。
但是如果某个 Broker 故障了,那么就不能把消息发送到故障 Broker 的 MessageQueue 了。所以 selectOneMessageQueue()方法里还有其他代码,用来实现 Broker 故障时的自动回避机制。
5.Producer 与 Broker 是如何进行网络通信的
(1)Producer 是如何把消息发送给 Broker 的
在 DefaultMQProducerImpl.sendDefaultImpl()方法中,会先获取到 MessageQueue 所在的 Broker 名称,如下所示:
获取到这个 BrokerName 后,就会调用 sendKernelImpl()方法把消息发送到 Broker 上。
在 sendKernelImpl()方法中:
首先会通过 BrokerName 去本地缓存查找它的实际地址。如果找不到,就到 NameServer 中拉取 Topic 的路由数据,再次在本地缓存获取 Broker 的实际地址,有了这个地址才能进行网络通信。
然后会封装一个 Request 请求,包括请求头、发送的消息等,并且会给消息分配全局唯一 ID,以及对超过 4KB 的消息体进行压缩。
在 Request 请求中,会包含生产者组、Topic 名称、Topic 的 MessageQueue 数量、MessageQueue 的 ID、消息发送时间、消息的 flag、消息扩展属性、消息重试次数、是否批量发送等信息,如果是事务消息则带上 prepared 标记等。
把这些数据都封装到一个 Request 请求后,就会通过 Netty 把 Request 请求发送到指定的 Broker 上。
(2)Producer 和 Broker 基于长连接进行通信
其中,Producer 和 Broker 会通过 Netty 建立长连接,然后基于长连接进行持续通信。如下图示:
那么 Broker 上的 Netty 服务器接收到消息后,会如何进行处理?这个过程比较复杂,涉及到 CommitLog、ConsumeQueue、IndexFile、Checkpoint 等一系列机制,这也是 RocketMQ 中核心机制。
6.Broker 收到一条消息后是如何存储的
(1)Broker 收到消息后的处理流程
Broker 中的 Netty 网络服务器获取到一条消息后:
首先,会把这条消息写入到一个 CommitLog 文件里。一个 Broker 机器上就只有一个 CommitLog 文件,所有 Topic 的消息都会写入到这个文件里。如下图示:
然后,Broker 会以异步的方式把消息写入到一个 ConsumeQueue 文件里,因为一个 Topic 会有多个 MessageQueue。任何一条消息都需要写入到一个 MessageQueue 的,一个 MessageQueue 其实就是对应了一个 ConsumeQueue 文件。所以一条写入 MessageQueue 的消息,必然会异步进入对应的 ConsumeQueue 文件,如下图示:
接着,Broker 还会以异步的方式把消息写入到一个 IndexFile 文件里。在 IndexFile 文件里,会把每条消息的 key 和消息在 CommitLog 中的 offset 偏移量做一个索引,这样后续如果要根据消息 key 从 CommitLog 文件里查询消息,就可以根据 IndexFile 文件的索引来查询,如下图示:
(2)Broker 如何将消息写入 CommitLog 文件
Broker 收到一个消息后,首先会顺序写入 CommitLog 文件。CommitLog 文件的存储目录是 ${ROCKETMQ_HOME}/store/commitlog,目录里会有很多 CommitLog 文件。每个文件默认是 1GB 大小,一个 CommitLog 文件写满了就创建一个新的 CommitLog 文件,文件名就是文件中的第一个偏移量。文件名如果不足 20 位,就用 0 来补齐。
Broker 在把消息写入 CommitLog 文件时,会申请一个 putMessageLock 锁。也就是说,Broker 写入消息到 CommitLog 文件时都是串行的,不会并发写入,因为并发写入文件必然会有数据错乱的问题。
下面是相关的源码片段:
在 asyncPutMessage()方法中,获取到锁之后,会对消息做出一系列处理,包括设置消息的存储时间、创建全局唯一的消息 ID、计算消息的总长度等。然后会执行 MappedFile 的 appendMessage()方法,把消息写入到 MappedFile 里。
上述源码中,其实最关键的是 cb.doAppend()这行代码。cb.doAppend()会把消息追加到 MappedFile 映射的一块内存里去,并没有直接刷入到磁盘上的 CommitLog 文件,如下图示。至于具体什么时候才会把内存里的数据刷入磁盘上的 CommitLog 文件,这就要看配置的刷盘策略了。
另外,不管是同步刷盘还是异步刷盘,如果配置了主从同步,一旦将消息写入到 CommitLog 文件之后,接下来都会进行主从同步复制。
7.Broker 是如何实时更新索引文件的
(1)消息如何进入 CommitLog
Broker 收到一条消息后,会先把消息写入到 CommitLog 里。但是刚开始写入也仅仅是写入到 MappedFile 映射的一块内存,后续才会根据刷盘策略决定是否立即把数据从内存刷入磁盘。如下图示:
(2)消息如何进入 ConsumeQueue 和 IndexFile
Broker 启动时会启动一个叫 ReputMessageService 的线程,这个线程会把写入 CommitLog 的消息转发出去,也就是将消息写入(转发)到 ConsumeQueue 和 IndexFile。如下图示:
在 DefaultMessageStore 的 start()方法里,会启动这个 ReputMessageService 线程。而 DefaultMessageStore 的 start()方法是在 Broker 启动时被调用的,所以相当于 Broker 启动时就会启动这个 ReputMessageService 线程。
下面是 ReputMessageService 线程的源码:
由上述代码可知:在 ReputMessageService 线程里,每隔 1 毫秒就会把最近写入 CommitLog 的消息进行一次转发。其中会通过 doReput()方法将消息转发到 ConsumeQueue 和 IndexFile 中。
在 doReput()方法里,会从 CommitLog 中去获取到一个 DispatchRequest,也就是从 CommitLog 中获取一份需要进行转发的消息。
接着,就会通过调用 doDispatch()方法将消息转发到 ConsumeQueue 和 IndexFile 里,其中会通过遍历 CommitLogDispatcher 来实现。因为这个 CommitLogDispatcher 的实现类有两个,分别负责把消息转发到 ConsumeQueue 和 IndexFile。
ConsumeQueueDispatcher 的写入逻辑,就是找到当前 Topic 的 messageQueueId 对应的一个 ConsumeQueue 文件。一个 MessageQueue 会对应多个 ConsumeQueue 文件,只要找到一个即可,然后就可以把消息写入其中。
IndexDispatcher 的写入逻辑,就是在 IndexFile 里构建对应的索引。
(3)总结
当 Broker 把消息写入 CommitLog 后,会有一个后台线程每隔 1 毫秒拉取 CommitLog 中最新的一批消息,然后分别转发到 ConsumeQueue 和 IndexFile 中。
8.Broker 是如何实现同步刷盘以及异步刷盘的
(1)Broker 收到消息后的存储流程
Broker 首先会将消息写入 CommitLog,并且是先写入 MappedFile 映射的一块内存,而不是先写入磁盘。然后会有一个后台线程把 CommitLog 里的消息写入到 ConsumeQueue 和 IndexFile 里,如下图示:
(2)消息的刷盘时机和策略
当需要写入 CommitLog 的数据进入到 MappedFile 映射的一块内存后,就会开始执行刷盘策略。如果是同步刷盘,那么就会直接把内存里的数据写入磁盘文件。如果是异步刷盘,那么就会过一段时间后再把数据刷入磁盘文件。
在往 CommitLog 写数据时,会调用 CommitLog 的 asyncPutMessage()方法,在这个方法的末尾有两行很关键的代码。一个是调用 submitFlushRequest()方法,用于决定如何进行刷盘。一个是调用 submitReplicaRequest()方法,用于决定如何把消息同步给 Slave Broker。如下所示:
(3)Broker 是如何处理刷盘的
接下来进入 submitFlushRequest()方法看看 Broker 是如何处理刷盘的。
上述代码,就会根据配置的两种不同的刷盘策略,来分别进行处理的。
一.同步刷盘的策略是如何处理的
首先会构建一个 GroupCommitRequest,然后提交给 GroupCommitService 去进行处理,接着调用 request.future()等待同步刷盘成功。
具体的刷盘是由 GroupCommitService 执行的,它的 doCommit()方法会执行同步刷盘的逻辑,代码如下:
上述代码一层一层调用下去,可发现最终刷盘其实是靠 MappedByteBuffer 的 force()方法,如下所示:
这个 MappedByteBuffer 就是 JDK NIO 包下的 API,MappedByteBuffer 的 force()方法会强迫将写入内存的数据刷入到磁盘文件里,执行完 force()方法就代表同步刷盘成功了。
二.异步刷盘的策略是如何处理的
此时会唤醒一个 flushCommitLogService 组件。由于 FlushCommitLogService 是一个线程,它是一个抽象父类,它的子类是 CommitRealTimeService。所以真正唤醒的是 FlushCommitLogService 的子类 CommitRealTimeService 线程。
在该线程里,会每隔一定时间执行一次刷盘,最大间隔是 10s。所以一旦执行异步刷盘,那么最多 10 秒就会执行一次刷盘。
9.Broker 是如何清理存储较久的磁盘数据的
(1)定时检查是否要删除磁盘上的文件
默认情况下,Broker 会启动一个后台线程,这个后台线程会自动检查 CommitLog 文件、ConsumeQueue 文件,因为这些文件都会存在多个。如果发现比较旧的、超过 72 小时的文件,那么就会清理这些文件。
所以,默认情况下,Broker 只会将消息保留 3 天,当然我们也可以通过 fileReservedTime 来自定义配置这个时间。
这个定时检查过期数据文件的线程,在 DefaultMessageStore 这个类里。在 DefaultMessageStore 的 start()方法中,会调用 addScheduleTask()方法每隔 10s 定时执行一个后台检查任务。如下所示:
在这个任务中,就会执行 DefaultMessageStore 的 cleanFilesPeriodically()方法。其实也就是会周期性地清理掉磁盘上超过 72 小时的 CommitLog、ConsumeQueue 文件。
cleanFilesPeriodically()方法中包含了清理 CommitLog 和 ConsumeQueue 文件的逻辑:
(2)触发删除文件的条件
条件一:如果当前时间是预先设置的凌晨 4 点,就会触发执行一次删除文件的逻辑,这个时间是默认的
条件二:如果磁盘空间不足了也就是超过了 85%的使用率,就会马上触发执行一次删除文件的逻辑
条件一指的是:如果磁盘没有满 ,那么每天会进行一次删除磁盘文件的操作,默认在凌晨 4 点执行,因为那个时候基本是业务低峰期。
条件二指的是:如果磁盘使用率超过 85%了,那么此时可以允许继续在磁盘里写入数据,但会马上触发一次删除文件的操作。
注意:如果磁盘使用率超过 90%了,那么此时是不允许再往磁盘里写入新数据的,同时会马上删除文件。因为一旦磁盘满了,那么写入磁盘就会失败,此时 MQ 就会出现故障。
(3)删除文件的具体操作
在删除文件时,无非就是对文件进行遍历。如果一个文件超过 72 小时都没修改过了,此时就可以删除了,哪怕有的消息可能还没有被消费,此时也不会再让消费者去消费了,直接删除掉。
10.Consumer 作为消费者是如何创建和启动的
(1)Cosumer 是如何创建和启动的
一般会通过 DefaultMQPushConsumerImpl 来创建 Consumer,然后调用它的 start()方法进行启动。
在执行 start()方法启动 Consumer 的过程中,就会执行如下代码让 Consumer 和 Broker 建立长连接。只有建立了长连接,Consumer 才能不断地从 Broker 中拉取消息。其中,MQClientFactory 也是基于 Netty 来实现的。
接着看 start()方法的如下代码:
上述代码的 RebalanceImpl 就是专门负责 Consumer 重平衡的。如果 ConsumerGroup 中加入了一个新的 Consumer,那么就会重新分配每个 Consumer 消费的 MessageQueue。如果 ConsumerGroup 里某个 Consumer 宕机了,那么也会重新分配 MessageQueue,这就是所谓的重平衡。
接着看 start()方法的如下代码:
这个 PullAPIWrapper 就是消费者专门用来拉取消息的 API 组件。
接着看 start()方法的如下代码:
上面代码中的 OffsetStore 其实就是用来存储和管理 Consumer 消费进度 offset 的一个组件。
(2)Consumer 启动时的三个核心组件总结
DefaultMQPushConsumerImpl 的 start()方法最核心的就是这三个组件。
首先 Consumer 刚启动,需要根据 Rebalancer 组件进行重平衡,确定自己要分配哪些 MessageQueue 之后才去拉取消息。
然后在拉取消息时,需要根据 PullAPI 组件通过底层网络通信发送请求进行拉取。
接着在拉取消息的过程中,需要根据 OffsetStore 组件来维护 offset 消费进度。
如果 ConsumerGroup 中多了 Consumer 或者少了 Consumer,那么就需要根据 Rebalancer 组件来进行重平衡。
11.消费者组的多个 Consumer 会如何分配消息
(1)Consumer 的负载均衡问题
当一个业务系统部署多台机器时,每个系统里都启动了一个 Consumer。多个 Consumer 会组成一个 ConsumerGroup,也就是消费者组。此时就会有一个消费者组内的多个 Consumer 同时消费一个 Topic,而且这个 Topic 是有多个 MessageQueue 分布在多个 Broker 上的。如下图示:
那么问题来了:如果一个业务系统部署在两台机器上,对应一个消费者组里就有两个 Consumer。而业务系统需要消费的一个 Topic 有三个 MessageQueue,那么应该怎么分配呢?这就涉及到 Consumer 的负载均衡问题了。
前面介绍 Consumer 启动时,就介绍了几个关键的组件,分别是:重平衡组件、消息拉取组件、消费进度组件。其中的重平衡组件,就是专门负责处理多个 Consumer 的负载均衡问题的。
(2)重平衡组件如何分配 MessageQueue
那么这个 RebalancerImpl 重平衡组件是如何将多个 MessageQueue 均匀的分配给一个消费者组内的多个 Consumer 的?
实际上,每个 Consumer 在启动后,都会向所有的 Broker 进行注册,并且持续保持自己的心跳,让每个 Broker 都能感知到一个消费者组内有哪些 Consumer。下图中没有画出 Consumer 向每个 Broker 进行注册以及心跳,只能大致示意一下。
每个 Consumer 在启动后,重平衡组件都会随机挑选一个 Broker,从里面获取该消费者组里有哪些 Consumer 存在。
当重平衡组件知道了消费者组内有哪些 Consumer 后,接下来就好办了。无非就是把 Topic 下的 MessageQueue 均匀地分配给这些 Consumer。这时候其实有几种算法可以进行分配,但比较常用的一种算法就是平均分配。
假设现在一共有 3 个 MessageQueue,有 2 个 Consumer。那么就会给 1 个 Consumer 分配 2 个 MessageQueue,给另外 1 个 Consumer 分配剩余的 1 个 MessageQueue。
假设现在一共有 4 个 MessageQueue,有 2 个 Consumer。那么就可以 2 个 Consumer 各自分配 2 个 MessageQueue。
总之一切都是平均分配,尽量保证每个 Consumer 的负载差不多。这样,一旦 MessageQueue 负载确定后,Consumer 就知道自己要消费哪几个 MessageQueue 的消息,于是就可以连接到那个 Broker 上,从里面不停拉取消息过来进行消费。
12.Consumer 会如何从 Broker 拉取一批消息
(1)什么是消费者组
一.消费者组举例
消费者组的意思就是给一组消费者起一个名字。比如有一个 Topic 叫 TopicOrderPaySuccess,库存系统、积分系统、营销系统、仓储系统都要去消费这个 Topic 中的数据,那么此时应该给这四个系统分别起一个消费者组名字,如下所示:
设置消费者组的方式如下所示:
假设库存系统部署了 4 台机器,每台机器上的消费者组的名字都是 stock_consumer_group,那么这 4 台机器就同属于一个消费者组。以此类推,每个系统的几台机器都是属于各自的消费者组。
下图展示了两个系统,每个系统都有 2 台机器,每个系统都有一个自己的消费者组。
二.不同消费者组之间的关系
假设库存系统和营销系统作为两个消费者组,都订阅了 TopicOrderPaySuccess 这个订单支付成功消息的 Topic,此时如果订单系统作为生产者发送了一条消息到这个 Topic,那么这条消息会被如何消费呢?
一般情况下,这条消息进入 Broker 后,库存系统和营销系统作为两个消费者组,每个组都会拉取到这条消息。也就是说,这个订单支付成功的消息,库存系统会获取到一条,营销系统也会获取到一条,它们俩都会获取到这条消息。
但库存系统这个消费者组里有两台机器,是两台机器都获取到这条消息、还是只有一台机器会获取到这条消息?
一般情况下,库存系统的两台机器中只有一台机器会获取到这条消息,营销系统也是同理。
下图展示了对于同一条订单支付成功的消息,库存系统的一台机器获取到了、营销系统的一台机器也获取到了。所以在消费时,不同的系统应该设置不同的消费者组。如果不同的消费者组订阅了同一个 Topic,对 Topic 里的同一条消息,每个消费者组都会获取到这条消息。
(2)集群模式消费 vs 广播模式消费
对于一个消费者组而言,它获取到一条消息后,如果消费者组内部有多台机器,到底是只有一台机器可以获取到这个消息,还是每台机器都可以获取到这个消息?这就是集群模式和广播模式的区别。
默认情况下都是集群模式:即一个消费者组获取到一条消息,只会交给组内的一台机器去处理,不是每台机器都可以获取到这条消息的。
但是可以通过如下设置来改变为广播模式:
如果修改为广播模式,那么对于消费者组获取到的一条消息,组内每台机器都可以获取到这条消息。但是相对而言广播模式用的很少,基本上都是使用集群模式来进行消费的。
(3)MessageQueue 和 ConsumeQueue 以及 CommitLog 之间的关系
在创建 Topic 时,需要设置 Topic 有多少个 MessageQueue。Topic 中的多个 MessageQueue 会分散在多个 Broker 上,一个 Broker 上的一个 MessageQueue 会有多个 ConsumeQueue 文件。但在一个 Broker 运行过程中,一个 MessageQueue 只会对应一个 ConsumeQueue 文件。
对于 Broker 而言,存储在一个 Broker 上的所有 Topic 及 MessageQueue 数据都会写入一个统一的 CommitLog 文件,一个 Broker 收到的所有消息都会往 CommitLog 文件里面写。
对于 Topic 的各个 MessageQueue 而言,则是通过各个 ConsumeQueue 文件来存储属于 MessageQueue 的消息在 CommitLog 文件中的物理地址(即 offset 偏移量)。
(4)MessageQueue 与消费者的关系
一个 Topic 上的多个 MessageQueue 是如何让一个消费者组中的多台机器来进行消费的?可以简单理解为,它会均匀将 MessageQueue 分配给消费者组的多台机器来消费。
举个例子,假设 TopicOrderPaySuccess 有 4 个 MessageQueue,这 4 个 MessageQueue 分布在两个 Master Broker 上,每个 Master Broker 上有 2 个 MessageQueue。然后库存系统作为一个消费者组,库存系统里有两台机器。那么正常情况下,最好就是让这两台机器各自负责 2 个 MessageQueue 的消费。比如库存系统的机器 01 从 Master Broker01 上消费 2 个 MessageQueue,库存系统的机器 02 从 Master Broker02 上消费 2 个 MessageQueue。这样就能把消费的负载均摊到两台 Master Broker 上。
所以大致可以认为一个 Topic 的多个 MessageQueue 会均匀分摊给消费者组内的多个机器去消费。
这里的一个原则是:一个 MessageQueue 只能被一个消费者机器去处理,但是一台消费者机器可以负责多个 MessageQueue 的消息处理。
(5)Push 消费模式 vs Pull 消费模式
一.一般选择 Push 消费模式
既然一个消费者组内的多台机器会分别负责一部分 MessageQueue 的消费的,那么每台机器都必须要连接到对应的 Broker,尝试消费里面 MessageQueue 对应的消息。于是就涉及到两种消费模式了,一个是 Push 模式、一个是 Pull 模式。
这两个消费模式本质上是一样的,都是消费者主动发送请求到 Broker 去拉取一批消息进行处理。
Push 消费模式是基于 Pull 消费模式来实现的,只不过它的名字叫做 Push 而已。在 Push 模式下,Broker 会尽可能实时把新消息交给消费者进行处理,它的消息时效性会更好。
一般我们使用 RocketMQ 时,消费模式通常都选择 Push 模式来,因为 Pull 模式的代码写起来更加复杂和繁琐,而且 Push 模式底层本身就是基于 Pull 模式来实现的,只不过时效性更好而已。
二.Push 消费模式的实现思路
当消费者发送请求到 Broker 去拉取消息时,如果有新的消息可以消费,那么就马上返回一批消息到消费机器去处理。消费者处理完之后,会接着发送请求到 Broker 机器去拉取下一批消息。
所以,消费者机器在 Push 模式下处理完一批消息,会马上发起请求拉取下一批消息,消息处理的时效性非常好,看起来就像 Broker 一直不停的推送消息到消费机器一样。
此外,Push 模式下有一个请求挂起和长轮询的机制:当拉取消息的请求发送到 Broker,Broker 却发现没有新的消息可以处理时,就会让处理请求的线程挂起,默认是挂起 15 秒。然后在挂起期间,Broker 会有一个后台线程,每隔一会就检查一下是否有新的消息。如果有新的消息,就主动唤醒被挂起的请求处理线程,然后把消息返回给消费者。
可见,常见的 Push 消费模式,本质也是消费者不断发送请求到 Broker 去拉取一批一批的消息。
(6)Broker 如何读取消息返回给消费者
Broker 在收到消费者的拉取请求后,是如何将消息读取出来,然后返回给消费者的?这涉及到 ConsumeQueue 和 CommitLog。
假设一个消费者发送了拉取请求到 Broker,表示它要拉取 MessageQueue0 中的消息,然后它之前都没拉取过消息,所以就从这个 MessageQueue0 中的第一条消息开始拉取。
于是,Broker 就会找到 MessageQueue0 对应的 ConsumeQueue0,从里面找到第一条消息的 offset。接着 Broker 就需要根据 ConsumeQueue0 中找到的第一条消息的地址,去 CommitLog 中根据这个 offset 地址读取出这条消息的数据,然后把这条消息的数据返回给消费者。
所以消费者在消费消息时,本质就是:首先根据要消费的 MessageQueue 以及开始消费的位置,去找到对应的 ConsumeQueue。然后在 ConsumeQueue 中读取要消费的消息在 CommitLog 中的 offset 偏移量。接着到 CommitLog 中根据 offset 读取出完整的消息数据,最后将完整的消息数据返回给消费者。
(7)消费者如何处理消息、进行 ACK 响应以及提交消费进度
消费者拉取到一批消息后,就会将这批消息传入注册的回调函数,如下所示:
当消费者处理完这批消息后,消费者就会提交目前的一个消费进度到 Broker 上,然后 Broker 就会存储消费者的消费进度。
比如现在对 ConsumeQueue0 的消费进度就是在 offset=1 的位置,那么 Broker 会记录下一个 ConsumeOffset 来标记该消费者的消费进度。这样下次这个消费者组只要再次拉取这个 ConsumeQueue 的消息,就可以从 Broker 记录的消费位置开始继续拉取,不用重头开始拉取了。
(8)消费者组出现宕机或扩容应如何处理
此时会进入一个 Rebalance 环节,也就是重新给各个消费者分配各自需要处理的 MessageQueue。
比如现在机器 01 负责 MessageQueue0 和 MessageQueue1,机器 02 负责 MessageQueue2 和 MessageQueue3。如果现在机器 02 宕机了,那么机器 01 就会接管机器 02 之前负责的 MessageQueue2 和 MessageQueue3。如果此时消费者组加入了一台机器 03,那么就可以把机器 02 负责的 MessageQueue3 转移给机器 03,然后机器 01 只负责一个 MessageQueue2 的消费,这就是负载重平衡。
(9)消费源码的流程
拉取消息的源码入口在 DefaultMQPushConsumerImpl 类的 pullMessage()方法,里面涉及了:拉取请求、消息流量控制、通过 PullAPIWrapper 与服务端进行网络交互、服务端根据 ConsumeQueue 文件拉取消息等事情。
文章转载自:东阳马生架构
评论