写点什么

深入了解 RocketMQ 之 Broker

用户头像
邱学喆
关注
发布于: 3 小时前
深入了解RocketMQ之Broker

一. 概述

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

RocketMQ 之 Store 模块已经大体介绍了消息是如何存储到磁盘的;这里就不在过多介绍,重点介绍其他模块的知识点;细节不在这里详解,会在单独的特性中进行详细介绍,可以理解为概况;


二. ConfigManager 家族

配置管理抽象类,是基于配置文件来对各个信息管理;其类图如下:

我们先大体介绍一下相关的子类的功能

  • TopicConfigManager 有关 topic 下的配置信息

  • ConsumerFilterManager 有关消费者的过滤器的管理

  • SubscriptionGroupManager 有关订阅组的管理

  • ScheduleMessageService 有关定时消息的消费位点的管理;

  • ConsumerOffsetManager 有关消费位点的管理

1. TopicConfigManager

该类记录了 topic 相关的配置信息;

默认存放的路径:${storePathRootDir}/config/topic.json。存储文件内容格式如下:

{	"dataVersion":{		"counter": AtomicLong,		"timestamp": long	},	"topicConfigTable":{		"topic名称":{			"order": boolean,			"perm": int,			"readQueueNums": int,			"writeQueueNums": int,			"topicFilterType": TopicFilterType,			"topicName": String,			"topicSysFlag": int				},		.......	}}
复制代码
  • dataVersion ,每次更改都会刷新其版本信息

  • order 目前没有发现其有重要的作用,先忽略,我看起其默认值都是为 false;

  • perm 权限,这里有三个权限,可读、可写、可继承;可读可写,没啥可说的,关键在于可继承方面,稍微比较特殊,在发送消息时同时需要创建 topic 时,会去校验默认 topic 中是否有可继承权限,如果有,则会去继承默认 topic 相关的配置信息;如果没有,则返回客户端 topic 不存在;具体代码 AbstractSendMessageProcessor.msgCheck

  • topicFilterType 过滤类型,目前支持两个,单 tag 过滤和多个 tag 过滤;这个会单独一篇来进行讲解;

  • topicSysFlag 目前没有找到其用途,先忽略;

  • readQueueNums 可读队列数量,默认值为 16

  • writeQueueNums 可写队列数量,默认值为 16

2. ConsumerFilterManager

该类记录了消费者的过滤器的配置信息;

默认存放的路径:${storePathRootDir}/config/consumerFilter.json。存储文件内容格式如下:

{	"filterDataByTopic":{		"Topic":{			"topic": String,			"groupFilterData": {				"consumerGroup":{					"consumerGroup" : String,					"topic":  String,					"expression": String,					"expressionType": String,					"bornTime": long,					"deadTime": long,					"bloomFilterData":{						"bitPos":  int[],						"bitNum": int					},					"clientVersion": long				}			},			....		},		....	}}
复制代码

过滤这一块,会单独一篇进行完整介绍,不再这篇文章这里介绍;

3. SubscriptionGroupManager

该类记录了订阅组的信息;

默认存放的路径:${storePathRootDir}/config/subscriptionGroup.json。存储文件内容格式如下:

{	"dataVersion":{		"counter": AtomicLong,		"timestamp": long	},	"subscriptionGroupTable":{		"groupName":{			"groupName": String,			"consumeEnable":boolean,			"consumeFromMinEnable": boolean,			"consumeBroadcastEnable": boolean,			"retryQueueNums":int,			"retryMaxTimes": int,			"brokerId": long,			"whichBrokerWhenConsumeSlowly": long,			"notifyConsumerIdsChangedEnable": boolean					},		.....	}}
复制代码
  • groupName 订阅组名称

  • consumeEnable 是否能消费消息,在消息拉取时,根据其属性来决定消费者能否消费消息;

  • consumeFromMinEnable 是否能从最小偏移量进行消费,目前没有找到其引用,先忽略;

  • consumeBroadcastEnable 是否支持广播模式,默认值 true

  • retryQueueNums 重试队列数量,默认值为 1;当消费者无法消费消息时,会将该消息推送会 Broker 服务器;如果该值设置为 0,则直接丢失该消息;

  • retryMaxTimes 最大重试消费次数,默认值为 16;如果超过该次数,会将该消息丢入死信队列中去;

  • brokerId 拉取消息时,提供建议消费 brokerId,但是消费者并没有针对这个值做进一步处理,先忽略;

  • whichBrokerWhenConsumeSlowly 如果拉取消息缓慢时,提供建议消费 BrokerId;同样消费者并没有针对这个值做进一步处理,先忽略

  • notifyConsumerIdsChangedEnable 消费者数量变化后是否立即通知 RebalanceServer 线程,以便于进行消费队列重新负载,默认为 true

