Iceberg 在图灵落地应用

导读
百度 MEG 上一代大数据产品存在平台分散、易用性差等问题,导致开发效率低下、学习成本高,业务需求响应迟缓。为了解决这些问题,百度 MEG 内部开发了图灵 3.0 生态系统,包括 Turing Data Engine(TDE)计算 &存储引擎、Turing Data Studio(TDS)数据开发治理平台和 Turing Data Analysis(TDA)可视化 BI 产品。依托图灵 3.0 生态,我们引入了数据湖表格式:Apache Iceberg,利用其特性并在多种业务场景下进行优化实践,解决图灵数仓业务实时数据入湖,数据表历史记录更新效率低等多个痛点问题。
01 背景
1.1 图灵 3.0 生态概述
由于百度 MEG 上一代大数据产品存在平台多、易用性差及数据流转繁琐等问题。这些问题导致开发人员研发效率低及多平台间高昂的学习成本;业务部门的感知则是需求交付迟缓、数据产出延迟及数据质量低等问题。为了解决上述问题,我们构建了新一代大数据解决方案——"图灵 3.0",旨在覆盖数据全生命周期,支持全链路数据操作,提供高效敏捷且统一的强大数据生态系统,其中包括数据计算引擎、数据开发和数据分析三个核心部分:
1. TDE(Turing Data Engine):图灵生态的计算引擎,包含基于 Hive、Iceberg 进行数据处理的 Spark 和ClickHouse高性能计算引擎。
2. TDS(Turing Data Studio):一站式数据开发治理平台。
3. TDA(Turing Data Analysis):新一代可视化BI产品。
本文主要介绍数据湖表格式 Iceberg 在图灵 3.0 生态下的应用与实践。

△图灵 3.0 生态产品
1.2 问题
MEG 数据中台基于 Hive 构建了离线数据仓库,已支持手百,搜索,商业,贴吧,小说,用增架构,销售等多个业务需求,但随着业务的发展,业务对数据的实时性以及查询性能等有更高要求,当前主要存在以下几个问题:
1. 商业、电商、销售等业务,周期性地更新行业等信息,单次更新数据量占比小、字段少,但是基于 Hive 的数据更新(以下简称:数据回溯)只能通过全量覆盖写的方式实现,数据回溯周期长、效率低、成本高。
2. 由于 Hive 在实时数据更新以及事务支持上存在一定局限性,无法有效满足业务构建实时数仓的需求。
3. 在处理大规模数据集上,Hive 的查询性能受到如元数据的加载解析以及每次访问数据都需通过分布式文件系统 listFile 遍历文件列表等问题的影响,导致性能降低。
基于上述问题,我们通过技术调研,最终引入了开源的数据湖表格式 Iceberg,构建数据湖存储服务,并借助大数据生态的 Spark、Flink 等计算引擎来实现数据湖的分析,将其无缝集成到图灵生态中,帮助业务提效降本,构建更快速、更高效、更低成本的数据中台产品。
1.3 Hive 和 Iceberg 对比
Hive 作为一个基于 Hadoop 生态系统的开源数据仓库工具,主要用于对大规模结构化数据进行存储、查询和分析。而 Iceberg 作为新一代数据湖表格式,提供了类似传统数据库的事务性,保证和数据一致性,并支持复杂的数据操作,如行级更新和删除等,更加适合实时更新,流批一体数据场景,下表列出 Hive 和 Iceberg 一些主要特性对比:

1.4 Iceberg 的组织结构
Iceberg 文件组织分为元数据层和数据层,主要包含 version-hint,metadata file、snapshot file、manifest file 和 data file 文件类型,具体如下:
metadata 元数据层
a. version-hint:该文件作为元数据索引初始文件,记录了 Iceberg 表的版本号,通过版本号找到对应的 metadata file。
b. metadata file:记录了 Iceberg 表的 schemas、properties 以及快照等信息。
c. snapshot file(manifest-list):每次数据 commit 会生成一个新的快照,保存了该快照下每个 manifest file 路径及对应的分区范围。
d. manifest file:记录数据文件元信息,包含每个数据文件的路径、文件的大小等一系列统计信息(如文件每列的最大最小值、空值数等),实现元数据和数据文件的关联。
data 数据层
data file:实际的数据文件,以 parquet 等列存格式存储数据。

△Iceberg 表结构

△Iceberg 文件组织结构
通过上述 Iceberg 元数据文件组织结构,Iceberg 实现了文件级的元信息统计及版本化管理。
02 Iceberg 能力建设与应用
2.1 图灵生态能力适配
2.1.1 统一元数据服务
由于原生 iceberg 缺少元数据的可视化管理能力,我们通过构建统一的元数据微服务,将 Iceberg 表和 Hive 表元数据进行管理,对应用层提供相关表和分区的增删改查等接口,统一数据存储的元数据操作入口。
该微服务主要包含常驻 SparkSession 模块,EngineMetaService 模块和元数据模块,通过将 SparkSession 常驻,为用户提供 Iceberg 表和 Hive 表元数据和分区数据的增删改查功能,以及可视化的元数据管理界面。

