一张图进阶 RocketMQ - 消息发送
前 言
三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。
要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-消息发送(视频版)
本文是“一张图进阶 RocketMQ” 系列第 3 篇,对 RocketMQ 不了解的同学可以先看看三此君的一张图进阶 RocketMQ-整体架构,一张图进阶 RocketMQ - NameServer。在了解了 RocketMQ 的整体架构之后,我们来深入的分析下生产者消息发送的设计与实现。本文从一个生产者示例开始,以两行代码为切入点,逐步剖析生产者启动流程以及同步消息发送流程。
生产者示例
消息发送分为同步消息、异步消息和单向消息,简单来说:
同步消息:消息发送之后会等待 Broker 响应,并把响应结果传递给业务线程,整个过程业务线程在等待。
异步消息:调用异步发送 API,Producer 把消息发送请求放进线程池就返回。逻辑处理,网络请求都在线程池中进行,等结果处理完之后回调业务定义好的回调函数。
单向消息:只负责发送消息,不管发送结果。
我们先来回顾下同步消息发送的例子:
首先,实例化一个生产者
producer
,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。然后
producer
得做一些初始化(这是很关键的步骤),它要和 NameServer 通信,要先初始化通信模块等。producer
已经准备好了,那得准备好要发的内容,把 "我一定会关注三此君" 发送到 Topic=”sanicjun“。内容准备好,那
producer
就可以把消息发送出去了。producer
怎么知道 Broker 地址呢?他会去 NameServer 获取路由信息,得到 Broker 的地址是 localhost:10909,然后通过网络通信将消息发送给 Broker。生产者发送的消息通过网络传输给 Broker,Broker 需要对消息按照一定的结构进行存储。存储完成之后,把存储结果告知生产者。
其中有两个关键的地方:producer.start()
及 producer.send()
,也就是生产者初始化及消息发送。我们以这两行代码为切入点,看看 RocketMQ Producer 的设计与实现。
Tips:因为本文是 RocketMQ 设计与实现分析,虽然不会粘贴任何源码,但是图文中会有大量的类名和方法名,看的时候不必执着于这些陌生的类名和方法名,三此君会解释这些类和方法的用途。
生产者启动
我们实例化一个生产者 DefaultMQProducer,并调用 DefaultMQProducer.start() 方法进行初始化:
启动流程比较长,其实最重要的就是初始化了通信模块,并启动了多个定时任务,这些在后面的消息发送过程中都会用到:
检查配置是否合法:生产者组名是否为空、是否满足命名规则、长度是否满足等。
启动通信模块服务 Netty RemotingClient:RemotingClient 是一个接口,底层使用的通讯框架是 Netty,提供了实现类 NettyRemotingClient,RemotingClient 在初始化的时候实例化 Bootstrap,方便后续用来创建 SocketChannel;后文会介绍 RocketMQ 的通信机制,大家稍安勿躁。
启动 5 个后台定时任务:定时更新 NameServerAddr 信息,定时更新 topic 的路由信息,定时向 Broker 发送心跳及清理下线的 Broker,定时持久化 Consumer 的 Offset 信息,定时调整线程池;
生产者每 30s 会从某台 NameServer 获取 Topic 和 Broker 的映射关系(路由信息)存在本地内存中,如果发现新的 Broker 就会和其建立长连接,每 30s 会发送心跳至 Broker 维护连接。
Tips:生产者为什么要启动消息拉取服务?重平衡服务是什么?简单来说,这两个服务都是用于消费者的,这里我们暂且不理会。消息拉取服务 pullMessageService 是从 Broker 拉取消息的服务 ,重平衡服务 rebalanceService 用于消费者的负载均衡,负责分配消费者可消费的消息队列。
同步发送
总体上讲,消息发送可以划分为三个层级:
业务层:准备需要发送的消息。
消息处理层:获取业务发送的 Message,经过一系列的参数检查、消息发送准备、参数包装等操作。
通信层:基于 Netty 封装的一个网络通信服务,将消息发送给 Broker。
我们通过前面的示例来看整个同步消息发送的处理流程,整个过程我们的主要目标就是把消息发送到 Broker:
第一步:业务层构建待发送消息
Message msg = new Message("sancijun","order", "orderId", "我一定会关注三此君".getBytes("UTF-8"));
第二步:然后我们调用
producer.send(msg)
发送消息,可是 producer 怎么知道发给谁呢?消息本身又需要经过哪些处理呢?我们进入调用链直到 sendDefaultImpl检查消息是否为空,消息的 Topic 的名字是否为空或者是否符合规范,消息体大小是否符合要求,最大值为 4MB,可以通过 maxMessageSize 进行设置。
执行 tryToFindTopicPublishInfo() 方法:获取 Topic 路由信息,如果不存在则抛出异常。如果本地缓存没有路由信息,就通过 Namesrv 获取路由信息,更新到本地。消息构建的时候我们指定了消息所属 Topic,根据 Topic 路由信息我们可以找到对应的 Broker。
Tips:从 NameServer 获取的路由信息 TopicRouteData 会包含指定 Topic 的 topicQueueTable、brokerAddrTable。在 NameServer 集群元数据管理部分我们讲过,通过 topicName 从 topicQueueTable 获取对应的 brokerName,再根据 brokerName 从 brokerAddrTable 中获取 Broker IP 地址。
计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的。在同步发送情况下如果发送失败会默认重投两次(默认 retryTimesWhenSendFailed = 2),并且不会选择上次失败的 Broker,会向其他 Broker 投递。
执行队列选择方法 selectOneMessageQueue()。根据 lastBrokerName(上次发送消息失败的 Broker 的名字)和 Topic 路由信息选一个 MessageQueue。首次发送时 lastBrokerName 为 null,采用轮询策略选择一个 MessageQueue。如果上次发送失败,也是采用轮询策略选择一个 MessageQueue,但是会跳过上次发送失败 Broker 的 MessageQueue,也就是换一个 Broker 发送。
Tips:选择一个 MessageQueue,什么是 MessageQueue 呢?这和 Broker 的存储结构相关,我们会在存储部分详细介绍,这里先说结论,我们创建 Topic 时指定了这个 Topic 的读写队列数,每个 MessageQueue 有不同的 queueId(0-3)。
我们也可以通过 sendLatencyFaultEnable 来设置是否总是发送到延迟级别较低的 Broker,默认值为 False,我么这里就不展开讨论了。
执行 sendKernelImpl() 方法。
第三步:sendDefaultImpl 做了一系列逻辑处理,我们已经得到了待发送的 BrokerName,而我们的目标是把消息发送到 Broker。sendKernelImpl 方法是发送消息的核心方法,主要用于准备通信层的入参(比如 Broker 地址、请求体等),将请求传递给通信层。
根据 MessageQueue.brokerName 获取 Broker IP 地址,给 message 添加全局唯一 ID。
Tips:sendKernelImpl 也有很多的逻辑处理,我们暂时先略过这里的压缩、事务消息、钩子函数、重试消息:
对大于 4k 的普通消息进行压缩,并设置消息的系统标记为 MessageSysFlag.COMPRESSED_FLAG。
如果是事务 Prepared 消息,则设置消息的系统标记为 MessageSysFlag.TRANSACTION_PREPARED_TYPE
如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑,通过 DefaultMQProducerImpl#registerSendMessageHook 注册钩子处理类,并且可以注册多个。
构建发送消息请求头:生产者组、主题名称、默认创建主题 Key、该主题在单个 Broker 默认队列数、队列 ID(队列序号)、消息系统标记(MessageSysFlag)、消息发送时间、消息标记、消息扩展属性、消息重试次数、是否是批量消息等
处理重试消息。
调用 MQClientAPIImpl.sendMessage(),首先构建一个远程请求 RemotingCommand,根据发送类型(同步或异步)调用不同的通信层实现方法。我们这里是同步消息,则调用
RemotingClient.invokeSync()。
处理返回结果,将通信层返回的结果封装成 SendResult 对象返回给业务层。
第四步:RemotingClient 是基于 Netty 实现的,熟悉 Netty 的同学已经大概知道后面的流程,不熟悉的同学也没有关系,这里先混个眼熟,下面我们会对 Netty 做简单的介绍。
RemotingClient.invokeSync() 先是通过 Broker Addr 获取或者创建 Netty Channel。先从 channelTables Map 本地缓存中,以 Broker Addr 为 key 获取 Channel,没有获取到则通过 Netty Bootstrap.connect( Broker Addr) 创建 Channel,并放入缓存。
然后生成<opaque, ResponseFuture>的键值对放入 responseTable 缓存中,结果返回的时候根据 opaque 从缓存中获取结果。
调用 channel.writeAndFlush() 将消息通过网络传输给指定 Broker。这里是 Netty 框架的 API,已经不在 RocketMQ 范畴。
调用 ResponseFuture.waitResponse() 方法,直到 Netty 接收 Broker 的返回结果。其实就是执行 countDownLatch.await()。
第五步:结果处理及返回。
Broker 处理结果返回,Netty 产生可读事件,由 Channelhandler 处理可读事件,这里是 NettyClientHandler.channelRead0()接收写入数据,处理可读事件。
然后处理返回结果,从 responseTable 取出 ResponseFuture,并执行 responseFuture.putResponse()。实际上就只执行 countDownLatch.countDown() 唤醒第四步中等待的调用线程,返回 Broker 的处理结果 RemotingCommand。
结果层层返回,直到 MQClientAPIImpl.sendMessageSync() 出手了,这里调用 MQClientAPIImpl.processSendResponse() 处理返回结果,封装成 SendResult 对象返回给业务层。
到这里,生产者已经将消息发送到指定的 Broker 了,其中包括了消息的层层校验及封装;还有很重要的是如何选择一个 MessageQueue 进行发送(重试),重试是保证消息发送可靠的关键步骤;最后通过 Netty 将请求发送给 Broker。我们先不管 Broker 收到请求如何处理,但是要明白消息如何送到 Broker 进行存储,需要对 Netty 有简单的理解。
总结
以上就是 RocketMQ 消息发送的主要内容,我们简单的总结下:
生产者启动:主要是调用 NettyRemotingClient.start() 初始化 Netty 客户端,并启动 5 个后台线程;
消息发送:业务层封装发送的消息,逻辑层进行层层校验及封装,轮询策略选择一个 MessageQueue 发送(重试),通信层基于 Netty 将消息发送给 Broker。
参考文献
丁威, 周继锋. RocketMQ 技术内幕:RocketMQ 架构设计与实现原理. 机械工业出版社, 2019-01.
李伟. RocketMQ 分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
杨开元. RocketMQ 实战与原理解析. 机械工业出版社, 2018-06.
版权声明: 本文为 InfoQ 作者【三此君】的原创文章。
原文链接:【http://xie.infoq.cn/article/068ae86205389bd51569a5205】。文章转载请联系作者。
评论