4. ScheduleMessageService

该类记录了定时消息即延迟队列的消费位点信息;

默认存放的路径:${storePathRootDir}/config/delayOffset.json。存储文件内容格式如下:

{	"offsetTable":{		"level级别":offset(消费位点),		.....	}}
复制代码

其内部定义了定时器,定时处理对应的消息的;

5. ConsumerOffsetManager

该类记录了集群消费组消息位点的信息;

默认存放的路径:${storePathRootDir}/config/consumerOffset.json。存储文件内容格式如下:

{	"offsetTable":{		"topic@group":{			"队列ID":位点,			.......		}	}}
复制代码

结构相对于简单,不再过多简述;

三. 功能模块

1. 客户端管理模块

主要是接收如下三类请求,具体的代码在 ClientManageProcessor:

1.1. 接受心跳请求

请求码 HEART_BEAT 接收客户端的发送过来的心跳数据,主要是包含生产者、消费者相关的数据;

我们先看一下整体的数据结构,如图所示:

上面列出了在 Broker 端对客户端中的消费者以及生产者尽心管理的关键类以及数据结构。同时也列出了客户端发送心跳的数据结构;

当接收到心跳数据时,主要做几大步骤:

  • broker 会去 TopicConfigManager 注册对应的重试 topic 信息;

  • 向 ConsumerManager 注册消费者信息;这里有分为好几个子步骤

  • 更新 consumerTable 数据;

  • 如果有发生变更,则向各个客户端发送 NOTIFY_CONSUMER_IDS_CHANGED 请求,通知消费者 IDS 已经发生变更;

  • 向 ConsumerFilterManager 注册过滤信息;

  • 向 ProducerManager 注册生产者;

1.2. 注销客户端

请求码 UNREGISTER_CLIENT,接收客户端发送过来的注销数据,其逻辑与心跳请求完全相反;这里就不在过多讲述;

请求结构如下:

1.3. 检查客户端配置

请求码 CHECK_CLIENT_CONFIG, 主要是检查订阅相关的配置信息;请求 body 如下:

1.4 执行器

只处理注销客户端、检查客户端配置的请求的执行,而注册心跳请求的,则用专门的的执行器来处理,我们通过配置项可以配置其容量等信息

  • clientManagerThreadPoolQueueCapacity 客户端管理线程池任务队列初始大小,默认 100,0000

  • clientManageThreadPoolNums 服务端处理客户端管理(注册、取消注册)线程池线程数量,默认为 32

拒绝策略为中断,直接抛出异常;

  • heartbeatThreadPoolQueueCapacity 客户端管心跳处理线程池任务队列初始大小,默认 50000

  • clientManageThreadPoolNums 服务端处理客户端管理(心跳)线程池线程数量,如果超过 32 核数的话,则选择 32,否则 CPU 核数;

拒绝策略为中断,直接抛出异常;

2. 消费者管理模块

主要是接收如下三类请求,具体的代码在 ConsumerManageProcessor:

2.1. 获取消费者列表

请求码 GET_CONSUMER_LIST_BY_GROUP,其逻辑是直接从 ConsumerManager 对象中直接获取客户端信息,逻辑比较简单;

请求 body

2.2. 更新消费位点

请求码 UPDATE_CONSUMER_OFFSET,其逻辑通过 ConsumerOffsetManager 对象来更新消费位点

2.3. 查询消费位点

请求码 QUERY_CONSUMER_OFFSET,其逻辑通过 ConsumerOffsetManager 对象来查询消费位点

