vivo 鲁班 RocketMQ 平台的消息灰度方案
一、方案背景
RocketMQ(以下简称 MQ)作为消息中间件在事务管理,异步解耦,削峰填谷,数据同步等应用场景中有着广泛使用。当业务系统进行灰度发布时,Dubbo 与 HTTP 的调用可以基于业界通用的灰度方式在我们的微服务治理与网关平台来实现,但 MQ 已有的灰度方案都不能完全解决消息的隔离与切换衔接问题,为此,我们在鲁班 MQ 平台(包含根因分析、资源管理、订阅关系校验、延时优化等等的扩展)增加了 MQ 灰度功能的扩展实现。
二、RocketMQ 技术特点
为什么 MQ 的灰度方案迟迟没有实现呢?我们先来回顾一下 RocketMQ 的几个核心技术点。
2.1 存储模型的简述
(图 2.1 MQ 的存储模型)
CommitLog:消息体实际存储的地方,当我们发送的任一业务消息的时候,它最终会存储在 commitLog 上。MQ 在 Broker 进行集群部署(这里为也简洁,不涉及主从部分)时,同一业务消息只会落到集群的某一个 Broker 节点上。而这个 Broker 上的 commitLog 就会存储所有 Topic 路由到它的消息,当消息数据量到达 1 个 G 后会重新生成一个新的 commitLog。
Topic:消息主题,表示一类消息的逻辑集合。每条消息只属于一个 Topic,Topic 中包含多条消息,是 MQ 进行消息发送订阅的基本单位。属于一级消息类型,偏重于业务逻辑设计。
Tag:消息标签,二级消息类型,每一个具体的消息都可以选择性地附带一个 Tag,用于区分同一个 Topic 中的消息类型,例如订单 Topic, 可以使用 Tag=tel 来区分手机订单,使用 Tag=iot 来表示智能设备。在生产者发送消息时,可以给这个消息指定一个具体的 Tag, 在消费方可以从 Broker 中订阅获取感兴趣的 Tag,而不是全部消息(注:严谨的拉取过程,并不全是在 Broker 端过滤,也有可能部分在消费方过滤,在这里不展开描述)。
Queue:实际上 Topic 更像是一个逻辑概念供我们使用,在源码层级看,Topic 以 Queue 的形式分布在多个 Broker 上,一个 topic 往往包含多条 Queue(注:全局顺序消息的 Topic 只有一条 Queue,所以才能保证全局的顺序性),Queue 与 commitLog 存在映射关系。可以理解为消息的索引,且只有通过指定 Topic 的具体某个 Queue,才能找到消息。(注:熟悉 kafka 的同学可以类比 partition)。
消费组及其 ID:表示一类 Producer 或 Consumer,这类 Producer 或 Consumer 通常生产或消费同应用域的消息,且消息生产与消费的逻辑一致。每个消费组可以定义全局维一的 GroupID 来标识,由它来代表消费组。不同的消费组在消费时互相隔离,不会影响彼此的消费位点计算。
2.2 消息发送与消费
(图 2.2 消息发送与拉取模型)
2.2.1 客户端标识
在生产者或消费者集群中,每一个 MQ 客户端的运行实例,在 MQ 的客户端会保证产生唯一的 ClientID。注:同一应用实例中,既充当生产者,也充当消费者时,其 ClientID 实际上是同一个取值。
2.2.2 消息发送
当向某个 Topic 发送消息的时候,会首先获得这个 Topic 的元数据,元数据会包括它有多少个 Queue,每个 Queue 属于哪个 Broker。默认的情况下,发送方会选择一条 Queue 发送当前消息,算法类型是轮询,也就是下一条消息会选择另一条 Queue 进行发送。另外 MQ 也提供了指定某条 Queue 或者自定义选择 Queue 的方法进行消息的发送,自定义选择 Queue 需实现 MessageQueueSelector 接口。
2.2.3 消息消费
进行消息的消费时,同一消费组(GroupID)的消费者会订阅 Topic,消费者首先获得 Topic 的元数据,也就是会获得这个 Topic 的所有 Queue 信息。然后这些 Queue 按规则分配给各个具体的客户端(ClientID),各个客户端根据分配到的 Queue 计算对应的需要拉取消息的 offset 并生成 PullRequest,拉取消息并消费完成后,客户端会生成 ACK 并更新消费进度。
这里的消费进度是该批消息未消费成功的最小 offset,如图 2.3 所示,一批消息中如果 1、5 未消费,其余的消息已消费,此时更新的 offset 仍是 1,消费者如果宕机重启,会从 1 号开始消费消息,此时 2、3、4 号消息将会重复消费。
(图 2.3 消费进度更新示意图)
因此 RocketMQ 只保证了消息不会丢失,无法保证消息不会重复消费,消息的幂等性需要业务自己实现。
另外,消费方可以指定消费某些 Tag 的消息,在 pullRequest 进行拉取时,会在 Broker 里会按照存储模型的 Queue 索引信息按 hash 值进行快速过滤,返回到消费方,消费方也会对返回的消息进行精准的过滤。
2.3 订阅关系一致性
在消费端,同一消费组内(GroupID 相同,本节所描述的前提都是同一消费组内)的各个应用实例 MQ 客户端需要保持订阅关系的一致性,所谓订阅关系的一致性就是同一消费组内的所有客户端所订阅的 Topic 和 Tag 必须完全一致,如果组内订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
2.3.1 订阅关系的维护
每一个应用实例 MQ 客户端都有独立的 ClientID,我们简单地讲解一下订阅关系的维护:
各个 MQ 消费客户端会带着它的 ClientID 向所有 Broker 发送心跳包,心跳包中含有具体的本客户端订阅的 Topic & Tag 等信息,registerConsumer 方法会按照消费组将客户端分组存储,同一消费组内的 ClientID 信息在同一个 ConsumerGroupInfo 中;
Broker 在接收到任何消费客户端发过来的心跳包后,会在 ConsumerManager 类中的 ConcurrentMapconsumerTable 按照消费组名称 GroupID 作为 key 存储不同的消费组信息,同一消费组的订阅信息会在每一次收到心跳包后,都会按当前的订阅 Topic & Tag 信息进行更新维护,也就是相当于只保存最新的心跳包订阅信息(心跳包中的 subVersion 会标记心跳包版本,当重平衡结果发生改变后,subVersion 会更新,Broker 只会保存最新版本的心跳包中的订阅信息),不管这个心跳包来源于这个消费组的哪个 ClientID。(由于 Tag 是依赖于 Topic 的属性,Topic 和 Tag 订阅关系不一致时 Broker 对应的处理结果也略有不同,具体可见 updateSubscription 方法)。
2.3.2 订阅不一致影响
在这里使用例子的方式来阐述订阅关系不一致带来的部分问题。假设同一组内的消费者 clientA 订阅了 TOPIC_A,clientB 订阅了 TOPIC_B;TOPIC_A、TOPIC_B 均为 4 条 Queue,在进行消费分配时,最终 Queue 的分配结果如下:
(表 2.1 消息费者的 Queue 分配结果表)
因为 clientB 没有订阅 TOPIC_A,clientA 也没有订阅 TOPIC_B,所以 TOPIA_A 中的 Queue-2、Queue-3,TOPIC_B 中的 Queue-0、Queue-1 中的消息则无法正常消费。而且在实际过程中,clientA 也没法正常消费 TOPIC_B 的消息,这时会在客户端看到很多异常,有些 Queue 的消息得不到消费产生堆积。
此外,在订阅关系中,不同 client 所订阅的 Tag 不一样时,也会发生拉取的消息与所需要订阅的消息不匹配的问题。
三、业界 MQ 灰度方案
(图 3.1 灰度调用示意图)
通常,业务灰度只严格地保证 RPC 服务之间的调用,部分消息灰度流量的流失或错误是可以容忍的,如图 3-1 所示,V_BFF 产生的灰度消息会被 V_TRADE 的正常版本与灰度版本收到并随机消费,导致部分灰度流量没有进入期望的环境,但整体 RPC 服务的调用还是隔离了灰度与非灰度环境。当业务对消息消费的逻辑进行了更改,或者不想让灰度的消息影响线上的数据时,MQ 的灰度就必须要实现。
由于订阅关系的限制,当前的业界实现的 MQ 灰度方案都是正常版本与灰度版本使用不同的 GroupID 来实现。以下的方案均使用了不同的 GroupID。
3.1 影子 Topic 的方案
新建一系列新的 Topic 来处理隔离灰度的消息。例如对于 TOPIC_ORDER 会创建 TOPIC_ORDER_GRAY 来给灰度环境使用。
发送方在发送时,对灰度的消息写入到影子 Topic 中。消费时,灰度环境只使用灰度的分组订阅灰度的 Topic。
3.2 Tag 的方案
发送方在发送时,对灰度环境产生的消息的 Tag 加注灰度标识。消费方,每次灰度版本发布时只订阅灰度 Tag 的消息,正常的版本订阅非灰度的 Tag。
3.3 UserProperty 的方案
发送方在发送时,对灰度环境产生的消息的 UserProperty 加注灰度标识。消费方的客户端需要进行改写,根据 UserProperty 来过滤,正常的版本会跳过这类消息,灰度的版本会处理灰度消息。
3.4 当前的方案缺陷
以上三种方案各自的优势在这里不做比较,但都存在以下共同的缺陷(也有其它的缺陷或开发诉求,但不致命),无法真正实现灰度状态切换回正常状态时消息不丢失处理,导致整个灰度方案都是从入门到放弃的过程:
因为使用不同的消费组,那么灰度版本验证通过后,如何正确地衔接回原正常版本的消费组的消费位移,做到高效地不丢失信息处理呢?
灰度的消息如何确保准确地消费完毕,做到落在灰度标识的消息做到高效地不丢失信息处理呢?
开启灰度时,灰度消息的位点从那里开始?状态的细节化如何管控?
四、鲁班 MQ 平台的灰度方案
本质上,MQ 灰度问题的核心就是高效地将灰度与非灰度的消息隔离开,消费方按照自己的需求来准确获取到对应版本的消息;当灰度完成后,能够正确地拼接回来消息的位移,做到不丢失处理必要的消息,也就是状态细节上的管理。为了实现这个目的,本方案分别在以下几点进行了改造。
本方案中涉及到的代码为测试代码,主要用于说明方案,实际代码会更精细处理。
4.1 Queue 的隔离使用
(图 4.1 Queue 的区分使用)
我们已经知道了 Queue 是 topic 的实际执行单元,我们的思路就是借助 Queue 来实现 v1(正常)消息、v2(灰度)消息的区分,我们可以固定首尾两条【可配置】Queue 专门用来发送与接收灰度的消息,其余的 Queue 用来发送正常的线上消息。我们使用相同的消费组(也就是和业界的通用方案不一样,我们会使用相同的 GroupID),让灰度消费者参与灰度 Queue 的重平衡,非灰度消费者参与非灰度 Queue 的重平衡。
这里我们解决了消息的存储隔离问题。
4.2 Broker 订阅关系改造
灰度版本往往需要变更 Topic 或 Tag,由于我们没有新增独立的灰度消费组,当灰度版本变更 Topic/Tag 时,消费组内订阅关系就会不一致,前文也简单解释了订阅关系一致性的原理,我们需要在 Broker 做出对应的改造,来兼容灰度与非灰度订阅关系不一致的情况。
同一消费组的订阅信息会在维护在 ConsumerGroupInfo 的 subscriptionTable 中,可以在 ConsumerGroupInfo 中增加创建一份 graySubscriptionTable 用来存储灰度版本的订阅信息,客户端向 Broker 发送的心跳包会改造成带有自身的灰度标记 grayFlag,根据灰度标记 grayFlag 来选择订阅关系存储在 subscriptionTable 还是 graySubscriptionTable;在拉取消息时,同样向 Broker 传来 grayFlag 来选择从 subscriptionTable 还是 graySubscriptionTable 中获取对应的订阅信息。
这里我们解决了消费订阅一致性问题。
4.3 Producer 的改造
发送方的改造相对简洁,只需确定发送的消息是否为灰度消息,通过实现 MessageQueueSelector 接口,将灰度消息投递到指定数量的灰度 Queue 即可。这里我们把用于灰度的 grayQueueSize 定义到配置中心中去,当前更多是约定使用 Broker 的指定 Queue 号作为灰度使用。
TOPIC_V_ORDER 共有 6 条 Queue,如图 4.2 所示,灰度消息只会发送至首尾的 0 号与 5 号 Queue,非灰度消息则会选择其余的 4 条 Queue 发送消息。
(图 4.2 发送结果)
这里我们解决了生产者正确投递的问题。
4.4 Consumer 的改造
消费方涉及的改造点主要是灰度 Queue 与非灰度 Queue 的重平衡分配策略,与各个客户端灰度标记 grayFlag 的更新与同步。
灰度重平衡策略的核心就是分类处理灰度和非灰度的 Queue,要将灰度的 Queue 分配至灰度 ClientID,将非灰度的 Queue 分配至非灰度的 ClientID,因此,在重平衡之前,会通过 Namesrv 获取同组内的所有客户端 clientId 对应最新的 grayFlag(也就是状态会记录到 Namesrv)。
当灰度版本需要变更为线上版本时,各客户端会同步 grayFlag 到 Namesrv,同时,为了避免灰度消息还未消费完成,在更新 grayFlag 之前会先判断灰度 Queue 中是否存在未消费的消息,在保证灰度消息消费完成后才会进行 grayFlag 的更新。
消费者需使用 AllocateMessageQueueGray 作为重平衡策略,传入灰度 Queue 的数量,灰度消费者 setGrayFlag 为 true,可以看出只消费了首尾的 0 号与 5 号 Queue 的消息,非灰度消费者 setGrayFlag 为 false,可以看出只会消费中间的 4 条 Queue 的消息,在控制台也可以非常清晰的看到 Queue 的分配结果,grayFlag 为 true 的 v2 客户端分配到了首尾的 Queue,grayFlag 为 false 的 v1 客户端则分配到了中间的 4 条 Queue。
(图 4.3 消费与订阅结果)
当灰度版本需要切换至线上版本时,只需调用 updateClientGrayFlag 来更新状态即可,可以看出在调用 updateClientGrayFlag 后,原先 v2 的两个灰度客户端在消费完灰度 Queue 的消息后,grayFlag 才真正变为 false【状态在 namesrv 保存】,加入到中间的 4 条非灰度 Queue 的重平衡中,原先首尾的 2 条灰度 Queue 则没有消费者订阅。
(图 4.4 grayFlag 更新)
这里我们解决了状态切换的细节控制处理问题。
4.5 Namesrv 的改造
前文提到过,消费者在重平衡时是需要获取组内所有客户端的灰度标识 grayFlag,因此,我们需要一个地方来持久化存储这些 grayFlag,这个地方是每个消费者都可以访问的,我们选择将这些信息存储在 Namesrv。
Namesrv 相对比较轻量,稳定性很好;
消费者本身就会与 Namesrv 建立长连接,如果该 namesrv 挂掉,消费者会自动连接下一个 Namesrv,直到有可用连接为止;
Broker 是实际存储消息的地方,自身运行压力就相对较大,用来做灰度数据的同步一定程度上会加大 Broker 的压力。
但是 Namesrv 本身是无状态的节点,节点之间是不会进行信息同步的,灰度数据的一致性需要借助数据库来保证,Namesrv 共同访问同一套数据库就好了,数据库持久化存储灰度信息,每次更新 v1、v2 的灰度状态时,通过 Namesrv 修改数据库的数据,在每次重平衡之前,再通过 Namesrv 拉取自己消费组内的所有实例的灰度状态即可。
(图 4.5 Namesrv 存储灰度数据示意图)
这里我们解决了状态存储与同步的问题。
五、灰度场景的校验
测试是校验方案可行性的真理,下面用简单的 demo 来验证鲁班平台的 MQ 灰度方案。
5.1 灰度版本 Topic & Tag 不变
这种场景在 4.3、4.4 时已经做了验证,不再赘述。
5.2 灰度版本 Topic 增加
假设 v1、v2 的订阅信息如表 5.1 所示,则 Topic 订阅结果如图 5.1 所示,TOPIC_V_ORDER 被 v1、v2 同时订阅,首尾两条 Queue 分配给灰度 v2 的客户端,中间 4 条 Queue 则分配给非灰度 v1 的客户端;TOPIC_V_PAYMENT 只被灰度版本 v2 订阅,则只会将首尾两条 Queue 分配给 v2 的客户端,其余四条 Queue 不会被客户端订阅。我们向 TOPIC_V_ORDER 分别发送 4 条非灰度消息和灰度消息,向 TOPIC_V_PAYMENT 发送 4 条灰度消息,从图 5.2 中可以看出 TOPIC_V_ORDER 中的非灰度消息由 v1 的两个客户端成功消费,TOPIC_V_ORDER 与 TOPIC_V_PAYMENT 的灰度消息则由 v2 的两个客户端成功消费。
(表 5.1 订阅信息表)
(图 5.1 订阅结果)
(图 5.2 消费结果)
5.3 灰度版本 Topic 减少
假设 v1、v2 的订阅信息如表 5.2 所示,则 Topic 订阅结果如图 5.3 所示,TOPIC_V_ORDER 被 v1、v2 同时订阅,首尾两条 Queue 分配给灰度 v2 的客户端,中间 4 条 Queue 则分配给非灰度 v1 的客户端;TOPIC_V_PAYMENT 只被非灰度版本 v1 订阅,则只会将中间的四条 Queue 分配给 v1 的客户端,首尾两条 Queue 不会被客户端订阅。我们向 TOPIC_V_ORDER 分别发送 4 条非灰度消息和灰度消息,向 TOPIC_V_PAYMENT 发送 4 条非灰度消息,从图 5.4 中可以看出 TOPIC_V_ORDER 与 TOPIC_V_PAYMENT 的非灰度消息由 v1 的两个客户端成功消费,TOPIC_V_ORDER 中的灰度消息则由 v2 的两个客户端成功消费。
(表 5.2 订阅信息表)
(图 5.3 订阅结果)
(图 5.4 消费结果)
5.4 灰度版本 Tag 变化
假设 v1、v2 的订阅信息如表 5.3 所示,则 Topic 订阅结果如图 5.5 所示,TOPIC_V_ORDER 被 v1、v2 同时订阅,首尾两条 Queue 分配给灰度 v2 的客户端,中间 4 条 Queue 则分配给非灰度 v1 的客户端,我们向 TOPIC_V_ORDER 分别发送 4 条 Tag=v1 的非灰度消息和 Tag=v2 的灰度消息,从图 5.6 中可以看出 Tag 为 v1 的非灰度消息由 v1 的两个客户端成功消费,Tag 为 v2 的灰度消息则由 v2 的两个客户端成功消费。
(表 5.3 订阅信息表)
(图 5.5 订阅结果)
(图 5.6 消费结果)
5.5 灰度版本 Topic & Tag 混合变化
假设 v1、v2 的订阅信息如表 5.4 所示,则 Topic 订阅结果如图 5.7 所示,与 5.2 情况相同不再赘述。我们向 TOPIC_V_ORDER 分别发送 4 条 Tag=v1 的非灰度消息和 Tag=v2 的灰度消息,向 TOPIC_V_PAYMENT 发送 4 条灰度消息,消费结果如图 5.8 所示,可以看出 v2 的两个客户端成功消费了 TOPIC_V_PAYMENT 及 TOPIC_V_ORDER 中 Tag=v2 的灰度消息,而 v1 的两个客户端则只消费了 TOPIC_V_ORDER 中 Tag=v1 的非灰度消息。
(表 5.4 订阅信息表)
(图 5.7 订阅结果)
(图 5.8 消费结果)
六、结语
实际的 MQ 灰度版本,我们还对 MQ 的发送与消费方做了统一的封装,业务方只需配置 graySwitch、grayFlag 即可,graySwtich 标记是否需要开启灰度消息,在 graySwitch 开启的前提下,grayFlag 才会生效,用来标记当前客户端是否为灰度客户端。
在多系统交互时,业务系统可通过开关 graySwitch 来控制是否全量消费其他系统的灰度与非灰度消息,通过 grayFlag 来控制是单独消费灰度消息还是非灰度消息。graySwitch、grayFlag 参数可放在配置中心做到热生效,当需要切换灰度流量时,可开发相应的脚本统一化更改 grayFlag,实现全链路灰度流量的无损切换。
另外,我们对于切换状态借助 Namesrv 做了充分细节上的控制,保证在真正执行切换前,未消费完的消息会被消费完毕才真正的执行切换。
在此,也非常感谢阿里开源 RocketMQ 这个消息中间件!
作者:vivo 流程 IT 团队-Ou Erli、Xiong Huanxin
版权声明: 本文为 InfoQ 作者【vivo互联网技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/837ad1876487d32f624d72c97】。文章转载请联系作者。
评论