写点什么

RocketMQ-Streams 架构设计浅析

  • 2022 年 4 月 06 日
  • 本文字数:3189 字

    阅读完需:约 10 分钟

作者:倪泽,RocketMQ 资深贡献者, RocketMQ-Streams 维护者之一,阿里云技术专家。


RocketMQ-Streams 是一款轻量级流处理引擎,应用以 SDK 的形式嵌入并启动,即可进行流处理计算,不依赖于其他组件,最低 1 核 1G 可部署,在资源敏感场景具有很大优势。同时它支持 UTF/UTAF/UTDF 多种计算类型。目前已经广泛运用于安全,风控,边缘计算等场景。


本期将带领大家从源码的角度,解析 RocketMQ-Streams 的构建,数据流转过程。也会讨论 RocketMQ-Streams 是如何实现故障恢复和扩缩容的。


使用示例


代码示例:


public class RocketMQWindowExample {    public static void main(String[] args) {        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");        source.fromRocketmq(                "topicName",                "groupName",                false,                "namesrvAddr")                .map(message -> JSONObject.parseObject((String) message))                .window(TumblingWindow.of(Time.seconds(10)))                .groupBy("groupByKey")                .sum("字段名", "输出别名")                .count("total")                .waterMark(5)                .setLocalStorageOnly(true)                .toDataSteam()                .toPrint(1)                .start();
}
}
复制代码


pom 文件依赖:


<dependency>  <groupId>org.apache.rocketmq</groupId>  <artifactId>rocketmq-streams-clients</artifactId>  <version>1.0.1-preview</version></dependency>
复制代码


上述代码是一个简单的使用例子,它主要的功能是从 RocketMQ 中指定 topic 读取数据,经过转化成 JSON 格式,以 groupByKey 字段值分组、10 秒一个窗口,对 OutFlow 字段值进行累加,结果输出到 total 字段,并打印到控制台上。上述计算中还允许输入乱序 5 秒,即窗口时间到达后不会马上触发,而是会等待 5s,如果这个段时间内,有窗口数据到达依然有效。上述 setLocalStorageOnly 为 true 表示不对状态进行远程存储,仅使用 RocksDB 做本地存储。目前 1.0.1 的 RocketMQ-Streams 版本依然使用 Mysql 作为远程状态存储,下一版本将使用 RocketMQ 作为远程状态存储。


RocketMQ 总体架构图


image.gif


1.png


RocketMQ-Streams 作为轻量流处理引擎,本质上是作为 RocketMQ 的客户端消费数据,一个流处理实例可以处理多个队列,而一个队列只能被一个实例消费。若干 RocketMQ-Streams 实例组成消费者组共同消费数据,通过扩容实例达到增加处理能力的消费,减少实例则会发生 rebalance,消费的队列自动重平衡到其他消费实例上。从上述图中,我们还可以看出计算实例间不需要直接交换任何数据,可各自独立完成所有计算处理。这种架构简化了 RocketMQ-Streams 本身的设计,同时也可非常方便的进行实例扩缩容。


处理拓扑


处理器拓扑为应用定义了流处理过程的计算逻辑,它由一系列的处理器节点和数据流向组成。例如,在开头的代码示例中,整个处理拓扑由 source、map、groupBy、sum、count、print 等处理节点组成。有两种特殊的处理节点:


  • source 节点


他没有任何上游节点,从外部读入数据到 RocketMQ-Streams,并交由下游处理。

  • sink 节点


他没有任何下游节点,他将处理后的数据写出到外部。


处理拓扑仅仅是流处理代码的逻辑抽象,在流计算启动时将会被实例化。为了设计简单,目前一个流处理实例中仅有一张计算拓扑。


在所有流处理算子之中,有两种特别的算子,一种是涉及数据分组的算子 groupBy,另一种是有状态计算例如 count 等。这两种算子会影响整个计算拓扑的构建,下面将具体分析 RocketMQ-Streams 是如何处理他们的。


groupBy


分组算子 groupBy 特殊是因为经过 groupBy 操作,后续算子期望对相同 key 的数据进行操作,例如经过 groupBy("年级")之后再进行 sum 就是对按照年级分组求和,这就要求需要将具有相同“年级”的数据重新路由到一个流计算实例上处理,如果不这样做,每个实例上得出的结果都将是不完整的,整体输出结果也将是错误的。