2.4 执行器

  • consumerManageThreadPoolNums 服务端处理消费管理(获取消费者列表、更新消费进度、查询消费进度)线程池线程数量,默认为 32

3. 最终事务处理模块

其主要是处理请求码 END_TRANSACTION,也就是最终事务请求;在两阶段中,就是第二阶段的提交;具体的代码在 EndTransactionProcessor:

3.1 执行器

  • endTransactionThreadPoolNums 默认为 8 加上当前操作系统 CPU 核数的两倍

  • endTransactionPoolQueueCapacity 默认值 100000

4. 转发请求模块

主要的类 ForwardRequestProcessor,目前没有在使用;先忽略;

5. 拉取消息模块

主要是处理类请求码 PULL_MESSAGE 的消息拉取请求,具体的代码在 PullMessageProcessor:具体的入口 processRequest 方法;其主要的逻辑是在前期做大量的校验工作,如权限校验,topic 校验,订阅组校验等等,具体细节需要看里面的代码才行;接着通过 Store 模块中获取对应的消息数据;接着有选择的更新消费位点;

然而这里值得注意的地方,不要随意更改其一些关键的配置,如:

  • autoCreateSubscriptionGroup 是否自动创建消费组订阅配置信息,默认值为 true;一旦该为 false,那么就会出现拉取失败的现象,除非等心跳完后,broker 自动将订阅组配置信息记录后,才可以进行拉取消息成功;

5.1 执行器

有关接收客户端拉取消息请求的线程池执行器,我们通过配置项可以配置其容量等信息

  • pullThreadPoolQueueCapacity 消息拉取线程池任务队列初始大小,默认为 100000;

  • pullMessageThreadPoolNums 服务端处理消息拉取线程池线程数量,默认为 16 加上当前操作系统 CPU 核数的两倍

拒绝策略为中断,直接抛出异常;

5.2 长轮询机制

当消费者定时消费时,向 Broker 拉取消息,Broker 找不到其消息时,如果满足两个条件后,就会将该请求保存到 PullRequestHoldService 后台线程的集合属性 pullRequestTable,等待生产者推送消息过来;条件如下:

  • 允许 Broker 阻塞其请求,默认是为 true;

  • 消息中 SysFlag 是否带有阻塞标志,PullSysFlag.FLAG_SUSPEND=0x10

相关的长轮询机制的配置项如下:

  • longPollingEnable 是否支持长轮询,默认为 true;

  • shortPollingTimeMills 短轮询毫秒数,默认值为 1s;

如果 longPollingEnable 为 true,则间隔 5s 去处理 pull 请求;否则间隔 shortPollingTimeMills(1s)去处理 pull 请求;

6. 查询消息模块

主要是接收如下查阅消息的请求,具体的代码在 QueryMessageProcessor:

6.1. 查询消息

请求码 QUERY_MESSAGE,其通过 IndexFile 去查询,有关的可以查阅RocketMQ 之 Store 模块,以及对应的代码;不算太复杂;

6.2 查阅消息

请求码 VIEW_MESSAGE_BY_ID,其通过 CommitLog 去查询,有关的可以查阅RocketMQ 之 Store 模块,以及对应的代码;不算太复杂;

6.3 执行器

有关接收客户端查阅消息的线程池执行器,我们通过配置项可以配置其容量等信息

  • queryThreadPoolQueueCapacity 查询消息线程池任务队列初始大小,默认为 20000;

  • queryMessageThreadPoolNums 服务端处理查询消息线程池线程数量,默认为 8 加上当前操作系统 CPU 核数

拒绝策略为中断,直接抛出异常;

7. 回复消息模块

主要是接收带有回复性质的消息发送请求,具体的代码在 ReplyMessageProcessor:

当生产者发送消息后,broker 接收该请求后,会将该消息带原封不动的返回给客户端;

当配置项 storeReplyMessageEnable 设置为 true 时,会将该消息存放到 broker 持久化模块中;

