Hudi 在 vivo 湖仓一体的落地实践
作者:vivo 互联网大数据团队 - Xu Yu
在增效降本的大背景下,vivo 大数据基础团队引入 Hudi 组件为公司业务部门湖仓加速的场景进行赋能。主要应用在流批同源、实时链路优化及宽表拼接等业务场景。
一、Hudi 基础能力及相关概念介绍
1.1 流批同源能力
与 Hive 不同,Hudi 数据在 Spark/Flink 写入后,下游可以继续使用 Spark/Flink 引擎以流读的形式实时读取数据。同一份 Hudi 数据源既可以批读也支持流读。
Flink、Hive、Spark 的流转批架构:
Hudi 流批同源架构:
1.2 COW 和 MOR 的概念
Hudi 支持 COW(Copy On Write)和 MOR(Merge On Read)两种类型:
(1)COW 写时拷贝:
每次更新的数据都会拷贝一份新的数据版本出来,用户通过最新或者指定 version 的可以进行数据查询。缺点是写入的时候往往会有写内存放大的情况,优点是查询不需要合并,直接读取效率相对比较高。JDK 中的 CopyOnWriteArrayList/CopyOnWriteArraySet 容器正是采用了 COW 思想。
COW 表的数据组织格式如下:
(2)MOR 读时合并:
每次更新或者插入新的数据时,并写入 parquet 文件,而是写入 Avro 格式的 log 文件中,数据按照 FileGroup 进行分组,每个 FileGroup 由 base 文件(parquet 文件)和若干 log 文件组成,每个 FileGroup 有单独的 FileGroupID;在读取的时候会在内存中将 base 文件和 log 文件进行合并,进而返回查询的数据。缺点是合并需要花费额外的合并时间,查询的效率受到影响;优点是写入的时候效率相较于 COW 快很多,一般用于要求数据快速写入的场景。
MOR 数据组织格式如下:
1.3 Hudi 的小文件治理方案
Hudi 表会针对 COW 和 MOR 表制定不同的文件合并方案,分别对应 Clustering 和 Compaction。
Clustering 顾名思义,就是将 COW 表中多个 FileGroup 下的 parquet 根据指定的数据大小重新编排合并为新的且文件体积更大的文件块。如下图所示:
Compaction 即 base parquet 文件与相同 FileGroup 下的其余 log 文件进行合并,生成最新版本的 base 文件。如下图所示:
1.4 周边引擎查询 Hudi 的原理
当前主流的 OLAP 引擎等都是从 HMS 中获取 Hudi 的分区元数据信息,从 InputFormat 属性中判断需要启动 HiveCatalog 还是 HudiCatalog,然后生成查询计划最终执行。当前 StarRocks、Presto 等引擎都支持以外表的形式对 Hudi 表进行查询。
1.5 Procedure 介绍
Hudi 支持多种 Procedure,即过程处理程序,用户可以通过这些 Procedure 方便快速的处理 Hudi 表的相关逻辑,比如 Compaction、Clustering、Clean 等相关处理逻辑,不需要进行编码,直接通过 sparksql 的语句来执行。
1.6 项目架构
1. 按时效性要求进行分类
秒级延迟:
分钟级延迟:
当前 Hudi 主要还是应用在准实时场景:
上游从 Kafka 以 append 模式接入 ods 的 cow 表,下游部分 dw 层业务根据流量大小选择不同类型的索引表,比如 bucket index 的 mor 表,在数据去重后进行 dw 构建,从而提供统一数据服务层给下游的实时和离线的业务,同时 ods 层和 dw 层统一以 insert overwrite 的方式进行分区级别的容灾保障,Timeline 上写入一个 replacecommit 的 instant,不会引发下游流量骤增,如下图所示:
1.7 线上达成能力
实时场景:
支持 1 亿条/min 量级准实时写入;流读延迟稳定在分钟级
离线场景:
支持千亿级别数据单批次离线写入;查询性能与查询 Hive 持平(部分线上任务较查询 Hive 提高 20%以上)
小文件治理:
95%以上的合并任务单次执行控制在 10min 内完成
二、组件能力优化
2.1 组件版本
当前线上所有 Hudi 的版本已从 0.12 升级到 0.14,主要考虑到 0.14 版本的组件能力更加完备,且与社区前沿动态保持一致。
2.2 流计算场景
1. 限流
数据积压严重的情况下,默认情况会消费所有未消费的 commits,往往因消费的 commits 数目过大,导致任务频繁 OOM,影响任务稳定性;优化后每次用户可以摄取指定数目的 commits,很大程度上避免任务 OOM,提高了任务稳定性。
2. 外置 clean 算子
避免单并行度的 clean 算子最终阶段影响数据实时写入的性能;将 clean 单独剥离到
compaction/clustering 执行。这样的好处是单个 clean 算子,不会因为其生成 clean 计划和执行导致局部某些 Taskmanager 出现热点的问题,极大程度提升了实时任务稳定性。
3. JM 内存优化
部分大流量场景中,尽管已经对 Hudi 进行了最大程度的调优,但是 JM 的内存仍然在较高水位波动,还是会间隔性出现内存溢出影响稳定性。这种情况下我们尝试对 state.backend.fs.memory-threshold 参数进行调整;从默认的 20KB 调整到 1KB,JM 内存显著下降;同时运行至今 state 相关数据未产生小文件影响。
2.3 批计算场景
1. Bucket index 下的 BulkInsert 优化
0.14 版本后支持了 bucket 表的 bulkinsert,实际使用过程中发现分区数很大的情况下,写入延迟耗时与计算资源消耗较高;分析后主要是打开的句柄数较多,不断 CPU IO 频繁切换影响写入性能。
因此在 hudi 内核进行了优化,主要是基于 partition path 和 bucket id 组合进行预排序,并提前关闭空闲写入句柄,进而优化 cpu 资源使用率。
这样原先 50 分钟的任务能降低到 30 分钟以内,数据写入性能提高约 30% ~ 40%。
优化前:
优化后:
2. 查询优化
0.14 版本中,部分情况下分区裁剪会失效,从而导致条件查询往往会扫描不相关的分区,在分区数庞大的情况下,会导致 driver OOM,对此问题进行了修复,提高了查询任务的速度和稳定性。
eg:select * from `hudi_test`.`tmp_hudi_test` where day='2023-11-20' and hour=23;
(其中 tmp_hudi_test 是一张按日期和小时二级分区的表)
修复前:
修复后:
优化后不仅包括减少分区的扫描数目,也减少了一些无效文件 RPC 的 stage。
3. 多种 OLAP 引擎支持
此外,为了提高 MOR 表管理的效率,我们禁止了 RO/RT 表的生成;同时修复了原表的元数据不能正常同步到 HMS 的缺陷(这种情况下,OLAP 引擎例如 Presto、StarRocks 查询原表数据默认仅支持对 RO/RT 表的查询,原表查询为空结果)。
2.4 小文件合并
1. 序列化问题修复
0.14 版本 Hudi 在文件合并场景中,Compaction 的性能相较 0.12 版本有 30%左右的资源优化,比如:原先 0.12 需要 6G 资源才能正常启动单个 executor 的场景下,0.14 版本 4G 就可以启动并稳定执行任务;但是 clustering 存在因 TypedProperties 重复序列化导致的性能缺陷。完善后,clustering 的性能得到 30%以上的提升。
可以从 executor 的修复前后的火焰图进行比对。
修复前:
修复后:
2. 分批 compaction/clustering
compaction/clustering 默认不支持按 commits 数分批次执行,为了更好的兼容平台调度能力,对 compaction/clustering 相关 procedure 进行了改进,支持按批次执行。
同时对其他部分 procedure 也进行了优化,比如 copy_to_table 支持了列裁剪拷贝、delete_procedures 支持了批量执行等,降低 sparksql 的执行时间。
3. clean 优化
Hudi0.14 在多分区表的场景下 clean 的时候很容易 OOM,主要是因为构建 HoodieTableFileSystemView 的时候需要频繁访问 TimelineServer,因产生大量分区信息请求对象导致内存溢出。具体情况如下:
对此我们对 partition request Job 做了相关优化,将多个 task 分为多个 batch 来执行,降低对 TimelineSever 的内存压力,同时增加了请求前的缓存判断,如果已经缓存的将不会发起请求。
改造后如下:
此外实际情况下还可以在 FileSystemViewManager 构建过程中将 remoteview 和 secondview 的顺序互调,绝大部分场景下也能避免 clean oom 的问题,直接优先从 secondview 中获取分区信息即可。
2.5 生命周期管理
当前计算平台支持用户表级别生命周期设置,为了提高删除的效率,我们设计实现了直接从目录对数据进行删除的方案,这样的收益有:
降低了元数据交互时间,执行时间快;
无须加锁、无须停止任务;
不会影响后续 compaction/clustering 相关任务执行(比如执行合并的时候不会报文件不存在等异常)。
删除前会对 compaction/clustering 等 instants 的元数据信息进行扫描,经过合法性判断后区分用户需要删除的目录是否存在其中,如果有就保存;否则直接删除。流程如下:
三、总结
我们分别在流批场景、小文件治理、生命周期管理等方向做了相关优化,上线后的收益主要体现这四个方向:
部分实时链路可以进行合并,降低了计算和存储资源成本;
基于 watermark 有效识别分区写入的完成度,接入湖仓的后续离线任务平均 SLA 提前时间不低于 60 分钟;
部分流转批后的任务上线后执行时间减少约 40%(比如原先执行需要 150 秒的任务可以缩短到 100 秒左右完成 ;
离线增量更新场景,部分任务相较于原先 Hive 任务可以下降 30%以上的计算资源。
同时跟进用户实际使用情况,发现了一些有待优化的问题:
Hudi 生成文件的体积相较于原先 Hive,体积偏大(平均有 1.3 ~ 1.4 的比例);
流读的指标不够准确;
Hive—>Hudi 迁移需要有一定的学习成本;
针对上述问题,我们也做了如下后续计划:
对 hoodie parquet 索引文件进行精简优化,此外业务上对主键的重新设计也会直接影响到文件体积大小;
部分流读的指标不准,我们已经完成初步的指标修复,后续需要补充更多实时的任务指标来提高用户体验;
完善 Hudi 迁移流程,提供更快更简洁的迁移工具,此外也会向更多的业务推广 Hudi 组件,进一步挖掘 Hudi 组件的潜在使用价值。
版权声明: 本文为 InfoQ 作者【vivo互联网技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/7d82922d27def5d9a183fda69】。文章转载请联系作者。
评论