RocketMQ-Streams 采用 shuffle topic 这种方式来处理。具体说来,计算实例将 groupBy 数据重新发回 RocketMQ 的一个 topic,并且在发回过程中按照 key 的 hash 值来选择目标队列,再从这个 topic 读取数据进行后续流处理。按照 key hash 后相同的 key 一定在一个队列里面,而一个队列只会被一个流处理实例消费,这样就达到相同 key 被路由到一个实例上处理的效果。


有状态算子


有状态算子与无状态算子相对。如果计算结果只与当前输入有关,和上一次输入无关就是无状态算子,例如 filter、map、foreach 结果只与当前输入有关系。还有一种算子的输出结果不仅与当前算子有关系还与上一次输入有关,例如 sum,需要对一段时间内输入进行求和,他就是有状态算子。


RocketMQ-Streams 利用 RocksDB 作为本地存储,Mysql 作为远程存储来保存状态数据。他具体做法是:

  1. 当发现消息来自新的队列时,检查是否需要加载状态,如果需要异步加载状态到 RocksDB。


  2. 数据到达有状态算子时,如果加载完成使用 RocksDB 中状态进行计算,如果没有,使用 Mysql 中状态计算。


  3. 计算完成后,将状态数据保存到 RocksDB 和 Mysql 中。


  4. 窗口触发后,从 RocksDB 中查询出状态数据,并将结果向下游算子传递。



整体数据流向图如下:


2.png


扩缩容与故障恢复


扩缩容和故障恢复是一个硬币的两面,即同一个事物的两种表达,计算集群如果能正确扩缩容就等于具备故障恢复的能力,反之亦然。通过前面介绍我们知道,RocketMQ-Streams 具有非常良好的扩缩容性能,扩容时只需要新部署一个流计算实例即可,缩容时停止计算实例即可。对于无状态的计算来说比较简单,扩容后,数据计算不需要之前的状态。有状态计算的扩缩容涉及到状态的迁移。有状态的扩缩容可由下图表示:


image.gif


3.png


当计算实例从 3 个缩容到 2 个,借助于 RocketMQ 的 rebalance,MQ 会在计算实例之间重新分配。

Instance1 上消费的 MQ2 和 MQ3 被分配到 Instance2 和 Instance3 上,这两个 MQ 的状态数据也需要迁移到 Instance2 和 Instance3 上,这也暗示,状态数据是根据源数据分片保存的;扩容则是刚好相反的过程。


具体实现上,RocketMQ-Streams 采用系统消息来触发状态的加载和持久化。

系统消息类别:


//新增消费队列NewSplitMessage
//不在消费某个队列RemoveSplitMessage
//客户端持久化消费位点到MQCheckPointMessage
复制代码


当发现消息来自一个新的 RocketMQ 队列(MessageQueue),RocketMQ-Streams 之前没有处理过来自该队列的消息,会先于数据前发送 NewSplitMessage 消息,通过处理拓扑下游算子传递,当有状态算子收到该消息时会将新增队列对应的状态加载到本地内存 RocksDB 中,当数据真正到达时,就根据这个状态继续计算。


当因为计算实例增加或者 RocketMQ 集群变动,rebalance 后,计算实例不再消费某个队列(MessageQueue)时,会发出 RemoveSplitMessage 消息,有状态算子删除本地 RocksDB 中的状态。


CheckPointMessage 是一种特别的系统消息,他的作用与实现 exactly-once 有关。我们在扩缩容过程中需要做到 exactly-once,才能保证扩缩容或故障恢复对计算结果没有影响。RocketMQ-streams 向 broker 提交消费 offset 前会产生 CheckPointMessage 消息,向下游拓扑传递,他将保证即将提交消费位点的所有消息都已经被 sink 处理掉。


开源地址:

RocketMQ-Streams 仓库地址:

https://github.com/apache/rocketmq-streams

RocketMQ 仓库地址:

https://github.com/apache/rocketmq


加入 Apache RocketMQ 社区


十年铸剑,Apache RocketMQ 的成长离不开全球接近 500 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。


社区 5.0 版本正在进行着如火如荼的开发,另外还有接近 30 个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学加入社区,添加社区开发者微信:rocketmq666 即可进群,参与贡献,打造下一代消息、事件、流融合处理平台。


发布于: 刚刚阅读数: 2
用户头像

云原生技术是云时代释放云价值的最短路径。 2020.06.11 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ-Streams架构设计浅析_阿里云_阿里巴巴中间件_InfoQ写作平台