写点什么

Apache RocketMQ 5.0 在 Stream 场景的存储增强

  • 2022 年 10 月 10 日
    浙江
  • 本文字数:5249 字

    阅读完需:约 17 分钟

1.jpeg


本文作者:刘振东,Apache RocketMQ PMC Member

RocketMQ 基础介绍

2.png


RocketMQ 的诞生是为了解决微服务解耦的问题。微服务解耦指将传统的巨大服务拆分为分布式的微服务。


拆分之后,产生了一个新的问题:服务之间需要进行通信才能对外形成完整的服务。通信方式分为两种:其一为 RPC 方式,也称为同步通信;其二为异步通信方式,比如 RocketMQ。


RocketMQ 的广泛使用证明异步通信方式存在极大优势。最显著的特点即异步解耦,所谓解耦指一个服务不需要知道另外一个服务的存在。比如开发 A 服务,即使其他服务需要 A 服务的数据,A 服务也并不需要知道它们的存在,不需要依赖其他服务的发布,其他服务的新增也不会对 A 服务造成影响,从而实现了团队的解耦,指一个微服务由一个特定的团队去完成,而其他团队并不需要知道该团队的存在,只需要根据事先约定的数据格式,通过 RocketMQ 实现异步通信。这种组织方式大大促进了生产力的发展,自然也促使 RocketMQ 得到广泛应用。


3.png


在异步解耦过程中,有的组件生产消息,有的组件消费消息。RocketMQ 的 API model 是其异步解耦过程的抽象概念。API model 的两端是 RocketMQ 领域最典型的两个概念:其一为 producer,指消息的生产方或者数据生产方;其二为 consumer,指消息的消费方。


除此之外,topic 也是 API model 的一个重要概念。因为异步解耦的需要,一条数据从 producer 发出到最终被 consumer 消费的过程并不是直接连接,中间有一个抽象层,这个抽象概念称为 topic,相当于一个逻辑地址。topic 就像一个仓库,当一条数据被发送到一个 topic 时,它会负责将消息暂存,其他组件需要使用时可以拿取。


RocketMQ 是一个分布式的消息中间件,因此 topic 实质上是一个逻辑概念,真正的物理概念是分布在每一个 broker 上的队列,即 message queue。一个 topic 可以具有很多 message queue,可以分布在很多 broker 上,从而具有了无限扩展的能力,这是 topic 的一个基本特性。


此外,topic 接收消息还有一个非常重要的特性,即消息不可变。消息的不可变特性使其可以被重复地读取。通过引入 consumer group 的概念,可以看到不同组的消费者读取消息的行为相互之间不会造成影响。Topic 里的数据不会因为有 consumer 去读取而消失,可实现一处发送多处消费的能力。比如在一个组织内,订单团队发了一条消息到订单 topic,该组织内的其他所有团队都可以直接进行读取,且一个团队的读取并不会影响其他团队的读取,实现了读取的相互独立。


4.png


MQ 的一个重要特性是异步解耦,在互联网的超大流量场景下,异步解耦之后往往会跟随着削峰填谷问题。为了实现削峰填谷,需要持久化的能力。MQ 是一个存储引擎,它可以暂存发送者的数据。如果消费者暂时无法处理,数据可以先堆积在 MQ ,等到有足够的能力消费时再读取数据。


持久化也更好地支持了异步解耦的特性,即使 consumer 全部不在线也并不影响 producer 的发送。持久化是 MQ 的一项重要能力。在持久化能力里,为了配合顺序的特性,MQ 的引擎是一个顺序存储的引擎。


RocketMQ 设计时略微有所不同,它将所有消息集中式地存储,再根据不同的 topic、不同的队列分别建立索引。这种设计是 RocketMQ 针对微服务场景特别优化的,它具有能够很好地支持同步刷盘的能力,在海量 Topic 的场景下写入延迟依然能够保持平稳,这也是 RocketMQ 可与其他消息引擎竞争的重要特性。


5.png


流场景最初应用于用户行为分析。用户行为分析指根据用户的行为日志去猜测用户的喜好。比如推荐系统的搜索推荐广告等业务就是流场景最典型的场景。


流场景的第一步为将各个系统里产生的用户行为,包括日志、数据库记录等,集中导入到某些分析引擎。过程中数据来源多,数据的分析引擎也很多,包括离线引擎、实时引擎等。


为了降低复杂度,我们引入了类似 MQ 的工具,使得数据源和数据使用者不直接交互,而是先将数据发送到 MQ 里,整个系统的连接复杂度会从 O(N2)变为 O(N),复杂度大大降低。


从用户行为或者流处理的角度分析 MQ 扮演的角色以及它最终所期望的 IT 架构,可以发现其与微服务解耦的架构非常相似,两者之间的所有概念比如 consumer、producer、topic、message queue、分片等,都可以一一对应。因此,如果只考虑 RocketMQ 的功能,它本身就能支持流处理场景。


目前有很多公司在使用 RocketMQ 进行流处理,但 RocketMQ 在解决流处理问题时仍然存在可优化的空间。

