MaxCompute 近实时增全量处理一体化新架构和使用场景介绍
随着当前数据处理业务场景日趋复杂,对于大数据处理平台基础架构的能力要求也越来越高,既要求数据湖的大存储能力,也要求具备海量数据高效批处理能力,同时还可能对延时敏感的近实时链路有强需求,本文主要介基于 MaxCompute 的离线近实时一体化新架构如何来支持这些综合的业务场景,提供近实时增全量一体的数据存储和计算(Transaction Table2.0)解决方案。
业务背景与现状
当前典型的数据处理业务场景中,对于时效性要求低的大规模数据全量批处理的单一场景,直接使用 MaxCompute 足以很好的满足业务需求。但随着 MaxCompute 承载的业务无论是规模,还是使用场景,都越来越丰富,在处理好大规模离线批处理链路的同时,用户对近实时和增量处理链路也有很多的需求,下图展示了部分业务场景。
比如近实时数据导入链路,依赖平台引擎具备事务隔离,小文件自动合并等能力,又比如增全量数据合并链路,还依赖增量数据存储和读写,主键等能力。MaxCompute 以前不具备新架构能力之前,要支持这些复杂的综合业务场景,只能通过下图所示的三种解决方案,但无论使用单一引擎或者联邦多引擎都存在一些无法解决的痛点。
方案一,只使用单一的 MaxCompute 离线批处理解决方案,对于近实时链路或者增量处理链路通常需要转化成 T+1 的批处理链路,会一定程度上增加业务逻辑复杂度,且时效性也较差,存储成本也可能较高。
方案二,只使用单一的实时引擎,那资源成本会较高,性价比较低,且对于大规模数据批处理链路的稳定性和灵活性也存在一些瓶颈。
方案三,使用典型的 Lambda 架构,全量批处理使用 MaxCompute 链路,时效性要求比较高的增量处理使用实时引擎链路,但该架构也存在大家所熟知的一些固有缺陷,比如多套处理和存储引擎引发的数据不一致问题,多份数据冗余存储和计算引入的额外成本,架构复杂以及开发周期长等问题。
这些解决方案在成本,易用性,低延时,高吞吐等方面互相制约,很难同时具备较好的效果,这也驱动着 MaxCompute 有必要开发新的架构既能满足这些业务场景需求,也能提供较低的成本和较好的用户体验。
近几年在大数据开源生态中,针对这些问题已经形成了一些典型的解决方案,最流行的就是 Spark/Flink/Trino 开源数据处理引擎,深度集成 Hudi / Delta Lake / Iceberg / Paimon 开源数据湖,践行开放统一的计算引擎和统一的数据存储思想来提供解决方案,解决 Lamdba 架构带来的一系列问题。同时 MaxCompute 近一年多在离线批处理计算引擎架构上,自研设计了离线 &近实时数仓一体化架构,在保持经济高效的批处理优势下,同时具备分钟级的增量数据读写和处理的业务需求,另外,还可提供 Upsert,Time travel 等一系列实用功能来扩展业务场景,可有效地节省数据计算,存储和迁移成本,切实提高用户体验。
离线 &近实时增全量一体化业务架构
上图所示即为 MaxCompute 高效支持上述综合业务场景的全新业务架构
写入端会融合多种数据集成工具将丰富的数据源近实时增量或批量导入到统一的 MaxCompute 表存储中,存储引擎的表数据管理服务会自动优化编排数据存储结构来治理小文件等问题;使用统一的计算引擎支持近实时增量和大规模离线批量分析处理链路;由统一的元数据服务支持事务机制和海量文件元数据管理。统一的新架构带来的优势也是非常显著,可有效解决纯离线系统处理增量数据导致的冗余计算和存储、时效低等问题,也能避免实时系统高昂的资源消耗成本,同时可消除 Lambda 架构多套系统的不一致问题,减少冗余多份存储成本以及系统间的数据迁移成本。
简言之,一体化新架构既可以满足增量处理链路的计算存储优化以及分钟级的时效性,又能保证批处理的整体高效性,还能有效节省资源使用成本。
目前新架构已支持了部分核心能力,包括主键表,Upsert 实时写入,Time travel 查询,增量查询,SQL DML 操作,表数据自动治理优化等,更详细的架构原理和相关操作指导请参考↓官网架构原理和用户操作文档。
业务场景实践
本章节重点介绍新架构如何支持一些典型的业务链路以及产生的优化效果。
1. 表存储和数据治理优化
本章节主要介绍建表操作和关键表属性的含义,以及根据业务场景如何设置表属性值以达到最佳效果,也会简单描述一下存储引擎后台如何自动优化表数据。
▶ 建表
首先,一体化新架构需要设计统一的表格式来存储不同格式的数据以支撑不同业务场景的数据读写,这里称为 Transaction Table2.0,简称 TT2,可以同时支持既有的批处理链路,以及近实时增量等新链路的所有功能。
建表语法参考官网,简单示例:
只需要设置主键 Primary Key(PK),以及表属性 transactional 为 true,就可以创建一张 TT2。PK 用来保障数据行的 unique 属性,transactional 属性用来配置 ACID 事务机制,满足读写快照隔离。
▶ 关键的表属性配置
详细属性配置参考官网,简单示例:
表属性: write.bucket.num
此属性非常重要,表示每个 partition 或者非分区表的分桶数量,默认值为 16,所有写入的记录会根据 PK 值对数据进行分桶存储,相同 PK 值的记录会落在同一个桶中。非分区表不支持修改,分区表可修改,但只有新分区生效。
数据写入和查询的并发度可通过 bucket 数量来水平扩展,每个并发可至少处理一个桶数据。但桶数量并不是越多越好,对于每个数据文件只会归属一个桶,因此桶数量越多,越容易产生更多的小文件,进一步可能增加存储成本和压力,以及读取效率。因此需要结合数据写入的吞吐,延时,总数据的大小,分区数,以及读取延时来整体评估合理的桶数量。
此外,数据分桶存储也非常有助于提升点查场景性能,如果查询语句的过滤条件为具体的 PK 值,那查询时可进行高效的桶裁剪和数据文件裁剪,极大减少查询的数据量。
评估桶数量建议
对于非分区表,如果数据量小于 1G,桶数量建议设置为 4-16; 如果总数据量大于 1G,建议按照 128M-256M 作为一个桶数据的大小,如果希望查询的并发度更多的话,可以进一步调小桶数据量大小; 如果总数据量大于 1T,建议按照 500M-1G 作为一个桶数据的大小; 但目前能够设置的最大桶数量是 4096,因此对于更大的数据量,单个桶的数据量也只能越来越大,会进一步影响查询效率,后续平台也会考虑是否可放开更大的限制。
对于分区表,设置的桶数量是针对每个分区的,并且每个分区的桶数量可以不同。每个分区的桶数量设置原则可以参考上面非分区表的配置建议。对于存在海量分区的表,并且每个分区的数据量又较小的话,比如在几十 M 以内,建议每个分区的桶数量尽可能少,配置在 1-2 个即可,避免产生过多的小文件。
表属性: acid.data.retain.hours
此属性也很重要,代表 time travel 查询时可以读取的历史数据实践范围,默认值是 1 天,最大支持 7 天。
建议用户按真实的业务场景需求来设置合理的时间周期,设置的时间越长,保存的历史数据越多,产生的存储费用就越多,而且也会一定程度上影响查询效率,如果用户不需要 time travel 查询历史数据,建议此属性值设置为 0,代表关掉 time travel 功能,这样可以有效节省数据历史状态的存储成本。
▶ Schema Evolution 操作
TT2 支持完整的 Schema Evolution 操作,包括增加和删除列。在 time travel 查询历史数据时,会根据历史数据的 Schema 来读取数据。另外 PK 列不支持修改。
详细 DDL 语法参考官网,简单示例:
▶ 表数据自动治理优化
存在的问题
TT2 典型场景之一是支持分钟级近实时增量数据导入,因此可能导致增量小文件数量膨胀,尤其是桶数量较大的情况,从而引发存储访问压力大、成本高,数据读写 IO 效率低下,文件元数据分析慢等问题,如果 Update/Delete 格式的数据较多,也会造成数据中间状态的冗余记录较多,进一步增加存储和计算的成本,查询效率降低等问题。
为此,后台存储引擎配套支持了合理高效的表数据服务对存储数据进行自动治理和优化,降低存储和计算成本,提升分析处理性能。
表数据组织格式
如上图所示,展示了分区表的数据结构,先按照分区对数据文件进行物理隔离,不同分区的数据在不同的目录之下; 每个分区内的数据按照桶数量来切分数据,每个桶的数据文件单独存放; 每个桶内的数据文件类型主要分成三种:
Delta Data File:每次事务写入或者小文件合并后生成的增量数据文件,会保存每行记录的中间历史状态,用于满足近实时增量读写需求。
Compacted Data File:Delta File 经过 Compact 执行生成的数据文件,会消除数据记录的中间历史状态,PK 值相同的记录只会保留一行,按照列式压缩存储,用来支撑高效的数据查询需求。
Delta CDC Log: 按照时序存储的 CDC 格式增量日志 (目前还未对外推出)。
数据自动治理优化
如上图所示,TT2 的表数据服务主要分成 Auto Sort / Auto Merge / Auto Compact / Auto Clean 四种,用户无需主动配置,存储引擎后台服务会智能的自动收集各个维度的数据信息,配置合理的策略自动执行。
Auto Sort: 自动将实时写入的行存 avro 文件转换成 aliorc 列存文件,节省存储成本和提升读取效率。
Auto Merge: 自动合并小文件,解决小文件数量膨胀引发的各种问题。主要策略是周期性地根据数据文件大小/文件数量/写入时序等多个维度进行综合分析,进行分层次的合并。但它并不会消除任何一条记录的中间历史状态,主要用于 time travel 查询历史数据。
Auto Partial Compact: 自动合并文件并消除记录的历史状态,降低 update/delete 记录过多带来的额外存储成本,以及提升读取效率。主要策略是周期性地根据增量的数据大小/写入时序/time travel 时间等多个维度进行综合分析来执行 compact 操作。该操作只针对超过 time travel 可查询时间范围的历史记录进行 compact。
Auto Clean: 自动清理无效文件,节省存储成本。Auto Sort / Auto Merge / Auto Partial Compact 操作执行完成后,会生成新的数据文件,所以老的数据文件其实没什么作用了,会被即时自动删除,及时节省存储成本。
如果用户对于查询性能的要求非常高,也可尝试手动执行全量数据的 major compact 操作,每个桶的所有数据会消除所有的历史状态,并且额外生成一个新的 Aliorc 列存数据文件,用于高效查询,但也会产生额外的执行成本,以及新文件的存储成本,因此非必要尽量不执行。
详细语法参考官网,简单示例:
2. 数据写入场景业务实践
本章节主要介绍部分典型的写入场景业务实践。
▶ 分钟级近实时 Upsert 写入链路
MaxCompute 离线架构一般在小时或天级别批量导入增量数据到一张新表或者新分区中,然后配置对应的离线 ETL 处理链路,将增量数据和存量表数据执行 Join Merge 操作,生成最新的全量数据,此离线链路的延时较长,计算和存储也会消耗一定的成本。
使用新架构的 upsert 实时导入链路基本可以保持数据从写入到查询可见的延时在 5-10 分钟,满足分钟级近实时业务需求,并且不需要复杂的 ETL 链路来进行增全量的 Merge 操作,节省相应的计算和存储成本。
实际业务数据处理场景中,涉及的数据源丰富多样,可能存在数据库、日志系统或者其他消息队列等系统,为了方便用户数据写入 TT2, MaxCompute 深度定制开发了开源Flink Connector工具,针对高并发、容错、事务提交等场景做了定制化的设计及开发优化,以满足延时低、正确性高等要求,同时也能很好的对接融合 Flink 生态。具体使用细节可以参考官网产品说明
上图简单展示了整体写入的流程,可总结如下主要关键点:
基本大部份可融合 flink 生态的引擎或者工具都可通过 flink 任务,结合 MaxCompute flink connector 实时写入数据进 TT2 表。
写入并发可以横向扩展,满足低延时高吞吐需求。写入流量吞吐跟 flink sink 并发数,TT2 桶数量等参数配置相关,可根据各自的业务场景进行合理配置。特别说明,针对 TT2 桶数量配置为 Flink sink 并发数的整数倍的场景,系统进行了高效优化,写入性能最佳。
满足数据分钟级可见,支持读写快照隔离
结合 Flink 的 Checkpoint 机制处理容错场景,保障 exactly_once 语义。
支持上千分区同时写入,满足海量分区并发写入场景需求。
流量吞吐上限可参考单个桶 1MB/s 的处理能力进行评估,不同环境不同配置都可能影响吞吐。如果对写入延时比较敏感,需要相对稳定的吞吐量,可考虑申请独享的数据传输资源,但需要额外收费。如果默认使用共享的公共数据传输服务资源组的话,在资源竞抢严重的情况下,可能保障不了稳定的写入吞吐量,并且可使用的资源量也有上限。
▶ 部分列增量更新链路
该链路可用来优化将多张增量表的数据列拼接到一张大宽表的场景,比较类似多流 join 的业务场景。
如上图所示,左边展示了 MaxCompute 的离线 ETL 链路处理此类场景,将多张增量表按照比较固定的时间来对齐数据,通常小时/天级别,然后触发一个 join 任务,把所有表的数据列拼接起来生成大宽表,如果有存量数据,还需要执行类似 upsert 的 ETL 链路。因此整体 ETL 链路延时较长,流程复杂,也比较消耗计算和存储资源,数据也容易遇到无法对齐的场景。
右边展示了通过 TT2 表支持部分列更新的能力,只需要将各个表的数据列实时增量更新到 TT2 大宽表中即可,TT2 表的后台 Compact 服务以及查询时,会自动把相同 PK 值的数据行拼接成一行数据。该链路基本完全解决了离线链路遇到的问题,延时从小时/天级别降低到分钟级,而且链路简单,几乎是 ZeroETL,也能成倍节省计算和存储成本。
目前支持以下两种方式进行部分列更新,功能还在灰度上线中,还未发布到官网(预计两个月内在公共云发布)。
通过 SQL Insert 进行增量写入部分列:
通过 Flink Connector 实时写入部分列。
▶ SQL DML / Upsert 批处理链路
为了方便用户操作 TT2 表,MaxCompute 计算引擎对 SQL 全套的数据查询 DQL 语法和数据操作 DML 语法进行了支持,保障离线链路的高可用和良好的用户体验。SQL 引擎的内核模块包括 Compiler、Optimizer、Runtime 等都做了专门适配开发以支持相关功能和优化,包括特定语法的解析,特定算子的 Plan 优化,针对 pk 列的去重逻辑,以及 runtime upsert 并发写入等。
数据处理完成之后,会由 Meta Service 来执行事务冲突检测,原子更新数据文件元信息等,保障读写隔离和事务一致性。
SQL DML 具体语法可参考官网文档,对于 Insert / Update / Delete / Merge Into 都有详细的介绍和示例。
对于 Upsert 批式写入能力,由于 TT2 表后台服务或者查询时会自动根据 PK 值来合并记录,因此对于 Insert + Update 场景,不需要使用复杂的 Update/Merge Into 语法,可统一使用 Insert into 插入新数据即可,使用简单,并且能节省一些读取 IO 和计算资源。
3. 数据查询场景业务实践
本章节主要介绍部分典型的查询场景业务实践。
▶ Time travel 查询
基于 TT2,计算引擎可高效支持Time travel查询的典型业务场景,即查询历史版本的数据,可用于回溯业务数据的历史状态,或数据出错时,用来恢复历史状态数据进行数据纠正。
详细语法参考官网,简单示例:
可查询的历史数据时间范围,可通过表属性 acid.data.retain.hours 来配置,配置策略上文已介绍,配置参数详解参考官网。
Time travel 查询处理过程简介
SQL 引擎接收到用户侧输入的 time travel 查询语法后,会先从 Meta 服务中解析出来要查询的历史数据版本,然后过滤出来要读取的 Compacted file 和 Delta file,进行合并 merge 输出,Compacted file 可极大提升读取效率。
结合上图示例进一步描述查询细节:
图中 TT2 Schema 包含一个 pk 列和一个 val 列。左边图展示了数据变化过程,t1 - t5 代表了 5 个事务的时间版本,分别执行了 5 次数据写入操作,生成了 5 个 Delta file,在 t2 和 t4 时刻分别执行了 Compact 操作,生成了两个 Compacted File: c1 和 c2,可见 c1 已经消除了中间状态历史记录(2,a),只保留最新状态的记录(2,b)。
如查询 t1 时刻的历史数据,只需读取 Delta file (d1) 进行输出; 如查询 t2 时刻,只需读取 Compacted file (c1) 输出其三条记录。如查询 t3 时刻,就会包含 Compacted file (c1)以及 Delta file (d3) 进行合并 merge 输出,可依此类推其他时刻的查询。可见,Compacted file 文件虽可用来加速查询,但需要触发较重的 Compact 操作,用户需要结合自己的业务场景主动触发 major compact,或者由后台系统自动触发 compact 操作。
Time travel 查询设置的事务版本,支持时间版本和 ID 版本两种形态,SQL 语法上除了可直接指定一些常量和常用函数外,还额外开发了get_latest_timestamp和get_latest_version两个函数,第二个参数代表它是最近第几次 commit,方便用户获取 MaxCompute 内部的数据版本进行精准查询,提升用户体验。
▶ 增量查询
TT2 表支持增量写入和存储,最重要的一个考虑就是支持增量查询以及增量计算链路,为此,也专门设计开发了新的 SQL 增量查询语法来支持近实时增量处理链路。用户通过增量查询语句可灵活构建增量数仓业务链路,近期正在规划开发支持增量物化视图来进一步简化使用门槛,提升用户体验,降低用户成本。
支持两种增量查询语法:
用户指定时间戳或者版本查询增量数据,详细语法参考官网,简单示例:
引擎自动管理数据版本查询增量数据,不需要用户手动指定查询版本, 非常适合周期性的增量计算链路 (功能灰度发布中,以官网发布为准)。简单示例:
增量查询处理过程简介
SQL 引擎接收到用户侧输入的增量查询语法后,会先从 Meta 服务中解析出来要查询的历史增量数据版本,然后过滤出来要读取的 Delta file 列表,进行合并 merge 输出。
结合上图示例进一步描述查询细节:
图中表 tt2 Schema 包含一个 pk 列和一个 val 列。左边图展示了数据变化过程,t1 - t5 代表了 5 个事务的时间版本,分别执行了 5 次数据写入操作,生成了 5 个 Delta file,在 t2 和 t4 时刻分别执行了 Compact 操作,生成了两个 Compacted File: c1 和 c2。
在具体的查询示例中,例如,begin 是 t1-1,end 是 t1,只需读取 t1 时间段对应的 Delta file (d1)进行输出; 如果 end 是 t2,会读取两个 Delta files (d1, d2);如果 begin 是 t1,end 是 t2-1,即查询的时间范围为(t1, t2),这个时间段是没有任何增量数据插入的,会返回空行。
Compact / Merge 服务生成的数据(c1, c2)不会作为新增数据重复输出。
▶ PK 点查 DataSkipping 优化
上文提到,TT2 表的数据分布和索引基本是按照 PK 列值进行构建的,因此如果对 TT2 表进行点查,并指定了 PK 值进行过滤的话,将会极大减少要读取的数据量和读取耗时,资源消耗可能也会成百上千倍的减少。比如,TT2 表总的数据记录是 1 亿,经过过滤后真正从数据文件中读取的数据记录可能只有一万条。
主要的 DataSkipping 优化包括:
先进行 Bucket 裁剪,只读取包含指定 PK 值的一个 bucket 即可;
在 Bucket 内部进行数据文件裁剪,只读取包含指定 PK 值的文件即可;
在文件内部进行 Block 裁剪,根据 Block 的 PK 值域范围进行过滤,只读取包含指定 PK 值的 block 即可。
遵循常规的 SQL 查询语法,简单示例:
▶ SQL 查询分析 Plan 优化
由于 TT2 表数据按照 PK 值进行分桶分布的,并且桶内部数据查询出来具备 Unique 属性和 Sort 有序性,因此 SQL Optimizer 利用这些属性可以做大量的优化。
比如图中示例的 SQL 语句 (假设 tt2_t1 和 tt2_t2 的桶数量相同),SQL Optimizer 可做的主要优化如下:
Distinct 的 PK 列本身具备的 Unique 属性,因此可以消除去重算子;
Join on key 和 PK 列相同,因此直接使用 Bucket Local Join 即可,消除资源消耗很重的 Shuffle 过程;
由于每个桶读取出来的数据本身有序,因此可以直接使用 MergeJoin 算法,消除前置的 Sort 算子。
这些消除的算子都极为消耗资源,因此这些优化可整体让性能提升 1 倍以上。
遵循常规的 SQL 查询语法,简单示例:
4. 数据库整库实时同步写入 MaxCompute
当前数据库和大数据处理引擎都有各自擅长的数据处理场景,部分复杂的业务场景同时需要 OLTP/OLAP/离线分析引擎对数据进行分析处理,因此数据也需要在各个引擎之间流动。将数据库的单表或者整库的变更记录实时同步到 MaxCompute 进行分析处理是目前比较典型的业务链路。
如上图所示,左边流程是之前 MaxCompute 支持此类场景的典型 ETL 处理链路,按照小时/天级别读取数据库的变更记录写入到 MaxCompute 一张临时的增量表中,然后将临时表和存量的全量表进行 Join Merge 处理,生成新的全量数据。此链路较复杂,并且延时较长,也会消耗一定的计算和存储成本。
右边流程则是使用新架构支持该场景,直接按照分钟级别实时读取数据库的变更记录 upsert 写入到 TT2 表即可。链路极简单,数据可见降低到分钟级,只需要一张 TT2 表即可,计算和存储成本降到最低。
目前 MaxCompute 集成了两种方式支持该链路:
通过 DataWorks 数据集成的整库/单表增全量实时同步任务,在页面进行任务配置即可。
优势
MaxCompute 离线 &近实时数仓一体化新架构会尽量覆盖部分近实时数据湖(HUDI/ICEBERG 等)的通用功能,此外,作为完全自研设计的新架构,在低成本,功能,性能,稳定性,集成等方面也具备很多独特亮点:
用 MaxCompute 较低的成本来支持近实时以及增量链路,具备很高的性价比。
统一的存储、元数据、计算引擎一体化设计,做了非常深度和高效的集成,具备存储成本低,数据文件管理高效,查询效率高,并且 Time travel / 增量查询可复用 MaxCompute 批量查询的大量优化规则等优势。
通用的全套 SQL 语法支持所有功能,非常便于用户使用。
深度定制优化的数据导入工具,高性能支持很多复杂的业务场景。
无缝衔接 MaxCompute 现有的业务场景,可以减少迁移、存储、计算成本。
表数据后台智能自动化治理和优化,保证更好的读写稳定性和性能,自动优化存储效率和成本。
基于 MaxCompute 平台完全托管,用户可以开箱即用,没有额外的接入成本,功能生效只需要创建一张 TT2 表即可。
生产现状和未来规划
整体功能邀测运行大概半年时间,单中国公共云已经超过 100+ project, 700+张 TT2 表存在有效数据存储和读写,近实时链路和 Upsert 能力已经在部分客户的生产链路上得到充分验证。
未来半年规划:
支持 CDC 数据读写
支持增量物化视图
支持数据秒级可见
表数据服务智能治理深度优化以及查询性能优化
评论