△统一元数据服务架构
2.1.2 打通 Iceberg 和 Hive 联邦查询
为了兼容历史业务存量 Hive 表,同时降低用户使用 Iceberg 的成本。我们在计算引擎层面打通 Iceberg 和 Hive 联邦查询能力,并保证了 Iceberg 表与原有方式语法一致。
通常在一条 SQL 执行过程中,主要可简化以下 Parse、Analyzer、Optimizer、CBO 四个流程。通过在 Analyzer 和 Plan 阶段进行改进优化,来打通 Iceberg 和 Hive 表联邦查询。
Analyzer 阶段:该阶段主要是将 spark 未解析的逻辑计划进行解析,我们通过对 SparkSessionCatalog 加载方式改造,优先加载 iceberg 表使用的 catalog 类型,如果用户 SQL 使用的是 Iceberg 表,则对应会使用 IcebergCatalog 和 iceberg 数据源访问,否则使用 SessionCatalog 与 Hive 数据源访问。
Optimizer 阶段:为加强数据安全管理,我们进一步打通 Iceberg 表鉴权能力,在基于逻辑计划生成物理计划阶段,解析注入表、字段信息以及表操作类型规则,并与公司内数管平台交互,实现对 Iceberg 表和字段的鉴权

△Iceberg 和 Hive 联邦查询适配流程
2.2 存量 Hive 低成本迁移 Iceberg
现有数仓业务数据主要存储于 Hive 表,为支持业务快速切换 Iceberg 应用新技术,我们建设了存量 Hive 表低成本迁移至 Iceberg 表的能力。
以下是在实践过程中的两种迁移方案对比:
方式 1:使用 Iceberg 功能 migrate 进行原地迁移,通过社区提供的 CALL migrate 语法,直接执行如下示例的 SQL 语句,即可将 Hive 表升级为 Iceberg 表。
该方案操作简单且可回滚,但这种方式在图灵生态落地过程中也存在一些问题:
该方式会基于原 Hive 表的数据信息构建 Iceberg 元数据信息,并将原 Hive 表名重命名为 sample_backup_,同时数据路径也进行重命名。
下游无法读:在执行迁移过程中,原 Hive 表对应的路径已经被重命名,进而导致下游业务无法正常读取正在迁移中的表。
多表挂载冲突:在业务的使用场景中,存在同一份物理数据被多个 Hive 表挂载可能,直接修改路径会导致其他表失效。
方式 2:基于上述问题,我们进一步对现有方案进行优化,不改变 Hive 表原有的数据路径,来实现 Hive 低成本迁移 Iceberg,具体流程如下:
构建 Iceberg 元数据:直接复用 Hive 的分区数据,新建同名的 Iceberg 表,并重建 Iceberg 元数据,最终新 Iceberg 表的元数据信息实际指向是 Hive 分区数据存储位置。
数据校验:当 Iceberg 元数据构建完成后,查询 Iceberg 表中字段数据,和迁移之前 Hive 表字段数据,进行一致性校验,验证迁移是否符合预期。
读写切换:数据校验完成后,我们只需要将对应表的属性更新为 Iceberg。因为我们已经打通了 Iceberg 和 Hive 的查询,且迁移后表名未变,业务可正常使用原有表名及语法进行查询和写入,降低迁移成本。

△Hive 迁移 Iceberg 整体实现流程
2.3 Iceberg 在图灵的应用和性能优化
2.3.1 图灵实时数仓应用
在图灵数仓大部分场景中,用户主要依托天级或小时级运行的离线 Spark 任务来完成数据入仓。在这种模式下,难以满足部分对数据实时性要求较高的需求。
为解决该问题,我们基于 Iceberg+Flink 构建的图灵实时湖仓架构,整体重构流程如下图所示。该架构模式实现了数据分钟级别实时入仓,显著提升了数据入仓的时效性。进一步扩展了整个图灵的应用场景。
针对数据分析和 case 排查等场景,业务可基于图灵常驻计算引擎进行实时查询,快速获取所需要的数据支持业务分析决策;
针对策略迭代、特征生产以及机器学习等复杂计算场景,可基于 spark 例行任务进行加工生产;
针对策略数据调研分析、科学计算等复杂场景通过数据交互计算引擎 Jupyter 进行数据计算。通过构建图灵实时湖仓架构,既保证了数据分析的时效性又兼顾了复杂计算任务的处理能力,有效提升了业务的数据处理效率和分析决策能力。