Stream 场景特征分析

6.png


流处理的场景具有三个特点:


(1)单条消息 size 很小:微服务解耦中,一条消息一般就是一条订单,包含的数据非常多,买家、卖家等各种信息糅杂在一起发给下游,一条消息通常会达到至少 1KB 甚至几 KB。但在流场景里,数据类似于用户的行为日志,比如某个用户登录、某个用户下线、某个用户浏览某个页面等描述。用户行为的表达很可能只占几个字节到 100 个字节。


(2)消息数量很多:用户的浏览行为数量远远大于操作行为数量,整体消息数量急剧增多。通常在微服务解耦场景中,单机不会超过 10 万 TPS。但是在流场景或者日志搜集等场景当中,单机百万 TPS 很常见。


(3)Catch Up 读常态化:在流场景中,经常有任务的 replay,即读取历史数据再计算历史结果,也称为 cache up read。相对于微服务解耦场景,catch up read 在流场景中会更常见。


总而言之,在整个流场景里,吞吐变得更加重要。

存储增强三步曲——批、分、合

7.png


RocketMQ 起初为微服务解耦设计时,是面向单条记录,因此吞吐并不高。RocketMQ 5.0 针对吞吐引入了一个新特性 batch。


在传统 RocketMQ 里,一条消息一条记录,一条消息一条索引。这种传统设计的优点是能够保证延迟更加稳定,但也意味着吞吐不高。因为通信链路层的 RPC 次数太多,对 CPU 的消耗太大。因此,RocketMQ 5.0 针对该问题,推出了 batch 功能。


Batch 的基本逻辑是:在客户端自动组装,将多条消息按照 topic 和队列合并,作为一个请求发送到服务端;服务端收到消息一般不解压,而是直接存储;消费端一次拿下一批,将多个 byte 的消息拿到本地,再进行解压。如果每个 batch 包含 10 条消息,TPS 可以很轻松地上升 10 倍。原本一条消息要发一次远程请求,而加入 batch 后 10 条消息发一次远程请求即可。


因为服务端不进行解压,所以对服务端的 CPU 增加非常小,将解压和合并的功能下放到各个客户端,从而使服务端资源不容易形成瓶颈,TPS 可以很轻松地得到提高。


8.png


流场景中另一个典型问题是扩容和数据重均衡。在微服务场景中,流量不大的情况下,扩容问题并不明显。但是在流场景中,单机流量本来就高,一旦扩容,扩容和数据重均衡问题就难以忽略。在扩容过程中,假如原先是一个 node,需要扩容变成两个 node,则会产生重均衡的问题。


为了解决该问题,通常有两个办法:


(1)直接增加队列的数量,即“Add a Shard”。这种方法会产生一个问题,队列的数量发生变化导致整个数据的分布也发生变化。比如做 word count 单词个数计算,原本 A 单词位于队列 0,队列数发生变化之后,A 单词位于队列 2,计算的结果会出现问题。因此如果增加队列个数,流计算任务需要重新运行一遍来修正数据。另一问题为分片数,如果每动一次就增加分片数,则会导致分片数量膨胀而且很难减少,这也会产生问题。


(2)不增加队列,但是复制队列。比如原本队列 1 在 node 0 上,增加一个 node 1,将队列 1 从 node 0 转移到 node 1,过程中队列数量没有发生变化,数据分布也没发生变化,因此客户端、发送端、消费端等流接收任务都不需要重跑。此方法对用户很友好,但也会带来一个新的问题:复制过程会导致额外带宽消耗。在流场景中需要扩容的本质原因是机器的流量过高,但是为了将流量引走还需要新增一个复制任务,在还未完成引流之前就给系统带来额外的性能消耗,可能会导致扩容的过程直接产生网络风暴,系统崩溃。另外,复制分区时,因为流计算任务的每个分片数据量很大,复制过程耗时会很长。


因此采用复制方式来解决大数据存储引擎的扩容其实很困难,可用性与可靠性难以权衡。


9.png


RocketMQ 针对该问题提出了 logic queue 解决方案。logic queue 是暴露给客户的队列,一个物理 queue 分布在一个 node 上,用于实际存储数据的队列。一个 logic queue 由多个物理 queue 通过位点映射组合而成。


位点映射的原理如下:假设 LogicQueue-1 由 Queue-1 和 Queue-2 组成,Queue-1 包含 0 到 100,Queue-2 包含 101 到 200,可映射成一个总位点是 0 到 200 的 LogicQueue-1。


上述情况下,只需修改映射关系,将逻辑队列修改到新 node 上的队列里,即可实现扩容。


写入时,新进的数据写入新节点,即实现了写入端的负载均衡。而读取过程有所不同,最新的数据会从本机读取,老数据会采用远程读取。