具体的流程如下:

  • 客户端发送带有回复性质的消息给到 broker,并创建 RequestResponseFuture 对象;等待 broker 端通通过 PUSH_REPLY_MESSAGE_TO_CLIENT 这个接口发送数据给客户端;是基于 CORRELATION_ID 来查找对应的 RequestResponseFuture 对象;

  • Broker 端口收到该请求后,会原封不动的将该数据通过 PUSH_REPLY_MESSAGE_TO_CLIENT 接口发回给客户端;如果 storeReplyMessageEnable 为 true 时,会将该消息存放到 broker 持久化模块中;

  • 客户端通过 RequestResponseFuture 对象拿到刚开始的消息;

不太清楚为什么会这样子设计,总感觉有点怪怪的;

7.1 执行器

有关接收客户端带有回复性质的消息发送请求的线程池执行器,我们通过配置项可以配置其容量等信息

  • replyThreadPoolQueueCapacity 带有回复性质的消息发送请求的线程池任务队列初始大小,默认为 10000;

  • processReplyMessageThreadPoolNums 服务端处理消息拉取线程池线程数量,默认为 16 加上当前操作系统 CPU 核数的两倍

拒绝策略为中断,直接抛出异常;

8. 发送消息模块

主要是接收客户端发送过来的消息,这里消息有一般的消息,也有重试消息的;这里稍微讲一下重试消息的概念,Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次;具体的代码在 SendMessageProcessor;

8.1 一般消息的发送

请求码包含 SEND_MESSAGE、SEND_MESSAGE_V2、SEND_BATCH_MESSAGE;

这里会区分是否是事务消息;

如果是事务消息,会多出一个步骤;会将该消息进行篡改,将真实的 topic 以及队列 ID 放入到属性当中,然后将该消息保存到事务队列中去,然后再通过 Store 将消息保存到文件当中去;

8.2 重试消息

请求码 CONSUMER_SEND_MSG_BACK,这个跟事务消息的处理逻辑差不多;

会将该消息保存到重试队列中去:其会将该消息保存到延迟队列中去,然后等待指定时间后再保存到重试队列中去;

如果超过了重试次数,则将该消息保存到死信队列中去;

有关重试消息的,这里稍微提醒一下:在消费者中,只有在集群模式下,才会自动去消费重试队列中的消息;后续会对 RocketMQ 的特性进行一篇介绍;

8.3 执行器

有关接收客户端发送消息的请求的线程池执行器,我们通过配置项可以配置其容量等信息

  • sendMessageThreadPoolNums 服务端处理发送线程池线程数量,默认为 1

  • sendThreadPoolQueueCapacity 消息发送线程池任务队列初始大小,默认为 10000;

拒绝策略为中断,直接抛出异常;

8.4 拒绝消息发送请求策略

如果大量的发送请求过来,且线程池没有达到峰值,SendMessageProcessor 内部也提供了对应的拒绝该请求;具体代码如下:

//SendMessageProcessorpublic boolean rejectRequest() {  return this.brokerController.getMessageStore().isOSPageCacheBusy() /*磁盘是否繁忙*/ ||    this.brokerController.getMessageStore().isTransientStorePoolDeficient()/**/;}
复制代码

拒绝策略的相关的配置项;

  • osPageCacheBusyTimeOutMills putMessage 锁占用超过该时间,表示 PageCache 忙,默认为 1s

  • transientStorePoolEnable Commitlog 是否开启 transientStorePool 机制,默认为 false.

    transientStorePoolSize transientStorePool 中缓存 byteBuffer 个数,默认 5 个

  • flushDiskType 刷盘方式,默认为 ASYNC_FLUSH(异步刷盘),可选值:SYNC_FLUSH(同步刷盘)


9. Broker 管理模块

主要对 broker 进行管理工作,关键类:AdminBrokerProcessor。

有关执行器介绍如下:

  • adminBrokerThreadPoolNums 服务端处理控制台管理命令线程池线程数量,默认为 16


功能如下:

9.1 topic 相关的接口

  • UPDATE_AND_CREATE_TOPIC 更新或创建 Topic

  • DELETE_TOPIC_IN_BROKER 删除 Topic

  • GET_ALL_TOPIC_CONFIG 获取所有 Topic 的配置信息