△图灵实时湖仓架构演变
2.3.2 行级更新策略
在图灵数仓业务场景下,商业、搜索、电商、销售等业务,周期性地更新行业等信息。而 Hive 在该场景下支持相对较弱,需通过全量覆盖写方式刷新数据,这种方式在大数据量场景下,回溯数据周期长,消耗资源大,所需要的人力时间成本也高。我们通过利用 Iceberg 行级更新的特性,基于 update、merge into 等方式回溯进行字段变更,能够很大程度的提高回溯效率,降低资源和人力成本。
针对数据行级更新,Iceberg 提供了两种策略,分别为 COW(Copy on Write: 写时复制) 或 MOR (Merge on Read:读时合并),其中 MOR 根据其标记删除文件的区别又细分了两种方式(Equality Delete File 和 Position Delete File)。

关于 COW 和 MOR 更新策略的文件表现形式如下图所示,我们针对不同场景采用不同更新策略:
对于日常数据查询分析场景,小时级 &天级离线例行生成加工场景,由于查询次数会远多于数据更新次数,可默认采用 COW 策略;
针对一些业务更新少量字段进行长周期回溯场景,以及实时场景,写入频繁,通过使用 MOR 策略,来支持用户进行数据回溯变更字段信息,以提升数据更新效率并节省资源。

△COW 和 MOR 两种更新策略对比

△MOR 两种删除文件类型 &更新字段示例
在业务进行数据回溯应用过程中,我们采用 MOR(Position Delete File)进行行级数据更新,通过原 Hive 回溯和新 Iceberg 回溯两种方式对比,在一天 24 小时不同分区上,验证了 Hive 和 Iceberg 新旧的回溯效率,如下图所示,业务回溯效率整体可平均提升 50%+;进一步地对比单次回溯一年数据消耗的计算资源量对比,平均整体降低 70%+的计算资源消耗,整体上极大提升回溯效率,并降低资源成本。

△ Hive 和 Iceberg 回溯效率对比
2.3.3 Iceberg 表生命周期管理和性能优化
在 Iceberg 应用实践的过程中,针对不同业务场景遇到的问题,我们汇总如下:
小文件过多:在实时湖仓业务场景,为了要保证数据的时效性,通常是分钟级别的 commit 操作,在这种场景下,单个作业执行一天,则需要 1440 个 commit,如果执行时间更长,则会产生更多的 commit,随着时间的累积,元数据以及数据文件等都会产生大量的小文件,对于整体查询的性能会产生一定的影响。
存储资源增加:如果 iceberg 表的快照不及时进行清理,可能会造成数据存储增加,导致存储账号资源紧张。
缺乏分区数据统一管理:在一些业务场景,只需要保存一定天数的分区数据,针对无用数据需要进行删除处理。
数据文件组织不均衡且无序:由于表数据写入是随机无序,且针对表数据文件大小会存在不均衡的情况。
针对上述问题,我们通过对 Iceberg 表进行全生命周期管理,并结合 Iceberg 特性优化表查询性能,保障整个数据链路的稳定性,整体框架如下图所示:

△Iceberg 表生命周期管理和性能优化流程
以上流程主要包含表数据生命周期管理和表性能优化两部分。
一方面,对于表数据生命周期管理,我们通过在线服务执行定时任务,来实现对表数据和元数据进行全生命周期监控,具体如下:
数据分区过期:基于用户配置的表生命周期,进行分区数据删除,保证数据文件按期清理。
元数据快照清理:为用户提供按照时间维度天级别和按照个数维度小时级别两种快照过期策略,精细化元数据快照过期处理,实现存储资源的高效利用。
元数据孤儿文件清理:通过天级例行任务来触发清理由于计算引擎执行任务失败等情况产生的一些没有被引用的孤儿文件,避免元数据累积影响性能。
另一方面,在表性能优化方面,我们结合图灵数仓表使用情况,并基于 Iceberg 原生特性,为用户在平台侧提供 Iceberg 表优化算子(如下图示例),主要包含以下两种能力:
小文件合并:通过制定合并文件大小,对表数据文件进行重写合并,避免产生大量小文件。
z-order 排序优化:实现对表相关字段进行重排序,提升查询性能。

△Iceberg 表优化算子任务创建示例
我们通过对 Iceberg 表整体的生命周期管理,实现了数据和元数据的统一治理,表元数据小文件数万个降低到数百级别,合理控制了元数据产生的数量,并解决了数据频繁回溯场景下存储快速增加的问题。而在表查询优化方面,通过在一些表的数据重分布和字段重排序应用,在部分业务表查询性能提速 50%。
03 未来规划
Iceberg 作为图灵 3.0 生态中的重要组成部分,基于其高时效性、行级更新能力、小文件合并以及 Z-order 等成体系的数据优化的技术解决方案,为 MEG 数据中台业务提供构建湖仓一体,解决数据回溯等痛点问题的能力。目前 Iceberg 的应用已覆盖搜索,商业,销售,用增架构等多个业务线,通过低成本助力业务将存量 Hive 迁移 Iceberg 表,为业务提供高性能数据查询,同时实现对业务的降本增效。此外,我们也在不断完善 Iceberg 数据存储引擎的各项能力,包含表数据智能治理、查询优化、智能索引以及特定场景的性能问题等,并不断扩大 Iceberg 的业务覆盖范围。
评论