在流计算的整个生命周期中,数据在不断产生、不断地消费。因此在大多数情况下,如果没有产生堆积,远程读取的数量很小,几乎能够瞬间完成,写入和读取都在新节点上,以此完成扩容。这种扩容有两个明显优势:其一是不需要搬运数据;其二是分片数量不用发生变化。这也意味着上下游的客户端都无需重启,也不用发生变化,数据任务都是完整而正确的。


10.png


严格意义上来看,RocketMQ 是一个流存储引擎。但 RocketMQ 5.0 推出了 RocketMQ Streams——一个轻量级的流计算。


在 RocketMQ Streams 中,source 端是数据源头,中间有算子,最后数据会进入 Sink 端。通常它们都是有状态的,比如计算 word count,每一条数据进来,一个新的单词出现,首先要拿取过去数据的 count 值,加 1 后生成新的 count 值,这个中间的数据为状态村塾,一般称为 state store。


State store 的特征包括:


(1)本地化 locality:一个轻量级的流计算如果要遵循高效的计算性能,通常需要本地化。本地化指将 state store 的数据和计算节点放在一起,至少要缓存到计算节点里。从 state store 里面取数据,就相当于从本地取数据,提升性能。如果每次读取 state store 都是远程读取,可想而知性能会显著下降。


(2)持久化 persistency:一旦灾难发生后,其他计算节点要能完整地读到 state store。例如原本在 A 计算节点计算数据,A 计算节点出问题后要到 B 计算节点计算,在 B 计算节点也需要恢复出 state store。


(3)可搬运 exchangeable:state store 需要能便利地从一个系统搬到另外一个系统。比如算完 word count,一个 BI 系统或者一个网站系统想尝试感知到该变化,则需要将数据从 state store 搬运到系统里。


11.png


针对以上三个特性,RocketMQ 推出了一种新的 topic,称为 compacted topic。compacted topic 的存储方式和使用方式和与正常 topic 一样,唯一区别在于其服务端会将 key 相同的 record 删掉,进行规整。


比如图中原本 offset=3 的节点是 K1V4,会覆盖之前的 offset=0 的 K1V1 和 offset=2 的 K1V3,使最后只剩下一个 K1V4。此设计的优点在于重新恢复时需要读取的数据量非常少。


State store 是一个 NoSQL 型的 table,比如 word count 就是一个 KV 结构,key 是单词,count 是单词出现的个数,但是 word count 在不断变化,需要将表转变成一个流,表发生的所有变化形成的数、记录的列表就形成了一个流,该过程称为流转,即将一个表转成一个流。RocketMQ 可以通过以上方式轻松地将一张动态表存下来。


Compacted topic 相当于一张动态表,且为流的形式,因此 compacted topic 是一种流表二象性的状态。这种特殊的 topic 可以充当 state store 的存储层——一个持久化层。


State store 本身是表,且 key 和 value 不断变化。为了实现容灾的特性,需要将该表持久化,将该表的所有修改记录形成一个流,存到 RocketMQ 的一个 compacted topic,state store 即相当于被持久化。假如一个计算节点 A 崩溃,B 计算节点接管任务时,可以将 topic 以普通 API 的方式读出,再在本地恢复 state store,以此实现流计算任务的 disaster recovery,即实现了容灾特性,可以帮助 RocketMQ 构建一个轻量级、没有任何外部依赖的流计算引擎。


12.png


Batch、logic queue、compacted topic 三个存储的基本特性,分别用于解决增强吞吐的问题、弹性的问题以及 state store 的问题。将三个存储特性进行结合,再配合 RocketMQ Streams,可以形成一个轻量级的流计算解决方案。只需要 RocketMQ 和 RocketMQ Streams,即可实现一个通用的流计算存储引擎。


13.png


RocketMQ 5.0 从原先的微服务解耦转变为流存储引擎,原先的异步解耦、削峰填补等特性依然可以在新场景中充分得以使用。


此外,RocketMQ 5.0 针对流存储的场景实现了三个重大特性的增强:其一是 batch,可提升性能,将吞吐能力提升 10 倍;其二是 logic queue,可以实现秒级扩容,并且无需搬运数据,也无需改变分片数量;其三是针对流计算场景中所用 state store 实现了 compacted topic。


经过增强,RocketMQ 向流存储引擎发展的过程更进了一步。

加入 Apache RocketMQ 社区

十年铸剑,Apache RocketMQ 的成长离不开全球接近 500 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。


社区 5.0 版本正在进行着如火如荼的开发,另外还有接近 30 个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学加入社区,添加社区开发者微信:rocketmq666 即可进群,参与贡献,打造下一代消息、事件、流融合处理平台。


14.jpeg


微信扫码添加小火箭进群


另外还可以加入钉钉群与 RocketMQ 爱好者一起广泛讨论:


15.png


钉钉扫码加群


关注「Apache RocketMQ」公众号,获取更多技术干货

发布于: 刚刚阅读数: 3
用户头像

阿里巴巴云原生 2019.05.21 加入

还未添加个人简介

评论

发布
暂无评论
Apache RocketMQ 5.0 在Stream场景的存储增强_阿里云_阿里巴巴云原生_InfoQ写作社区