RocketMQ 学习笔记
根据最近对 RocketMQ 的学习,记录了对应的学习笔记和知识点。
学习过程应该从以下几个方面入手:
消息队列的作用、应用场景和通用知识点
目前主流消息队列产品,以及各自的特点
根据消息队列产品特点和自身业务场景,做出消息队列的选型
深入学习一种消息队列知识,例如 RocketMQ,从基本概念、编码使用、实现原理、集群部署、生产运维等
下面的学习笔记只包括的 RocketMQ 本身基础概念、关键设计相关的知识点,其他相关的知识待后续整理。
1. 基础概念和特性
消息模型(Message Model)
RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。
Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。
Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。
RocketMQ 中的 Queue 的概念和 Kafka 中的分区概念是一致的,有些文档为了通用,提到的分区的概念,等同于 RocketMQ 中的 Queue。
消息顺序性
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一个分区的消息被顺序消费即可。
例如一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。
RocketMQ 可以保证分区里的消息是顺序性的,如果需要全 Topic 顺序性,可以用一些折中的方案,例如把 Topic 只设置成 1 个 Queue(默认是 4 个)。
消息过滤
支持按照 Tag 进行过滤,也支持自定义属性过滤,消息过滤是在 Broker 端实现。优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。
消息可靠性
RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:
1) Broker 非正常关闭
2) Broker 异常 Crash
3) OS Crash
4) 机器掉电,但是能立即恢复供电情况
5) 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
6) 磁盘设备损坏
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写
所以,最保证消息可靠的设置是:单机同步刷盘 + 集群同步双写。但是势必要牺牲性能,需要根据实际业务场景进行权衡。
消息消费方式
支持至少一次(At least Once),和 OneWay 消费方式。在至少一次(At least Once)模式下,需要做好幂等性设计。
支持回溯消费。例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。
支持事务消息,这是 RocketMQ 的一大亮点。是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。但是这种方式带来的缺点就是业务入侵性比较强。需要编写处理成功以及失败后的业务逻辑代码。
支持延迟消息(定时消息),broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 个 level。可以配置自定义 messageDelayLevel。
注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。
定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。
需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。
消息重试和消息重投
Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。
通常出现这样的场景有两种:1. 消息本身的原因,例如反序列化失败、消息本身处理失败等;2. 依赖的下游服务不可用,例如 db 连接断开等。
RocketMQ 的处理方式为:每个消费组都设置一个 Topic 名称为 %RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。
考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。
消息重投是发生在 Producer 端,同步消息会重投,异步消息会重试,对于 Oneway 发送方式的消息没有任何保证。重投会更换 Broker,异步重试只在当前 Broker 上进行。
具体如下:
retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投。
retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。
消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。
流量控制
分为生产者流量控制和消费者流量控制,生产者流控是 Broker 达到了处理瓶颈,消费者流控是 Consumer 达到了处理瓶颈。
生产者流控:
commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
如果开启 transientStorePoolEnable == true,且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
broker 通过拒绝 send 请求方式实现流量控制。
注意,生产者流控,不会尝试消息重投。
消费者流控:
消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。
消费者流控的结果是降低拉取频率。
死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
2. 关键技术架构
2.1 技术架构
RocketMQ 架构上主要分为四部分,如上图所示:
Producer:消息发布的角色,支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
Consumer:消息消费的角色,支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
NameServer:NameServer 是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。主要包括两个功能:Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer,Consumer 仍然可以动态感知 Broker 的路由的信息。
BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker 包含了以下几个重要子模块:
Remoting Module:整个 Broker 的实体,负责处理来自 clients 端的请求。
Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息
Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。
HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
Index Service:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。
2.2 部署架构
NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。 注意:当前 RocketMQ 版本在部署架构上支持一 Master 多 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读 I/O),以及从服务器是否可读等因素建议下一次是从 Master 还是 Slave 拉取。
结合部署架构图,描述集群工作流程:
启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
RocketMQ 支持如下几种部署方式:
主从模式
Master 宕机,Broker 可读不可写
集群搭建方式
单 Master 模式
多 Master 模式
多 Master 多 Slave 模式-异步复制
多 Master 多 Slave 模式-同步双写
2.3 消息存储设计
消息存储整体架构:
消息存储是 RocketMQ 中最为复杂和最为重要的一部分,主要概念有:CommitLog、ConsumeQueue、IndexFile(索引文件)。
(1) CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;
(3) IndexFile:IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。Index 文件的存储位置是:$HOME \store\index${fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,故 rocketmq 的索引文件其底层实现为 hash 索引。
消息刷盘
(1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
(2) 异步刷盘:能够充分利用 OS 的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。
注意:这里说的刷盘,都是单 Broker 上的消息可靠性保障,就像之前概念里提到的,如果整个服务器或者磁盘坏掉,还是不能保证高可用。所以为了保险,还需要搭建集群,通过 Master-Slave 的方式进行异步写入或者同步双写。
2.4 消息过滤
RocketMQ 分布式消息队列的消息过滤方式有别于其它 MQ 中间件,是在 Consumer 端订阅消息时再做消息过滤的。RocketMQ 这么做是在于其 Producer 端写入消息和 Consumer 端订阅消息采用分离存储的机制来实现的,Consumer 端订阅消息是需要通过 ConsumeQueue 这个消息消费的逻辑队列拿到一个索引,然后再从 CommitLog 里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其 ConsumeQueue 的存储结构如下,可以看到其中有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的消息过滤正式基于这个字段值的。
主要支持如下 2 种的过滤方式:
Tag 过滤方式:Consumer 端在订阅消息时除了指定 Topic 还可以指定 TAG,如果一个消息有多个 TAG,可以用||分隔。其中,Consumer 端会将这个订阅请求构建成一个 SubscriptionData,发送一个 Pull 消息的请求给 Broker 端。Broker 端从 RocketMQ 的文件存储层—Store 读取数据之前,会用这些数据先构建一个 MessageFilter,然后传给 Store。Store 从 ConsumeQueue 读取到一条记录后,会用它记录的消息 tag hash 值去做过滤,由于在服务端只是根据 hashcode 进行判断,无法精确对 tag 原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始 tag 字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
SQL92 的过滤方式:这种方式的大致做法和上面的 Tag 过滤方式一样,只是在 Store 层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由 rocketmq-filter 模块负责的。每次过滤都去执行 SQL 表达式会影响效率,所以 RocketMQ 使用了 BloomFilter 避免了每次都去执行。SQL92 的表达式上下文为消息的属性。
2.5 负载均衡
RocketMQ 的负载均衡都是在 Client 端完成的,分为 Producer 端发送消息时候的负载均衡和 Consumer 端订阅消息时的负载均衡。
Producer 端负载均衡:
定时获取 Queue 信息
负载均衡算法:随机递增取模
容错机制:故障延迟
每个实例在发消息的时候,默认会-轮询(调度方式)所有的 message queue 发送,以达到让消息平均落在不同的 queue 上。而由于 queue 可以散落在不同的 broker,所以消息就发送到不同的 broker 下。所以选择 broker 是负载均衡的关键,基于方法 selectOneMessageQueue(),这个方法会随机选择一个 broker。跟根据 selectOneMessageQueue()方法的实现内容,来选择一个队列(MessageQueue)进行发送消息。
有一个关键变量值 sendLatencyFaultEnable(默认是 false),如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息;如果开启,在随机递增取模的基础上,再过滤掉 not available 的 Broker 代理。
所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的 latency 超过 550Lms,就退避 3000Lms;超过 1000L,就退避 60000L。latencyFaultTolerance 机制是实现消息发送高可用的核心关键所在。
Consumer 端负载均衡:
客户端心跳上报数据
定时 Rebalance 20S
获取队列信息
获取消费者信息
排序平均分配
在 RocketMQ 中,Consumer 端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在 Push 模式只是对 pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要 Consumer 端在知道从 Broker 端的哪一个消息队列—队列中去获取消息。因此,有必要在 Consumer 端来做负载均衡,即 Broker 端中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费。
LongPoll:
Consumer 发送拉取消息
Broker hold 住请求,直到有新消息再返回
请求超时,Consumer 再次发起请求
请求超时时间默认 30S
在集群消费模式下,每条消息只需要投递到订阅这个 topic 的 Consumer Group 下的一个实例即可。RocketMQ 采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条 message queue。
Consumer 通过重平衡(Rebalance)的方式进行实例的重新分配(默认值每 20 秒执行一次)。而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照 queue 的数量和实例的数量平均分配 queue 给每个实例。
常用的负载均衡算法有以下几种:
平均分配策略(默认)(AllocateMessageQueueAveragely)环形分配策略(AllocateMessageQueueAveragelyByCircle)
手动配置分配策略(AllocateMessageQueueByConfig)
机房分配策略(AllocateMessageQueueByMachineRoom)
一致性哈希分配策略(AllocateMessageQueueConsistentHash)
靠近机房策略(AllocateMachineRoomNearby)
2.6 事务消息
RocketMQ 采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示:
其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
(1) 发送消息(half 消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)
2.补偿流程:
(5) 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”
(6) Producer 收到回查消息,检查回查消息对应的本地事务的状态
(7) 根据本地事务状态,重新 Commit 或者 Rollback
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
2.7 消息查询
RocketMQ 支持两种维度的查询:
按照 MessageId 查询消息,没有业务含义
按照 Message Key 查询消息,通常是用业务字段定义。
最佳事件:每个消息在业务层面的唯一标识码要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
版权声明: 本文为 InfoQ 作者【大刘】的原创文章。
原文链接:【http://xie.infoq.cn/article/305175126001b68662d8ab207】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论