9.2 Broker 相关的接口

  • UPDATE_BROKER_CONFIG 更新 Broker 配置信息

  • GET_BROKER_CONFIG 获取 Broker 配置信息

  • SEARCH_OFFSET_BY_TIMESTAMP 根据时间戳来查找消费位点

  • GET_MAX_OFFSET 获取 topic 和 queueId 下的最大位点

  • GET_MIN_OFFSET 获取 topic 和 queueId 下的最小位点

  • GET_EARLIEST_MSG_STORETIME 获取 topic&queueId 下的最早的消息存储时间

  • GET_BROKER_RUNTIME_INFO 获取 Broker 运行时信息

  • LOCK_BATCH_MQ & UNLOCK_BATCH_MQ 主要给加上锁以及释放锁,这一块还没理解太清楚

9.3 订阅组相关的接口

  • UPDATE_AND_CREATE_SUBSCRIPTIONGROUP 更新或创建订阅组信息

  • GET_ALL_SUBSCRIPTIONGROUP_CONFIG 获取所有的订阅组配置信息

  • DELETE_SUBSCRIPTIONGROUP 删除订阅组信息

9.4 统计 &状态相关的接口

  • VIEW_BROKER_STATS_DATA 查阅 Broker 统计数据

  • GET_BROKER_CONSUME_STATS 获取 Broker 消费统计数据

  • GET_CONSUMER_RUNNING_INFO 获取消费者运行时信息

  • INVOKE_BROKER_TO_GET_CONSUMER_STATUS 获取 broker 中的消费者状态信息

  • GET_CONSUME_STATS 获取消费统计信息

  • GET_TOPIC_STATS_INFO 获取 topic 下的统计信息

  • GET_CONSUMER_CONNECTION_LIST 获取消费者连接信息列表

  • GET_PRODUCER_CONNECTION_LIST 获取生产者连接信息列表

9.5 查询相关的接口

  • GET_ALL_CONSUMER_OFFSET 获取所有消费者消费位点

  • GET_ALL_DELAY_OFFSET 获取所有延迟队列消费位点

  • QUERY_TOPIC_CONSUME_BY_WHO 查询 topic 下消息是被谁消费

  • GET_SYSTEM_TOPIC_LIST_FROM_BROKER 获取系统 topic 列表

  • QUERY_CONSUME_QUEUE 查询消费队列

  • QUERY_CORRECTION_OFFSET 查询正确的消费位点;目前没有过多深入查看其接口逻辑;先忽略;

9.6 安全相关的接口

  • UPDATE_AND_CREATE_ACL_CONFIG 更新和创建 ACK 配置信息

  • DELETE_ACL_CONFIG 删除 ACL 配置信息

  • GET_BROKER_CLUSTER_ACL_CONFIG 获取 Broker 集群 ACL 配置信息

  • GET_BROKER_CLUSTER_ACL_INFO 获取 Broker 集群的 ACL 信息

  • UPDATE_GLOBAL_WHITE_ADDRS_CONFIG 更新全局白名单配置信息

9.7 其他服务接口

  • RESUME_CHECK_HALF_MESSAGE 将之前的消息重新保存到 Half 队列中

  • CLEAN_EXPIRED_CONSUMEQUEUE 清除过期消费队列

  • REGISTER_FILTER_SERVER 注册过滤服务器

  • QUERY_CONSUME_TIME_SPAN 查询消费

  • INVOKE_BROKER_TO_RESET_OFFSET 重置 Broker 中的消费位点

  • CLEAN_UNUSED_TOPIC 清除未使用的 topic

  • CONSUME_MESSAGE_DIRECTLY 直接消费信息

  • CLONE_GROUP_OFFSET 克隆订阅组消费位点


四. 总结

经过大体的梳理,画出了对应的图形,便于对 Broker 有一定的认知,查阅相关的代码可以更进一步了解其原理


发布于: 3 小时前阅读数: 7
用户头像

邱学喆

关注

计算机原理的深度解读,源码分析。 2018.08.26 加入

在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan

评论

发布
暂无评论
深入了解RocketMQ之Broker