深入了解 RocketMQ 之 Broker
一. 概述
消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
在RocketMQ 之 Store 模块已经大体介绍了消息是如何存储到磁盘的;这里就不在过多介绍,重点介绍其他模块的知识点;细节不在这里详解,会在单独的特性中进行详细介绍,可以理解为概况;
二. ConfigManager 家族
配置管理抽象类,是基于配置文件来对各个信息管理;其类图如下:
我们先大体介绍一下相关的子类的功能
TopicConfigManager 有关 topic 下的配置信息
ConsumerFilterManager 有关消费者的过滤器的管理
SubscriptionGroupManager 有关订阅组的管理
ScheduleMessageService 有关定时消息的消费位点的管理;
ConsumerOffsetManager 有关消费位点的管理
1. TopicConfigManager
该类记录了 topic 相关的配置信息;
默认存放的路径:${storePathRootDir}/config/topic.json。存储文件内容格式如下:
dataVersion ,每次更改都会刷新其版本信息
order 目前没有发现其有重要的作用,先忽略,我看起其默认值都是为 false;
perm 权限,这里有三个权限,可读、可写、可继承;可读可写,没啥可说的,关键在于可继承方面,稍微比较特殊,在发送消息时同时需要创建 topic 时,会去校验默认 topic 中是否有可继承权限,如果有,则会去继承默认 topic 相关的配置信息;如果没有,则返回客户端 topic 不存在;具体代码 AbstractSendMessageProcessor.msgCheck
topicFilterType 过滤类型,目前支持两个,单 tag 过滤和多个 tag 过滤;这个会单独一篇来进行讲解;
topicSysFlag 目前没有找到其用途,先忽略;
readQueueNums 可读队列数量,默认值为 16
writeQueueNums 可写队列数量,默认值为 16
2. ConsumerFilterManager
该类记录了消费者的过滤器的配置信息;
默认存放的路径:${storePathRootDir}/config/consumerFilter.json。存储文件内容格式如下:
过滤这一块,会单独一篇进行完整介绍,不再这篇文章这里介绍;
3. SubscriptionGroupManager
该类记录了订阅组的信息;
默认存放的路径:${storePathRootDir}/config/subscriptionGroup.json。存储文件内容格式如下:
groupName 订阅组名称
consumeEnable 是否能消费消息,在消息拉取时,根据其属性来决定消费者能否消费消息;
consumeFromMinEnable 是否能从最小偏移量进行消费,目前没有找到其引用,先忽略;
consumeBroadcastEnable 是否支持广播模式,默认值 true
retryQueueNums 重试队列数量,默认值为 1;当消费者无法消费消息时,会将该消息推送会 Broker 服务器;如果该值设置为 0,则直接丢失该消息;
retryMaxTimes 最大重试消费次数,默认值为 16;如果超过该次数,会将该消息丢入死信队列中去;
brokerId 拉取消息时,提供建议消费 brokerId,但是消费者并没有针对这个值做进一步处理,先忽略;
whichBrokerWhenConsumeSlowly 如果拉取消息缓慢时,提供建议消费 BrokerId;同样消费者并没有针对这个值做进一步处理,先忽略
notifyConsumerIdsChangedEnable 消费者数量变化后是否立即通知 RebalanceServer 线程,以便于进行消费队列重新负载,默认为 true
4. ScheduleMessageService
该类记录了定时消息即延迟队列的消费位点信息;
默认存放的路径:${storePathRootDir}/config/delayOffset.json。存储文件内容格式如下:
其内部定义了定时器,定时处理对应的消息的;
5. ConsumerOffsetManager
该类记录了集群消费组消息位点的信息;
默认存放的路径:${storePathRootDir}/config/consumerOffset.json。存储文件内容格式如下:
结构相对于简单,不再过多简述;
三. 功能模块
1. 客户端管理模块
主要是接收如下三类请求,具体的代码在 ClientManageProcessor:
1.1. 接受心跳请求
请求码 HEART_BEAT 接收客户端的发送过来的心跳数据,主要是包含生产者、消费者相关的数据;
我们先看一下整体的数据结构,如图所示:
上面列出了在 Broker 端对客户端中的消费者以及生产者尽心管理的关键类以及数据结构。同时也列出了客户端发送心跳的数据结构;
当接收到心跳数据时,主要做几大步骤:
broker 会去 TopicConfigManager 注册对应的重试 topic 信息;
向 ConsumerManager 注册消费者信息;这里有分为好几个子步骤
更新 consumerTable 数据;
如果有发生变更,则向各个客户端发送 NOTIFY_CONSUMER_IDS_CHANGED 请求,通知消费者 IDS 已经发生变更;
向 ConsumerFilterManager 注册过滤信息;
向 ProducerManager 注册生产者;
1.2. 注销客户端
请求码 UNREGISTER_CLIENT,接收客户端发送过来的注销数据,其逻辑与心跳请求完全相反;这里就不在过多讲述;
请求结构如下:
1.3. 检查客户端配置
请求码 CHECK_CLIENT_CONFIG, 主要是检查订阅相关的配置信息;请求 body 如下:
1.4 执行器
只处理注销客户端、检查客户端配置的请求的执行,而注册心跳请求的,则用专门的的执行器来处理,我们通过配置项可以配置其容量等信息
clientManagerThreadPoolQueueCapacity 客户端管理线程池任务队列初始大小,默认 100,0000
clientManageThreadPoolNums 服务端处理客户端管理(注册、取消注册)线程池线程数量,默认为 32
拒绝策略为中断,直接抛出异常;
heartbeatThreadPoolQueueCapacity 客户端管心跳处理线程池任务队列初始大小,默认 50000
clientManageThreadPoolNums 服务端处理客户端管理(心跳)线程池线程数量,如果超过 32 核数的话,则选择 32,否则 CPU 核数;
拒绝策略为中断,直接抛出异常;
2. 消费者管理模块
主要是接收如下三类请求,具体的代码在 ConsumerManageProcessor:
2.1. 获取消费者列表
请求码 GET_CONSUMER_LIST_BY_GROUP,其逻辑是直接从 ConsumerManager 对象中直接获取客户端信息,逻辑比较简单;
请求 body
2.2. 更新消费位点
请求码 UPDATE_CONSUMER_OFFSET,其逻辑通过 ConsumerOffsetManager 对象来更新消费位点
2.3. 查询消费位点
请求码 QUERY_CONSUMER_OFFSET,其逻辑通过 ConsumerOffsetManager 对象来查询消费位点
2.4 执行器
consumerManageThreadPoolNums 服务端处理消费管理(获取消费者列表、更新消费进度、查询消费进度)线程池线程数量,默认为 32
3. 最终事务处理模块
其主要是处理请求码 END_TRANSACTION,也就是最终事务请求;在两阶段中,就是第二阶段的提交;具体的代码在 EndTransactionProcessor:
3.1 执行器
endTransactionThreadPoolNums 默认为 8 加上当前操作系统 CPU 核数的两倍
endTransactionPoolQueueCapacity 默认值 100000
4. 转发请求模块
主要的类 ForwardRequestProcessor,目前没有在使用;先忽略;
5. 拉取消息模块
主要是处理类请求码 PULL_MESSAGE 的消息拉取请求,具体的代码在 PullMessageProcessor:具体的入口 processRequest 方法;其主要的逻辑是在前期做大量的校验工作,如权限校验,topic 校验,订阅组校验等等,具体细节需要看里面的代码才行;接着通过 Store 模块中获取对应的消息数据;接着有选择的更新消费位点;
然而这里值得注意的地方,不要随意更改其一些关键的配置,如:
autoCreateSubscriptionGroup 是否自动创建消费组订阅配置信息,默认值为 true;一旦该为 false,那么就会出现拉取失败的现象,除非等心跳完后,broker 自动将订阅组配置信息记录后,才可以进行拉取消息成功;
5.1 执行器
有关接收客户端拉取消息请求的线程池执行器,我们通过配置项可以配置其容量等信息
pullThreadPoolQueueCapacity 消息拉取线程池任务队列初始大小,默认为 100000;
pullMessageThreadPoolNums 服务端处理消息拉取线程池线程数量,默认为 16 加上当前操作系统 CPU 核数的两倍
拒绝策略为中断,直接抛出异常;
5.2 长轮询机制
当消费者定时消费时,向 Broker 拉取消息,Broker 找不到其消息时,如果满足两个条件后,就会将该请求保存到 PullRequestHoldService 后台线程的集合属性 pullRequestTable,等待生产者推送消息过来;条件如下:
允许 Broker 阻塞其请求,默认是为 true;
消息中 SysFlag 是否带有阻塞标志,PullSysFlag.FLAG_SUSPEND=0x10
相关的长轮询机制的配置项如下:
longPollingEnable 是否支持长轮询,默认为 true;
shortPollingTimeMills 短轮询毫秒数,默认值为 1s;
如果 longPollingEnable 为 true,则间隔 5s 去处理 pull 请求;否则间隔 shortPollingTimeMills(1s)去处理 pull 请求;
6. 查询消息模块
主要是接收如下查阅消息的请求,具体的代码在 QueryMessageProcessor:
6.1. 查询消息
请求码 QUERY_MESSAGE,其通过 IndexFile 去查询,有关的可以查阅RocketMQ 之 Store 模块,以及对应的代码;不算太复杂;
6.2 查阅消息
请求码 VIEW_MESSAGE_BY_ID,其通过 CommitLog 去查询,有关的可以查阅RocketMQ 之 Store 模块,以及对应的代码;不算太复杂;
6.3 执行器
有关接收客户端查阅消息的线程池执行器,我们通过配置项可以配置其容量等信息
queryThreadPoolQueueCapacity 查询消息线程池任务队列初始大小,默认为 20000;
queryMessageThreadPoolNums 服务端处理查询消息线程池线程数量,默认为 8 加上当前操作系统 CPU 核数
拒绝策略为中断,直接抛出异常;
7. 回复消息模块
主要是接收带有回复性质的消息发送请求,具体的代码在 ReplyMessageProcessor:
当生产者发送消息后,broker 接收该请求后,会将该消息带原封不动的返回给客户端;
当配置项 storeReplyMessageEnable 设置为 true 时,会将该消息存放到 broker 持久化模块中;
具体的流程如下:
客户端发送带有回复性质的消息给到 broker,并创建 RequestResponseFuture 对象;等待 broker 端通通过 PUSH_REPLY_MESSAGE_TO_CLIENT 这个接口发送数据给客户端;是基于 CORRELATION_ID 来查找对应的 RequestResponseFuture 对象;
Broker 端口收到该请求后,会原封不动的将该数据通过 PUSH_REPLY_MESSAGE_TO_CLIENT 接口发回给客户端;如果 storeReplyMessageEnable 为 true 时,会将该消息存放到 broker 持久化模块中;
客户端通过 RequestResponseFuture 对象拿到刚开始的消息;
不太清楚为什么会这样子设计,总感觉有点怪怪的;
7.1 执行器
有关接收客户端带有回复性质的消息发送请求的线程池执行器,我们通过配置项可以配置其容量等信息
replyThreadPoolQueueCapacity 带有回复性质的消息发送请求的线程池任务队列初始大小,默认为 10000;
processReplyMessageThreadPoolNums 服务端处理消息拉取线程池线程数量,默认为 16 加上当前操作系统 CPU 核数的两倍
拒绝策略为中断,直接抛出异常;
8. 发送消息模块
主要是接收客户端发送过来的消息,这里消息有一般的消息,也有重试消息的;这里稍微讲一下重试消息的概念,Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次;具体的代码在 SendMessageProcessor;
8.1 一般消息的发送
请求码包含 SEND_MESSAGE、SEND_MESSAGE_V2、SEND_BATCH_MESSAGE;
这里会区分是否是事务消息;
如果是事务消息,会多出一个步骤;会将该消息进行篡改,将真实的 topic 以及队列 ID 放入到属性当中,然后将该消息保存到事务队列中去,然后再通过 Store 将消息保存到文件当中去;
8.2 重试消息
请求码 CONSUMER_SEND_MSG_BACK,这个跟事务消息的处理逻辑差不多;
会将该消息保存到重试队列中去:其会将该消息保存到延迟队列中去,然后等待指定时间后再保存到重试队列中去;
如果超过了重试次数,则将该消息保存到死信队列中去;
有关重试消息的,这里稍微提醒一下:在消费者中,只有在集群模式下,才会自动去消费重试队列中的消息;后续会对 RocketMQ 的特性进行一篇介绍;
8.3 执行器
有关接收客户端发送消息的请求的线程池执行器,我们通过配置项可以配置其容量等信息
sendMessageThreadPoolNums 服务端处理发送线程池线程数量,默认为 1
sendThreadPoolQueueCapacity 消息发送线程池任务队列初始大小,默认为 10000;
拒绝策略为中断,直接抛出异常;
8.4 拒绝消息发送请求策略
如果大量的发送请求过来,且线程池没有达到峰值,SendMessageProcessor 内部也提供了对应的拒绝该请求;具体代码如下:
拒绝策略的相关的配置项;
osPageCacheBusyTimeOutMills putMessage 锁占用超过该时间,表示 PageCache 忙,默认为 1s
transientStorePoolEnable Commitlog 是否开启 transientStorePool 机制,默认为 false.
transientStorePoolSize transientStorePool 中缓存 byteBuffer 个数,默认 5 个
flushDiskType 刷盘方式,默认为 ASYNC_FLUSH(异步刷盘),可选值:SYNC_FLUSH(同步刷盘)
9. Broker 管理模块
主要对 broker 进行管理工作,关键类:AdminBrokerProcessor。
有关执行器介绍如下:
adminBrokerThreadPoolNums 服务端处理控制台管理命令线程池线程数量,默认为 16
功能如下:
9.1 topic 相关的接口
UPDATE_AND_CREATE_TOPIC 更新或创建 Topic
DELETE_TOPIC_IN_BROKER 删除 Topic
GET_ALL_TOPIC_CONFIG 获取所有 Topic 的配置信息
9.2 Broker 相关的接口
UPDATE_BROKER_CONFIG 更新 Broker 配置信息
GET_BROKER_CONFIG 获取 Broker 配置信息
SEARCH_OFFSET_BY_TIMESTAMP 根据时间戳来查找消费位点
GET_MAX_OFFSET 获取 topic 和 queueId 下的最大位点
GET_MIN_OFFSET 获取 topic 和 queueId 下的最小位点
GET_EARLIEST_MSG_STORETIME 获取 topic&queueId 下的最早的消息存储时间
GET_BROKER_RUNTIME_INFO 获取 Broker 运行时信息
LOCK_BATCH_MQ & UNLOCK_BATCH_MQ 主要给加上锁以及释放锁,这一块还没理解太清楚;
9.3 订阅组相关的接口
UPDATE_AND_CREATE_SUBSCRIPTIONGROUP 更新或创建订阅组信息
GET_ALL_SUBSCRIPTIONGROUP_CONFIG 获取所有的订阅组配置信息
DELETE_SUBSCRIPTIONGROUP 删除订阅组信息
9.4 统计 &状态相关的接口
VIEW_BROKER_STATS_DATA 查阅 Broker 统计数据
GET_BROKER_CONSUME_STATS 获取 Broker 消费统计数据
GET_CONSUMER_RUNNING_INFO 获取消费者运行时信息
INVOKE_BROKER_TO_GET_CONSUMER_STATUS 获取 broker 中的消费者状态信息
GET_CONSUME_STATS 获取消费统计信息
GET_TOPIC_STATS_INFO 获取 topic 下的统计信息
GET_CONSUMER_CONNECTION_LIST 获取消费者连接信息列表
GET_PRODUCER_CONNECTION_LIST 获取生产者连接信息列表
9.5 查询相关的接口
GET_ALL_CONSUMER_OFFSET 获取所有消费者消费位点
GET_ALL_DELAY_OFFSET 获取所有延迟队列消费位点
QUERY_TOPIC_CONSUME_BY_WHO 查询 topic 下消息是被谁消费
GET_SYSTEM_TOPIC_LIST_FROM_BROKER 获取系统 topic 列表
QUERY_CONSUME_QUEUE 查询消费队列
QUERY_CORRECTION_OFFSET 查询正确的消费位点;目前没有过多深入查看其接口逻辑;先忽略;
9.6 安全相关的接口
UPDATE_AND_CREATE_ACL_CONFIG 更新和创建 ACK 配置信息
DELETE_ACL_CONFIG 删除 ACL 配置信息
GET_BROKER_CLUSTER_ACL_CONFIG 获取 Broker 集群 ACL 配置信息
GET_BROKER_CLUSTER_ACL_INFO 获取 Broker 集群的 ACL 信息
UPDATE_GLOBAL_WHITE_ADDRS_CONFIG 更新全局白名单配置信息
9.7 其他服务接口
RESUME_CHECK_HALF_MESSAGE 将之前的消息重新保存到 Half 队列中
CLEAN_EXPIRED_CONSUMEQUEUE 清除过期消费队列
REGISTER_FILTER_SERVER 注册过滤服务器
QUERY_CONSUME_TIME_SPAN 查询消费
INVOKE_BROKER_TO_RESET_OFFSET 重置 Broker 中的消费位点
CLEAN_UNUSED_TOPIC 清除未使用的 topic
CONSUME_MESSAGE_DIRECTLY 直接消费信息
CLONE_GROUP_OFFSET 克隆订阅组消费位点
四. 总结
经过大体的梳理,画出了对应的图形,便于对 Broker 有一定的认知,查阅相关的代码可以更进一步了解其原理
版权声明: 本文为 InfoQ 作者【邱学喆】的原创文章。
原文链接:【http://xie.infoq.cn/article/24ad12a68fc4b289a153b6c03】。文章转载请联系作者。
评论