实时数据湖 Flink Hudi 实践探索
导读:首先做个自我介绍,我目前在阿里云云计算平台,从事研究 Flink 和 Hudi 结合方向的相关工作。目前,Flink + Hudi 的方案推广大概已经有了一年半的时间,在国内流行度也已比较高,主流的公司也会尝试去迭代他们的数仓方案。所以,今天我介绍的主题是 Flink 和 Hudi 在数据湖 Streaming 方向的一些探索和实践,将会围绕以下四点展开:
Apache Hudi 背景介绍
Flink Hudi 设计
Hudi 应用场景
Hudi RoadMap
Apache Hudi 背景介绍
首先和大家分享下数据湖发展的历史背景,以及 Hudi 的基本特性。
1. 数据湖发展的历史背景
在我个人观点看来,传统的数仓方案(如 Hive)其实本身也是数据湖,而且我会把 Hudi、Iceberg、Delta Lake 都看成是数仓下一代新的解决方案,而不仅仅只是一种湖格式。那为什么近一年来会有数据湖这一新的数仓形态的诞生?
伴随着目前云存储(尤其是对象存储)逐步成熟的大背景,数据湖的解决方案也会逐步往云原生靠近。如图一所示,湖格式会适配云厂商的对象存储,做云厂商多云和云厂商用例,同时适配比较流行的大数据计算框架(如 Spark、Flink),以及查询端的 Presto、trino 以及传统 Hive 引擎,因此诞生了这样一套新的数仓解决方案。
2. Hudi 的四大核心特性
由上可知,Hudi 作为下一代的数仓解决方案,借助上下游的计算和查询引擎,实现替代传统 Hive 离线数仓的一套新方案,其核心特色整体可以总结为以下四点:
开放性
开放性体现在两个方面:
第一方面,上游支持多种数据源格式。比如传统数据库的 change log 日志、消息队列 log 等传输方式,都会在 source 端会有非常丰富的支持。
第二方面,下游查询端也同样支持多种查询引擎。像主流的 OLAP 引擎 Presto、国内比较火的 Starrocks、云厂商的 amazon redshift、数据分析产品 impala,都会对接到这样一套数仓架构里面。
所以开放性是 Hudi 的第一个特点。
丰富的事务支持
Hudi 对事务的支持程度,会比原来 Hive 数仓的要求更高,更丰富。其中核心特点是支持在文件存储布局上做更新。在传统基于 Hive 的 T + 1 更新方案中,数据重复度会比较高,只能实现天级别的数据新鲜度。并且伴随着业务需求越来越复杂,实时性要求越来越高,对数仓存储体系提出了更高的要求,对端到端的数据新鲜度要求做到分钟级或者是秒级。
其次是更新效率要求提高,不要每次都去 overwrite 整张表或者整个 partition 去更新,而是能够精确到文件粒度的局部更新来提升存储和计算效率。Hudi 很好地满足了这些需求,因此,对 ACID 语义的增强是这套数仓架构的第二大特点。
基于 ACID 语义的增量处理
在我看来,第三个亮点是在 ACID 语义基础上衍生出来的增量处理,尤其 Hudi 提出的 TimeTravel 概念,或者直接对接 Flink,Spark Streaming 等流式处理引擎的方式,不管是近实时还是常驻的 Streaming 服务,本质都是一种流式消费,都可以理解为一种增量 ETL 处理。相对于传统 batch 调度,在计算上会更加高效,尤其像 Flink 这种有状态的计算框架,会复用之前的计算结果,直接实现端到端的全链路增量处理。其次,在数据新鲜度上有一个数量级的提升,从“天级别”提升到“分钟级别”。
比方说,国内目前有些实践用户会尝试使用 Flink 计算框架做湖表的 Streaming 消费,直接通过一套增量 ETL 链路去分析从源端注入过来的数据,构建传统数仓的分层。还有一点,很多小伙伴会好奇 TimeTravel 这种 incremental 的查询设计,查询两个快照之间的增量数据,有什么用途?如果你是批形式调度的查询,主流场景是 ADS 端到下游的同步,比如说将数仓的生产结果同步到其他库表(如 ES,Mysql)可通过这种 TimeTravel 定期做这种批量同步,当你对 ADS 的同步时间度要求没这么高,就可以用这种幂等 TimeTravel 的查询方式比较高效地同步到其他下游端。
以上三点,是相对于主流 Hive 架构地三个核心区别,也是目前国内外湖仓项目正在努力的方向。
智能化调度
再补充一点,在 Hudi 里面,会尽量优化文件布局,将小文件管理这种数据治理的方案做到框架内部,实现智能化调度。这是 Hudi 区别于其他数仓方案如 Delta Lake,IceBerg 的核心特点。
Flink + Hudi 设计
1. Hudi 写入 pipeline(多算子组成的微服务架构)
从图二中可以看出,Hudi 写入 pipeline 是一个 Serverless 的微服务架构,核心是在整个 pipeline 的服务起来之后,不管是 Flink,还是 Spark Streaming,整套服务可以对表本身能达到自治理的状态。所以,不光光考虑数据高效写入,同时还需要考虑写入过程中的文件管理,尽量避免产生太多小文件从而优化查询端的效率。通过定期的文件合并,文件清理,避免出现小文件数量爆炸式增长的情况出现。
另一方面,是 ACID 事务性,尤其是完成一个待更新的 ACID 事务,需考虑多方面因素。当单 job 或者单节点要 fail over 时,Hudi 可以保证快速找到之前写入的错误数据,并且实现 rollback 回滚。所以,Hudi 的事务层支持是目前三个湖存储里面做的最完善,最高效的。
以 copy on write 的具体实现为例,会将上游的 SQL 原生数据结构转换成 Hudi 的数据结构,为了支持并发写入,我们会对每个 shuffle 后的数据分 bucket。
主要有两点:
第一个是新增数据,会尽量写入到当前已存在的比较小的 bucket 里面。同时,为了避免生成小文件,也会尽量保证每个 bucket 的大小和预期大小相同。
第二点是更新数据,Hudi 设计了 key 主键,每个 key 的消息都维护在一个 bucket 内部,每次更新都会写入到之前的 bucket 里面。而 IceBerg,Delta Lake 就只管写入,不会去管文件布局,因此他们会把查询端的一些合并和清理做得很重,所以查询效率会比较低。相比之下,Hudi 复杂的写入过程和 bucket 策略就是在权衡和考虑读写效率。这里所说的 bucket 概念,有点类似 Snowflake 里的 micro partition 概念,会在传统的 partition 分区下面再细化,以文件粒度来维护某个 range 下消息的生命周期。以更细粒度去维护生命周期,可有效提升数据更新和查询效率。
第二个算子之后,数据根据每个 bucket 做好分区,我们会按照 bucket ID 做一遍 shuffle,交给 write task 去写入。为何要按照 bucket ID 重新 shuffle?主要是为了维护两个 write task 不能同时并发修改一个 bucket 的更新语义,否则容易造成更新冲突。
所以,从整体上看,这三个算子可以高效保证并发写入、更新。可以比较明显看到,第二个算子的并发度其实决定了整个更新的并发度,决定当前能够同时更新和写入的 bucket 数量,而后面的算子可以自由独立地扩展。从实践经验推荐第二个和第三个算子的并发设置一样,当吞吐量不是很高的时候,一个 bucket 交给一个 write task 去写,吞吐量比较高的时候,可能一个 bucket 的 write task 可能会分多个,可以调整到 1:2 的比例。
后台还会启动 clean commits 的清理任务。数据 commit 操作发生在 coordinator 组件内,保证每个 write task 的 commit 大概对齐了 checkpoint 之后,数据才会 flush 出去,并且有一部分元数据信息,会统一提交给 coordinator,coordinator 收集到统计信息之后,会去结合 checkpoint 完成的事件做一次提交,真实的提交是在 coordinator 内。当 coordinator 完成提交之后,Hudi 表会发起一个新的事务,只有当 write task 看到这个新的事务,才能够发起新事务的写入动作。所以中间存在一个异步等待的过程,类似于一个小型的状态机。
而 Flink 的快照所保证的语义其实是一个 best effort 语义,一旦收到某个 checkpoint 的成功事件,就标志前面的状态都是成功的,但中间可能存在 checkpoint 被 abort 情况。
因为 Hudi 需要保证每个写入的完整性和 Exactly once 语义,就需要考量中间的写入不能越界,比如说 checkpoint 的事件数据不能写入下个 checkpoint,这样 Exactly once 语义就没办法保证。
在 0.11 版本会尝试做一些优化,比方说 checkpoint 被 abort 之后的状态能否复用。里面涉及一个状态的切换,相对会比较复杂。不像 Spark Streaming 每次都是微批的抽象,每次先发起一个任务,天然保证了 exactly once,容错语义交给框架。Flink 怎样把这个异步算法和很强的 exactly once 语义结合在一起,是这套架构的一个难点所在。
2. 小文件策略
接下来,我们仔细看看文件写入的第二个算子 bucket assign 的具体决策。即新消息如何去选择放到哪个 bucket,如图三所示,分两种情况介绍。
首先,左侧框图中有三个 bucket,蓝色代表当前已经存储文件的大小,如果是 insert 数据,策略是每次选择当前剩余空间最多的 bucket 写入。为何不考虑选择剩余空间最少的 bucket 呢?因为需要考虑到 COW 的写放大问题,效率比较低。更新数据时,先找到维护当前 key 的 bucket,然后写入。这样并不会造成文件大小的无限增长,因为每个 record 记录更新前后的大小基本近似,文件大小不会有明显的变化。影响文件大小的主要是 insert 数据,文件大小会设置阈值,维持在 120M 左右。
图中右侧框图是一个比较极端的情况,两个 bucket 只剩下很小的写入空间,考虑到写放大影响,会重新创建一个新的 bucket 重新写入。
为了提高并发写的吞吐量,会给每个 bucket assign task 分配一套独立的 bucket 管理策略,并利用 Hash 算法把 bucket ID 以固定的规则 hash 到每个 bucket assign task 下面,做到了并发决策。因此,控制 bucket assign task 并发度就相对控制了写入小文件数量,在写入吞吐量和小文件之间的权衡。
3. 全量 + 增量 读取
介绍完数据写入过程,再看下数据读取的流读部分。流读的全量读和增量读是如何实现的?如图四所示,Hudi 中 TimeLine 保存每个事务提交的毫秒时间戳,每个时间戳会对应一个快照版本,会记录在元数据里面。全量读时会扫全表的文件,会把整个全表的文件扫描出来,当你没有配置内置的 Metadata 索引表时,会直接扫全表,把文件系统中所有的文件都找出来。如果启用了 Metadata 表,就会在 Metadata 表(KV 存储)里扫描这个文件信息,以相对比较高的效率扫描全表文件,然后发给下游,并且增量的部分会定期(默认 60s)监听扫描 TimeLine 观察有没有新的 commits,同步发给下游读写,每次增量的部分会基于上一次下发的时间线点位,然后一直查找到当前最新的 commit time。
Split mornitor 算子负责维护这样一套监听增量文件信息的规则,下发给真正执行读取的 task。
最近在 master 版本也支持了批模式的 TimeTravel 查询(某个时间段的点查),以前的版本虽然支持但是会有些问题,比如增量部分 meta 文件如果被 archive、或者被清理,数据完整性就没有保证。新版本在保证在读取效率前提下,通过实现两个快照、commit 之间的批模式增量读取方式应对这两个问题,保证数据完整度。
Hudi 应用场景
目前 Flink + Hudi 在国内已经是非常流行的技术架构,这边总结三个应用场景向大家介绍一下。
1. 近实时 DB 数据入仓/湖
这套架构的 DB 数据入湖入仓核心特色是把原来 T + 1 的数据新鲜度提升到分钟级别。数据新鲜度通过目前比较火的以 Debezium、Maxwell 为代表的 CDC(change Data Capture)技术实现。以 Streaming 近实时的方式同步到数仓里面。在传统的 Hive 数仓中想保证实时是非常困难的,尤其是文件更新,湖表实时写入更新,基本不可能实现。CDC 技术对数仓本身存储是有要求的,首先是更新效率得足够高,能够支持以 Streaming 方式写入,并且能够非常高效的更新。尤其是 CDC log 在更新过程还可能会乱序,如何保证这种乱序更新的 ACID 语义,是有很高要求的,当前能满足乱序更新的湖格式只有 Hudi 能做到,而且 Hudi 还考虑到了更新的效率问题,是目前来说比较先进的架构。
图五下方的方案相比上面的方案,比较适合目前体量比较大(每天增量能达到亿级别地)、数据平台比较健全的公司,中间有一套统一的数据同步方案(汇总不同源表数据同步至消息队列),消息队列承担了数据的容错、容灾、缓存功能。同时,这套方案的扩展性也更加好。通过 kafka 的 topic subscribe 方式,可以比较灵活地分发数据。
2. 近实时 OLAP
第二个场景是近实时的 OLAP 场景,分钟级别的端到端数据新鲜度,同时又非常开放的 OLAP 查询引擎可以适配。其实是对 kappa 架构或者是原先 Streaming 数仓架构的一套新解法。在没有这套架构之前,实时分析会跳过 Hudi 直接把数据双写到 OLAP 系统中,比如 ClickHouse、ES、MongoDB 等。当仓存储已经可以支持高效率分级别更新,能够对接 OLAP 引擎,那么这套架构就被大大简化,首先不用双写,一份数据就可以保证 only one truth 语义,避免双写带来数据完整性的问题。其次因为湖格式本身是非常开放的,在查询端引擎可以有更多选择,比如 Hudi 就支持 Presto、trino、Spark、Starrocks、以及云厂商的 redshift 引擎,会有非常高的灵活度。、
所以,这种近实时的 OLAP 架构,总结就是以下两点:①统一上游存储端;②开放下游查询端。
但这套架构的数据新鲜度大概是 5 分钟级别,如果要做到像 kappa 秒级别的架构的话,目前 Hudi 还是不太适合的,因为本身比较依赖 Flink 的 CheckPoint 机制(支持端到端的 exactly once 语义),所以不能做到高频次的提交。
3. 近实时 ETL
第三个场景是目前比较前沿的架构,在国内也慢慢开始尝试这套架构。当数据源数据体量本身不大的时候,比方说源头过来的并不是 kafka,可能源头只是一个 Mysql 的 binlog,QPS 每秒可能也就几百。那么这套架构是一个非常稳定且省事的架构,不光光是实现了这种端到端的增量处理,同时还解决中间数据入仓的需求。其实就是提供了两套抽象,首先承担了一个数仓中间存储的一个存储抽象,把数据直接以湖格式入仓;第二个是提供 Queue 能力,类似于 Kafka 这种消息队列的能力,可用 Streaming 方式增量消费,并且可以在其上做一些增量计算。就这一套架构直接统一原来的 Lambda 和 kappa 架构,就是 kafka 的存储抽象加数仓文件的存储抽象合并在一个存储抽象里面,同时没有增加过多的存储成本。大家可能后面用的都是对象存储或者是以廉价存储的形式存在的 HDFS。
整套架构解决了两个问题,第一是双写问题。在 Lambda 架构下,数据先写 kafka,然后入仓,保证这两份数据的一致性语义比较难。而且 kafka 开启 exactly once 写入后吞吐量会下降很多,Kafka 和 HDFS 之间的数据如何保证一致性呢?有人会理解去流读 kafka,把 kafka 数据再起一个 job 同步到 HDFS,这样计算资源、维护作业、同步成本都是原来的两倍。
第二个解决的问题是中间层查询需求。中间层数据直接入仓,并且不是以高效率方式更新,当需要对中间层 DWD 表做一些 join 操作时,可以直接和引擎对接,而不需要去考虑说 Lambda 架构 T+1 更新效率的问题。湖格式分钟级时效性很大程度缓解了这个问题。
除此之外,这套架构还有个好处,可以根据不同地应用场景选择丰富的 OLAP 查询引擎,直接以外表的方式接入库存储,很方便地进行 OLAP 分析。
4. 阿里云 VVP 实时入湖
接下来,简单讲一下目前阿里云 VVP 产品实时入湖的集成,主要还是入湖状态,阿里云内置 Flink 版本会有一个内置 Hudi connector,大家可以通过 FlinkSQL 方式快速构建入湖任务,直接写湖表,对接 Hudi CDC connector 或者对接 kafka CDC format,实现数据快速入湖。
并且在入湖过程中,提供了商业版特性,如 schema evolution。CE,CTAS 语法支持 schema evolution 然后同时我们会主推 DLF catalog 元数据管理组件,DLF catalog 会和 EMR 的 DLF 无缝集成,若是 EMR 通过 spark 写入,这边也可以看到,Flink 入湖任务写完之后也可以管理,通过 DLF 组件,可以直接通过 EMR 查询端引擎分析 Hudi 格式数据。
这是目前的一套推给商业化用户的技术方案,入湖通过 VVP 服务,分析通过 EMR,后期 VVP 可能会集成更多能力,如流批统一,满足用户的流读需求。
近期 Hudi RoadMap
如图九所示,我将简单介绍下近期 Hudi 0.12 版本以及 1.0 版本将会做的一些 feature。
首先,我们会推出类似于 Delta2.0 的 CDC Feed 功能,因为目前我们支持的 CDC 要求输入必须是一个 CDC,Hudi 会用 CDC 格式把它存下来。CDC Feed 的区别就是不用保证整体的输入是 CDC 格式,即使出现 absert 语义,或者 CDC 的中间数据有丢失,可以完整还原出 CDC 给主端。这个特性在读写吞吐量和资源上做些权衡,不会像目前这套架构处理 CDC 那么高效。
第二点是 Meta Service 服务。把元数据的管理插件化,通过统一的 Meta Service plugable 形式,统一管理 Hudi 上的表和任务。
第三点,我们目前还在规划做 Secondary Index 二级索引。因为在目前的 master 版本中,Flink Spark 都已实现 data skipping 能力(在写入时,如果用户开启 Meta Data 表,同时开启 data skipping,会额外记录每个 column 的统计信息),最典型的是每个 column 会建一个 Max, Min,开启一个元数据的加速,提升文件级别的查询效率。后续还会支持类似于数据库的二级索引,为某个专门的 column 实现类似于 LSM 的抽象索引,构建适用于点查场景的高效索引方案。
最后,后续我们还会做类似于特征工程的按列更新功能开发,类似于 Clickhouse 的 Merge Tree 抽象,独立存储某个 column。因为在机器学习的特征工程中大量特征需要成千上万个字段,每次生成一个特征都需要更新一个 column,所以要求单个 column 要具备高效的更新能力。为了适配这样的场景,Hudi 也会持续去探索。
答疑
Q1:直接使用 Hudi 存储更新,相比直接 CDC 到 Starrocks,这两种方案哪一个更好?感觉 Starrocks 的 QPS 应该会高于 Hudi 的更新速度。
A1:的确是这样,因为 Starrocks service 会使用类似于 LSM 的高效主键索引,并且内存里面做 partition 策略,维护比较多的二级索引,元数据信息。而且,最重要的一点就是在写入更新 main table 时会使用攒批的操作,先把多次写入汇入 buffer 然后进行一次 flash,并且在数据的 flush 上也可以攒批,这也是为什么 Starrocks 更新效率上更高的原因。
但同时,因为有了 server 集群,会带来两个问题,首先,会带来较高的运维成本,其次,内存模式相比于 Hudi 的 serverless 格式的开销会更大。
Hudi 格式在开放性上,对于 Starrocks 会有一定优势,不光可以对接 Starrocks、还可以对接 Presto、Spark 等主流 OLAP 引擎。
这就是他们的区别,两种方案的侧重点不同,还需要根据实际应用场景进行选择。如果只是做 OLAP 应用,Starrocks 更适合。但如果想构建数仓,使用 Starrocks 代替 Hudi 的成本应该太高了。因为 Hudi 面向的场景主要时数仓,迭代的主要时 Hive 传统数仓,更有优势。
Q2:流量数据入湖场景下,使用 MOR(Merge on Read)表,还是 COW(Copy on Write)表更合适?
A2:如果流量数据体量比较大,建议使用 MOR 表。以目前的实测方案,QPS 不超过两万,COW 表还是可以支撑的。超过两万之后,比较推荐 MOR online compaction 方式。如果 QPS 更高,那可能需要把压缩任务再独立出来,这是目前能给到的一个方案。
更多信息:
产品官网
[1] 数据湖构建 Data Lake Formation:https://www.aliyun.com/product/bigdata/dlf
[2] 开源大数据平台 EMR: https://www.aliyun.com/product/emapreduce
[3] 大数据知识图谱: https://developer.aliyun.com/learning/topic/bigdata
数据湖系列
[1] 数据湖揭秘—Delta Lake: https://developer.aliyun.com/article/909818
[2] 数据湖构建—如何构建湖上统一的数据权限: https://developer.aliyun.com/article/918403
[3] 关于 Data Lake 的概念、架构与应用场景介绍:https://developer.aliyun.com/article/944650
[4] 数据湖架构及概念简介:
https://developer.aliyun.com/article/1004847
[5] 数据湖统一元数据和权限
评论