深入了解 RocketMQ 之客户端
一. 概述
RocketMQ 的客户端主要是生产者与消费者;
二. 消息结构
我们在发送消息时,主要是封装 Message 或 MessageBatch 对象进行发送;而 MessageExt 对象以及其子类,主要是 Broker 接受请求后,将其对象 Message 或 MessageBatch 对象封装成 MessageExt 的其子类保存到 CommitLog;
在发送消息时,里面的一些属性都容易根据其属性名可以大概猜出它的用意,里面有两个属性,需要特别介绍的:
flag 暂时不知道有什么用,可以先理解为用户自定义的标签
properties 存储 message 额外的属性值,在 message 提供的方法中,设置了几个关键的属性,我们查看 MessageConst 常量中的个别属性;
三. 关键类总览
DefaultMQProducer 是生产者对象,专门用来向 Broker 服务器发送消息的;
TransactionMQProducer 是有关生产者对象是针对事务型消息的发送;
DefaultMQPullConsumer 主动从 Broker 中拉取消息和消费消息,提交消费位点
然而上述的实现类都是门面,具体操作还是交给其对象中的一个关键属性对象,即类图中标红的属性对象;
1. ClientConfig
上图是生产者类与消费者类的类图,这三个实现类都继承了 ClientConfig 配置项;该类图如下:
介绍该类的属性用途:
namesrvAddr NameServer 的地址,多个用“;”分开;****
clientIP 客户端的 IP 地址
instanceName 客户端的实例名称,默认值为 DEFAULT
clientCallbackExecutorThreads 客户端回调线程数
namespace 命名空间, 如果不为空,会往消息属性中添加 INSTANCE_ID 对应的 value;
accessChannel 主要用于跟踪用途用的,这篇并没有涉及到该知识点,想需要了解的细节的,可以先查阅 TraceDispatcher 对应的实现类 AsyncTraceDispatcher;
pollNameServerInterval 获取 topic 路由信息的间隔时长,单位为 ms,默认为 30s;
heartbeatBrokerInterval 与 Broker 心跳间隔的时长,单位为 ms,默认为 30s;
persistConsumerOffsetInterval 持久化消费位点的间隔时长,单位为 ms,默认为 5s;
pullTimeDelayMillsWhenException 消息拉取异常时,延迟拉取消息;相关的代码 PullMessageService 以及 DefaultMQPushConsumerImpl.pullMessage 方法的代码;
unitMode 这个目前没有找到是怎么用的
unitName 这个目前没有找到是怎么用的
vipChannelEnabled 是否开启 VIP 通道,VIP 通道与非 VIP 通道的区别是:在通信过程中使用的端口号不同;VIP 通道的端口=非 VIP 通道端口-2
useTLS 是否使用 ssl 方式交互
2. DefaultMQProducerImpl
DefaultMQProducer 对象发送消息时,有三种发送方式:
同步发送,带有阻塞性质的发送,向 Broker 发送消息,等待响应结果;
异步发送,会交由线程池去执行发送动作,一般情况下都会设置回调函数;
单向发送,不等待 Broker 服务器的结果;
接下来我们看一下门面对象 DefaultMQProducer 中关键属性,现在先介绍 DefaultMQProducerImpl
topicPublishInfoTable 存放 topic 对应的映射关系,生产者会根据其信息通过 MQFaultStrategy 进行选择最合适的 Broker 进行发送消息;至于 topicPublishInfoTable 信息是从 NameServer 服务器拉取,具体细节在后面详细介绍。
sendMessageHookList 有关发送消息的回调动作集合
endTransactionHookList 有关事务消息的确认动作所触发的动作集合,事务消息发送拆分成两个阶段,第一阶段是预提交,第二阶段是真正的提交。所以“确认动作”就是第二阶段的动作
rpcHook 有关调用远程调用所触发的回调函数;
asyncSenderThreadPoolQueue 存放异步发送消息任务的容器;默认值容量为 50000
defaultAsyncSenderExecutor 默认的异步发送消息的执行器,核心线程数与最大线程数跟 CPU 的核数保持一致;
timer 定期清理超时的请求,频率为 1s
checkRequestQueue 针对事务消息的确认任务
checkExecutor 针对事务消息的确认任务执行器;主要是接受 Broker 端发送过来的事务确认请求;具体代码在 checkTransactionState 方法;
mQClientFactory 有关客户端的实例对象,该对象稍后结合消费者对象统一介绍;
checkForbiddenHookList 用于在发送消息前检测是否允许发送,如果无权发送,则需要抛出异常;
zipCompressLevel 用于对消息中的 body 内容进行压缩,默认值为 5;我们可以通过系统变量 rocketmq.message.compressLevel 进行配置;具体压缩代码在 UtilAll.compress;其会判断消息 body 中的内容大小是否超过指定的阈值,如果超过则进行压缩,并且在消息中的 sysFlag 中添加压缩标志;该阈值默认为 4K
mqFaultStrategy 有关路由的决策对象;
asyncSenderExecutor 如果有进行设置,会优先使用这个执行器替代 defaultAsyncSenderExecutor 默认异步发送消息的执行器;
3. MQConsumerInner
消费者消费消息时一般有两种方式:
主动拉取,客户端手动拉取消息,进行消费;
被动拉取,主要指的是客户端定时拉取消息,进行消费;
同时需要记录消费情况,即“消费位点”,接口 OffetStore;
上面有关消费者的类图对象中几个关键属性:
mQClientFactory 有关客户端的实例对象,该对象稍后结合消费者对象统一介绍;
pullAPIWrapper 用于向 Broker 拉取消息,最终发送拉取请求是由 mQClientFactory 对象中 MQClientAPIImpl 对象去发送;
offsetStore 记录消息被消费的记录位点;
rebalanceImpl 用来做消费者负载均衡,该对象是由后台一个线程 RebalanceService 去调用;这个稍后重点介绍。
DefaultMQPullConsumerImpl
由客户端主动拉取消息进行消费,目前已设置为废弃状态,将在 2022 年移除;要想实现定时拉取消息的,需要 MQPullConsumerScheduleService 对象去配合使用,同时还要实现 PullTaskCallback 接口;
上面的对象中,红色标注的几大关键属性稍微提前介绍一下:
DefaultMQPullConsumerImpl 对象就不在介绍了,重点介绍其他两个对象的关键属性。
DefaultMQPushConsumerImpl
其原理是客户端后台线程定时拉取消息进行消费,同时会记录其消费位点;
messageListenerInner 用来监听消息,一般由应用系统自行实现业务逻辑;会配合 consumeMessageService 使用;
consumeMessageService 封装了线程池,用来消费消息;
queueMaxSpanFlowControlTimes 以及 queueFlowControlTimes 属性,当触发流量控制后,每 1000 次就会打印一次警告日志;
DefaultLitePullConsumerImpl
跟 DefaultMQPullConsumerImpl 有点类似,主要的不同点是系统自动帮忙更新消费位点;还有一点不同的是消息拉取方式采用后台线程定时拉取,放入缓存。但是不会主动去消费消息,而是有应用程序主动通过 poll 方法去获取消息,自行消费消息;具体的细节将会在下文详细介绍;
subscriptionType 订阅类型,决定后台线程 PullTaskImpl 如何选择订阅对象;值得注意的是,只能支持一种形式,意味着只能调用 DefaultLitePullConsumerImpl 对象中的 assign 方法或者 subscribe,不能同时调用这两个方法;
taskTable 记录这后台线程任务信息
assignedMessageQueue 用来存放订阅 topic 的信息,这个属性名有一定的误导;如果将其重命名为 subscriptionMessageQueue,应该就很清晰;
consumeRequestCache 用来缓存从 Broker 拉取过来的消息;
scheduledThreadPoolExecutor 用来添加添加 PullTaskImpl 拉取任务的;
messageQueuesForTopic 存放 topic 中对应的 messageQueue 集合信息
topicMessageQueueChangeListenerMap 存放 topic 下对应的 messageQueue 发生变更时所触发的监听器动作
scheduledExecutorService 用来定时调度从 NameServer 中拉取 topic 下所有的 messageQueue 集合信息与 messageQueuesForTopic 进行比对覆盖;如果有发生变更的,则触发对应的监听器动作;频率默认值 30s;这个跟 MQClientInstance 中的拉取路由信息感觉有点冲突。
messageQueueLock 用来管理 MessageQueue 对应的锁的对象;
queueMaxSpanFlowControlTimes 以及 queueFlowControlTimes 属性,当触发流量控制后,每 1000 次就会打印一次警告日志;
4. MQClientInstance
该对象是重点对象,消费者和生产者对象内部都包含该对象。该对象主要是封装了 RPC 方法,同时增加了一些特性,如定时发送心跳、定时更新路由信息、定时清理无效的 broker 信息、定时持久化消费位点、定时调整线程池配置;还有起了后台线程,如 RebalanceService 去做平衡处理、PullMessageService 去拉取消息;但是该类角色有点定位不明确,感觉什么事都都做;在代码设计方面,该设计有点不合理,功能拆分不够单一;另外,PullMessageService 只有在 DefaultMQPushConsumerImpl 对象中才会有用到,因为 DefaultMQPullConsumerImpl 不会通过 PullMessageService 定时去拉取消息而是通过 MQPullConsumerScheduleService 中的 PullTaskImpl 去拉取消息,DefaultLitePullConsumerImpl 是通过其内部类 PullTaskImpl 去拉取;这只是我个人的感觉,仅供参考;
接下来介绍其中的关键的属性:
producerTable 存放生产者对象实例
consumerTable 存放消费者对象实例
adminExtTable 存放管理者对象实例
topicRouteTable 存放 topic 下的路由信息
brokerAddrTable 存放 broker 信息
scheduledExecutorService 单线程执行器,用来执行如下工作
fetchNameServerAddr 如果没有配置 nameServer 服务器地址,定时向 wsAddr 去拉取该服务器信息;频率为 2min
updateTopicRouteInfoFromNameServer 定时向 NameServer 服务器拉取 topic 下的路由信息;默认的频率 30s
cleanOfflineBroker 定时清理下线的 Broker 信息,默认的频率 30s
sendHeartbeatToAllBrokerWithLock 定时发送心跳给到 broker 服务器,默认的频率为 30s
persistAllConsumerOffset 定时持久化消费位点,默认的频率 5s
adjustThreadPool 调整各个对象实例的线程池参数,频率 1min
收集消费者统计信息,里面有很多任务,具体可以查阅 ConsumerStatsManager 对象。这个不是这篇的重点
clientRemotingProcessor 有客户端接受 broker 发送过来的请求处理器,主要处理的请求如下:
CHECK_TRANSACTION_STATE 确认事务消息的状态
NOTIFY_CONSUMER_IDS_CHANGED 通知消费者 IDS 已经发生变更,让客户端立即做平衡操作,即 Rebalance 操作;
RESET_CONSUMER_CLIENT_OFFSET 重置消费者的消费位点
GET_CONSUMER_STATUS_FROM_CLIENT 获取消费者的状态,将会被遗弃,主要用 GET_CONSUMER_RUNNING_INFO 来发送
GET_CONSUMER_RUNNING_INFO 获取消费者的运行状态信息
CONSUME_MESSAGE_DIRECTLY broker 直接发送消息给消费者,消费者直接消费该消息;
PUSH_REPLY_MESSAGE_TO_CLIENT 接受 broker 的回复消息;
mQClientAPIImpl 在客户端提起服务器,接受 broker 发送过来的请求;同时封装了 RPC,用来向 broker 以及 NameServer 服务器发送请求;
pullMessageService 拉取消息服务,是一个后台线程;
rebalanceService 平衡操作服务,会调用对象实例中的 RebalanceImpl 对象的 doRebalance 方法;是一个后台线程
一个疑问:如果在应用系统中同时有生产者对象和消费者对象,那么其对应 MQClientInstance 是同一个对象实例,还是不同的实例对象?
我们通过代码可以找到 MQClientManager 单例对象,看到其根据 ClientConfig 中的 clientIP、instanceName、unitName 组合一个客户端 ID,来决定是否是同一个 MQClientInstance。所以我们一般操作下,没有设置对应的属性值,意味着生产者与消费者都公用同一个 MQClientInstance 对象;
5. 总结
通过上述的关键类的介绍以及对象中的关键属性介绍,对其原理有一定的了解;想要进一步深入,需看起代码;
现在对其进行总结,
生产者发送消息,关键的一个步骤是路由选择,其后续会重点介绍;当然还有一个消息的确认过程;
消费者消费消息,关键的步骤如下:
消息队列分配策略,即客户端该消费哪些 broker 中的消息,具体的接口 AllocateMessageQueueStrategy,后面会重点介绍;
消费位点管理,客户端根据消费类型,如集群消费,广播消费等方式,是如何记录消息被消费的;
里面还有一些属性还没介绍的,如流量控制,所谓流量控制,就是有效的减少 Broker 压力机制;后面会重点介绍;
四. 生产者
1. 负载均衡
Producer 端在发送消息的时候,会先根据 Topic 找到指定的 TopicPublishInfo,在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认方式下 MQFaultStrategy.selectOneMessageQueue()方法会从 TopicPublishInfo 中的 messageQueueList 中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在 MQFaultStrategy 这个类中定义。这里有一个 sendLatencyFaultEnable 开关变量,如果开启,在随机递增取模的基础上,再过滤掉 not available 的 Broker 代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的 latency 超过 550Lms,就退避 30s;超过 1000L,就退避 60s;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance 机制是实现消息发送高可用的核心关键所在。
规避毫秒: 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L
不可用毫秒:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L
当请求超时时,默认设置不可用为 10min,意味这个 Broker 在 10 分钟内不可用,不会发消息给到这个 Broker 服务器;
当请求成功时,请求所消耗的时间在规避哪个区间,假设请求所消耗 1100ms,那么会设置该 broker 在 60s 内不可用;
2. 事务消息
发送事务消息会分为两个步骤,
对消息中的属性进行相关操作,接着向 Broker 发送;
properties 中添加 MessageConst.PROPERTY_TRANSACTION_PREPARED 的 key 为 true,
设置 sysFlag 与 MessageSysFlag.TRANSACTION_PREPARED_TYPE 进行或操作。
发送确认请求;
在客户端中会保留本地事务监听器 TransactionListener 或者 LocalTransactionExecuter,该监听器会记录其本地事务消息的记录信息;
当第一步发送失败时,失败抛出异常,中断这次事务消息的发送;如果第二步发送失败,则会忽略该异常,意味着只要第一次发送成功,这个事务消息就会成功;之所以这样做,是因为在客户端中有一个后台执行器去监听 Broker 发送过来的 CHECK_TRANSACTION_STATE 的请求,从而触发客户端再次第二步的事务确认消息发送到 broker 中;
五. 消费者
1. 负载均衡
广播模式下每个节点都会消费完所有消息,是不存在负载均衡的;在集群模式下会进行负载,这里主要介绍消息队列分配策略,AllocateMessageQueueStrategy,其具体的实现如下:
AllocateMessageQueueAveragelyByCircle 环形平均分配,假设客户端 ID 集合为 3 个,自身客户端所在的位置为 1;队列集合的数量为 16,那么队列中的 1,4,7,10,13,15 都归该客户端去消费;该源码如果不采用遍历的形式,效率稍微会高一点;
AllocateMessageQueueByConfig 通过配置的形式指定客户端消费哪些队列中的消息
AllocateMessageQueueConsistentHash 一致性哈希分配;这里不是太了解 MD5 的 hash 算法,没太明白其逻辑;
AllocateMessageQueueAveragely 平均哈希分配算法, 假设客户端 ID 集合为 3 个,自身客户端所在的位置为 1,;队列集合的数量为 16,那么队列中的 5,6,7,8,9 都归该客户端去消费;这个是默认的分配算法
AllocateMessageQueueByMachineRoom 目前没有深入查阅其逻辑,先忽略;
AllocateMachineRoomNearby 目前没有深入查阅其逻辑,先忽略;
借用官方文档的一句话:消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。
2. 消费位点管理
记录消息队列的消费情况,这里主要讲的是消费者端的消费位点管理,没有涉及到 Broker 中的消费位点管理;
接口 OffsetStore 有两个实现类,一个是 LocalFileOffsetStore 用于本地消费端的位点管理,用于广播模式下的消费位点管理;另外一个是 RemoteBrokerOffsetStore 用于远程消费端的位点管理,用于集群模式下的消费位点管理;但远程不会没更新一次就向 Broker 更新其消费进度(位移),而是通过定时任务与 Broker 更新,频率为 5s。
我们先看 LocalFileOffsetStore,这里主要解读更新位点动作;
一般情况下,都是直接覆盖其位点;只有在并发消费的时候,会去比较其大小,如果比当前的位点要大则进行更新,否则将忽略;
RemoteBrokerOffsetStore 跟 LocalFileOffsetStore 逻辑差不多,只是多了与 Broker 交互逻辑;
这里稍微注意的是:在并发消费时更新其位点,有可能消息会被丢失;
六.流量控制
借用官方的文档介绍:
生产者流控,因为 broker 处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
生产者流控:
commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
如果开启 transientStorePoolEnable == true,且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
broker 通过拒绝 send 请求方式实现流量控制。
注意,生产者流控,不会尝试消息重投。
消费者流控:
消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。
消费者流控的结果是降低拉取频率。
这里重点解读消费端的流量控制,具体的代码逻辑在定时拉取消息的执行器,会触发流量控制;
七. 总结
版权声明: 本文为 InfoQ 作者【邱学喆】的原创文章。
原文链接:【http://xie.infoq.cn/article/b10cde63dcc97a948c2f81c6a】。文章转载请联系作者。
评论