写点什么

Airtable 如何用 StarRocks 构建数据验证系统

作者:StarRocks
  • 2025-08-02
    河北
  • 本文字数:6829 字

    阅读完需:约 22 分钟

Airtable 如何用 StarRocks 构建数据验证系统

摘要:归档冷数据至 S3,借助 StarRocks 实现一致性验证与存储降本

作者:Riley ,Airtable 数据基础设施团队

导读:

开源无国界,在本期“StarRocks 全球用户精选案例”专栏中,我们将介绍总部位于旧金山的云端协作服务公司 Airtable。作为一家致力于让用户像操作表格一样轻松构建数据应用的企业,Airtable 在 2025 年完成了向 AI 原生应用平台 的转型,平台同时支持企业级安全与治理能力,并与外部系统集成,实现复杂业务流程的自动化。

随着业务增长,Airtable 需要长期保存的历史记录数据(如撤销、版本历史)迅速膨胀,占据了近一半存储空间。为降低成本,Airtable 推出了 Live Shard Data Archive(LSDA)项目,将访问频率极低的数据迁移至 S3,并利用 StarRocks 验证归档数据与源数据的一致性。

本文聚焦数据验证阶段,重点介绍:

  • 为何选择 StarRocks 解决数据验证问题

  • 数据导入与导出优化方案

  • 归档数据导入实践


概览

在 Airtable,应用级(或“基于 Base 的”)数据存储在 Amazon RDS 的多个分片 MySQL 实例上,每个 Base 对应一个独立分片。随着 Base 不断更新变化,我们会记录与其历史相关的追加写入数据(append-only data),用于支持撤销(Undo)、历史版本(Revision History)等功能,让企业用户可以查看长达三年前的记录变更。

随着业务规模扩张,这类追加写入表的体量迅速增长,如今已接近占到整个 Airtable 存储总量的一半。而这些数据的访问频率却极低,却与日常使用的 Base 数据存放在同一存储层,带来了不必要的存储成本压力。

(Airtable base 示例)

项目背景

Live Shard Data Archive(LSDA)项目旨在将访问频率极低的追加写入数据迁移到成本更低的 Amazon S3,以缩减 RDS 实例的磁盘容量。迁移完成后,我们可以安全删除旧数据并重建 RDS 实例,从而释放空间。

整个迁移过程分为三步:

  1. 数据归档与转换:从 RDS 提取并转换数据,存入 S3,并确保应用代码能高效、稳定地访问归档数据;

  2. 数据验证:比对归档数据与 RDS 源数据,确保迁移过程中未引入不一致问题;

  3. 更新应用逻辑,使其可以直接从 S3 读取数据。

完成这些步骤后,我们能够安全截断已迁移至 S3 的数据,大幅降低 RDS 存储占用和成本。

本文将重点介绍第二步的首个阶段——数据验证

归档与转换

将 RDS 数据归档并转换为存储在 S3 上的最终形态,需要分三步完成:

  1. 利用 RDS 自带功能,将数据库快照直接导出到 S3,文件格式为 Parquet。

  2. 为优化归档数据的查询延迟,需要基于客户维度重新分区。这个环节挑战很大,因为需要处理的数据规模超过 1PB,文件数量超过 1000 万个。我们采用 Apache Flink 进行增量式读取和处理,将数据重新分区到每个客户的独立分区中。

  3. 通过高并发的 Kubernetes 重写任务(rewriter job)对每个客户的归档数据进行排序,并为重写后的 Parquet 文件添加索引和 Bloom Filter,从而加速常见查询模式。

验证概览与方案考虑

为了验证数据一致性,我们需要对归档数据与源数据逐行比对,确保每一行记录完全匹配。最直接的做法是:从归档中读取一行,在 RDS 中查找对应记录并进行比对。

但这一思路在实践中并不可行——数据量接近 1PB、记录数约 2 万亿条,而 RDS 还要承载线上业务流量,如此规模的额外查询会给生产环境带来巨大压力。

为避免影响线上,我们改为使用原始且未改动的 RDS 导出文件作为比对源数据。这些文件同样存储在 S3 中,消除了直接查询生产实例的风险。然而,逐行比对仍然过慢,难以满足验证效率要求。

最终,我们确定了一种可行方案:将两份数据加载到关系型数据库中,通过表 Join 快速找出差异,从而高效完成验证。

借助 StarRocks 进行数据验证

在本次数据验证项目中,数据基础设施团队协同存储团队,共同评估并选定了最适合的技术方案,以高效完成整体验证工作。

为何选择 StarRocks 来解决数据验证问题?

数据验证的核心挑战是要对两份规模巨大的数据集执行大量 Join 操作,而这两个数据集各自接近万亿行。如何在可控的计算成本下高效完成如此高强度的计算,是项目的关键难点。

经过调研,我们最终选择 StarRocks,因为它在 Join 性能方面表现突出,能够在同等负载下避免其他查询引擎常见的性能瓶颈。

在实际落地中,我们将 S3 中的原始 Parquet 文件加载到 StarRocks 本地表,并利用其 Colocation Join 机制来高效完成数据验证所需的 Join 操作。

StarRocks 架构

StarRocks 的整体架构支持访问多种类型的数据源:

  • S3 数据湖:包括 Hudi 、Delta Lake、Iceberg 和 Paimon ;

  • S3 原生存储格式:支持直接在 S3 上以 StarRocks 原生格式创建并持久化表;

  • S3 原始文件:可直接对存储在 S3 中的 Parquet、JSON 或 CSV 文件进行查询。

在本项目中,我们将 S3 中的原始 Parquet 文件加载至 StarRocks 本地表,用于执行数据验证(前文已介绍)。

数据导入优化:提升 StarRocks 的数据加载性能

我们需要将接近 1 万亿行的数据从原始 Parquet 文件加载到 StarRocks 本地表中。这些数据由数亿个小文件组成,如果缺乏合理的优化和并行处理策略,整个导入过程可能需要耗时数月。

为提升数据导入吞吐量,我们采取了以下优化措施:

  1. 降低副本数量(3 → 1)

由于这是一次性的数据验证任务,并不涉及生产环境的高可用性要求,因此我们将副本数量从 3 个减少到 1 个,大幅降低了需要导入的数据总量。

  1. 提升内部导入并行度

导入完成后才会进行基于 Join 的验证,因此导入性能不会影响线上服务。我们通过调整以下参数提升并行度:

  • pipeline_dop

  • pipeline_sink_dop

  1. 增加每个分区的桶(Bucket)数量

为避免单个桶过大,我们将每个桶的数据量限制在 5GB 以内,并通过增加桶数量显著提升导入吞吐量。虽然这可能导致数据压缩 Compaction 任务延后,但在本场景中完全可接受。

通过以上优化,我们成功实现了大规模数据的高效导入

数据导入与导出

在完成 StarRocks 部署后,我们需要加载两部分数据:

  1. 来自 RDS 导出的源数据;

  2. 经过转换、计划作为查询来源的归档数据。

考虑到时间与成本,直接在 StarRocks 中存储全部约 1PB 的数据并不可行,因此我们选择仅对表中的非主键列进行哈希处理。

在本次加载过程中,我们共导入了两张表,以下示例主要聚焦 _actionLog 表。

初始方案:哈希非主键列的简化表结构

我们为每张表设计了类似的表结构,初始设计如下:

CREATE TABLE `_rdsExportActionLog` (`id` bigint(20) NOT NULL COMMENT "",`application` varchar(65533) NOT NULL COMMENT "",`hash_value` varchar(65533) NULL COMMENT "") ENGINE=OLAPPRIMARY KEY(`id`, `application`)DISTRIBUTED BY HASH(`id`, `application`)ORDER BY(`application`)PROPERTIES ("replication_num" = "1","colocate_with" = "action_log_group","in_memory" = "false","enable_persistent_index" = "true","replicated_storage" = "true","compression" = "ZSTD");
复制代码

随后,我们使用类似如下的 INSERT 语句进行数据加载:

INSERT INTO \`exportActionLog\`WITH LABEL ${label}(id, application, hash_value)SELECT id, application, XX_HASH3_64(CONCAT_WS(',',<columns>)) as hash_valueFROM FILES("path" = "s3://${bucket}/${folder}*.parquet","format" = "parquet",);
复制代码

数据分布与加载瓶颈

这种加载方式耗时极长——仅导入两个分片就需要近一天时间。随着数据量增加、表规模扩大,导入速率进一步下降。

更意外的是,提高本地并行度(即同时加载多个分片)几乎没有带来性能提升;而当并行度超过 5 时,系统还会频繁出现如下情况:

JobId: 14094Label: insert_16604b11–7f2d-11ef-888c-46341e0f370eState: LOADINGProgress: ETL:100%; LOAD:99%Type: INSERT
复制代码

我们发现,加载进度常常迅速到达 99%,却长时间停滞在此状态,无法快速完成。

根据 StarRocks 官方文档:

“当所有数据加载完成后,LOAD 参数会返回 99%,随后数据才开始生效;数据完全生效后,LOAD 才会返回 100%。”

显然,我们的初始方案在数据生效这一环节遇到了性能瓶颈。


优化一:增加 Bucket 数量

最初的数据分布仅基于 id 和 application,且未指定 Bucket 数量(具体机制可参考 StarRocks 官方文档)。

我们的假设是:随着数据量增大,单个 Bucket 过于庞大,导致处理效率下降。

在咨询 StarRocks 团队后,建议将较小表的 Bucket 数量调整至 7200 个。

因此,表结构被修改为如下形式:

CREATE TABLE `exportActionLog` (`id` bigint(20) NOT NULL COMMENT "",`application` varchar(65533) NOT NULL COMMENT "",`hash_value` varchar(65533) NULL COMMENT "") ENGINE=OLAPPRIMARY KEY(`id`, `application`)DISTRIBUTED BY HASH(`id`, `application`) BUCKETS 7200ORDER BY (`application`)PROPERTIES ("replication_num" = "1","colocate_with" = "action_log_group","in_memory" = "false","enable_persistent_index" = "true","replicated_storage" = "true","compression" = "ZSTD");
复制代码

然而,这种方式虽然显著提升了加载速度,但也引发了内存问题:

message: 'primary key memory usage exceeds the limit. tablet_id: 10367, consumption: 126428346066, limit: 125241246351. Memory stats of top five tablets: 53331(73M)53763(73M)53715(73M)53667(73M)53619(73M): 
复制代码

数据分布与加载瓶颈

这种加载方式耗时极长——仅导入两个分片就需要近一天时间。随着数据量增加、表规模扩大,导入速率进一步下降。

更意外的是,提高本地并行度(即同时加载多个分片)几乎没有带来性能提升;而当并行度超过 5 时,系统还会频繁出现如下情况:

JobId: 14094Label: insert_16604b11–7f2d-11ef-888c-46341e0f370eState: LOADINGProgress: ETL:100%; LOAD:99%Type: INSERT
复制代码

我们发现,加载进度常常迅速到达 99%,却长时间停滞在此状态,无法快速完成。

根据 StarRocks 官方文档:

“当所有数据加载完成后,LOAD 参数会返回 99%,随后数据才开始生效;数据完全生效后,LOAD 才会返回 100%。”

显然,我们的初始方案在数据生效这一环节遇到了性能瓶颈。

优化一:增加 Bucket 数量

最初的数据分布仅基于 id 和 application,且未指定 Bucket 数量(具体机制可参考 StarRocks 官方文档)。

我们的假设是:随着数据量增大,单个 Bucket 过于庞大,导致处理效率下降。

在咨询 StarRocks 团队后,建议将较小表的 Bucket 数量调整至 7200 个。

因此,表结构被修改为如下形式:

CREATE TABLE `exportActionLog` (`id` bigint(20) NOT NULL COMMENT "",`application` varchar(65533) NOT NULL COMMENT "",`hash_value` varchar(65533) NULL COMMENT "") ENGINE=OLAPPRIMARY KEY(`id`, `application`)DISTRIBUTED BY HASH(`id`, `application`) BUCKETS 7200ORDER BY (`application`)PROPERTIES ("replication_num" = "1","colocate_with" = "action_log_group","in_memory" = "false","enable_persistent_index" = "true","replicated_storage" = "true","compression" = "ZSTD");
复制代码

然而,这种方式虽然显著提升了加载速度,但也引发了内存问题:

message: 'primary key memory usage exceeds the limit. tablet_id: 10367, consumption: 126428346066, limit: 125241246351. Memory stats of top five tablets: 53331(73M)53763(73M)53715(73M)53667(73M)53619(73M): 
复制代码

优化二:按 Shard ID 对表进行分区

将表按 Shard ID 分区并以此方式加载数据,可以为每个分区单独指定 Bucket 数量,使数据存储更高效。

通过简单计算,我们得出了以下结论:

actionLog => 10TB (Hashed) => 10 * 1024 / 148 shards = 69GB per shard => 34 buckets to host it => add some buffer, 64 buckets per partition
In total: 64 buckets per partition * 148 shards = 9472 buckets
复制代码

此外,这种分布策略还支持按 Shard 逐一验证数据,避免因一次性加载过多数据而造成内存压力。

最终,我们基于该方案创建了新的表结构,并调整了加载语句,使其可直接从 S3 文件路径中提取 Shard ID。

CREATE TABLE `exportActionLog` (`id` bigint(20) NOT NULL COMMENT "",`application` varchar(65533) NOT NULL COMMENT "",`shard` int(11) NOT NULL COMMENT "",`hash_value` varchar(65533) NULL COMMENT "") ENGINE=OLAPPRIMARY KEY(`id`, `application`, `shard`)PARTITION BY (`shard`)DISTRIBUTED BY HASH(`id`, `application`) BUCKETS 64ORDER BY(`application`)PROPERTIES ("replication_num" = "1","colocate_with" = "action_log_group_partition_by_shard","in_memory" = "false","enable_persistent_index" = "true","replicated_storage" = "true","compression" = "ZSTD");
复制代码


这一改进显著提升了数据导入速度:LSDA 数据在不到 10 小时内便从 S3 成功加载至 StarRocks,平均吞吐量约每分钟 20 亿行。

归档数据导入

在将完整的 RDS 导出(作为验证的真实来源数据)成功导入 StarRocks 之后,我们还需要导入归档数据,并在这两份数据之间执行验证。由于归档数据的存储格式与导出数据不一致,这带来了额外挑战。

借鉴导出数据的导入经验,我们采用相同的表结构来存储归档数据——包括相同的 Bucket 数量和基于 Shard ID 的分区策略。

这种方式使导出数据和归档数据能够位于同一 Colocation Group,从而充分利用 StarRocks 的 Colocation Join 功能。

CREATE TABLE `_rdsArchiveActionLog` (`autoincr_id` bigint(20) NOT NULL COMMENT "",`applicationId` varchar(65533) NOT NULL COMMENT "",`shardId` int(11) NOT NULL COMMENT "",`hash_value` varchar(65533) NULL COMMENT "") ENGINE=OLAPPRIMARY KEY(`autoincr_id`, `applicationId`, `shardId`)PARTITION BY (`shardId`)DISTRIBUTED BY HASH(`autoincr_id`, `applicationId`) BUCKETS 64ORDER BY(`applicationId`)PROPERTIES ("replication_num" = "1","colocate_with" = "action_log_group_partition_by_shard","in_memory" = "false","enable_persistent_index" = "true","replicated_storage" = "true","compression" = "ZSTD");
复制代码

RDS 导出与归档目录结构的差异

不过,RDS 导出数据与归档数据在 S3 中的存储方式差异很大。

RDS 导出数据采用大目录结构,每个目录对应 StarRocks 中的一个 Shard 和 Partition。这种结构使得导入非常简单——可以用通配符批量导入目录下的所有 Parquet 文件,并从目录路径直接提取 Shard ID 用于分区字段。

而归档数据则完全不同。为了方便应用直接从 S3 读取,数据是按 Application 维度存储的,导致:

  • 数据分散在超过 600 万个小目录中,每个目录对应一个应用;

  • 应用与分片并不一一对应,有些应用的数据甚至分布在多个分片;

  • S3 中也没有保存源分片信息,无法像导出数据那样直接从路径提取 Shard ID。

为解决这一问题,我们使用 DynamoDB 中的文件元数据,并创建了以 Shard ID 为排序键的全局二级索引(Global Secondary Index),然后按 Shard ID 查询对应文件并执行导入。

通过 Union 在 StarRocks 中批量插入

StarRocks 的 INSERT 语句每次只能指定单一路径,这意味着每个文件都需要单独执行一条 INSERT。

相比之下,RDS 导出数据仅有约 160 个目录及对应的 INSERT 语句,而归档数据却需要执行超过 600 万条

为此,我们尝试对流程进行高并行化处理:

  • 同时加载多个 Shard ;

  • 每个 Shard 内同时运行多个进程。

由于程序基于 Node.js 和 TypeScript,并非真正多线程,但可以通过多进程缓解 I/O 阻塞。

我们尝试用 10 个线程并发加载单个分片,结果遇到了新的问题:

message: 'Failed to load data into tablet 14775287, because of too many versions, current/limit: 1006/1000. You can reduce the loading job concurrency, or increase loading data batch size. If you are loading data with Routine Load, you can increase FE configs routine_load_task_consume_second and max_routine_load_batch_size
复制代码

由于多个进程同时执行插入,StarRocks 产生了过多的表版本,而其基于压缩频率的合并处理速度无法跟上。同时,StarRocks 的版本上限为 1000,无法通过简单调参解决。

因此,我们尝试减少插入语句数量,将更多工作交由 StarRocks 完成。当时 CPU 和内存使用率都很低,但加载过程依然缓慢甚至失败。

由于每个 SELECT 语句只能指定一个路径,我们采用折中方案:通过循环生成 SQL,将多个文件路径拼接到同一条 INSERT 语句中,例如:

INSERT INTO <table>SELECT * FROM FILE_1UNION ALLSELECT * FROM FILE_2……
复制代码

借助这一策略,我们得以同时处理 100 个应用的数据,显著加快了整体加载速度。最终,仅用约 3 天时间,就完成了来自 600 万个应用、约 1 万亿行数据的归档侧加载。

致谢

感谢 Daniel Kozlowski、Kun Zhou、Matthew Jin 和 Xiaobing Xia 在该项目中的重要贡献。


本文翻译自 The Airtable Engineering Blog:https://medium.com/airtable-eng/live-shard-data-archive-export-and-ingestion-to-starrocks-for-validation-6af555e8b3fe

用户头像

StarRocks

关注

新一代极速全场景MPP数据库 2020-08-08 加入

StarRocks一直致力于打造世界顶级的新一代极速全场景MPP数据库,帮助企业建立“极速统一”的数据分析新范式,助力企业数字化经营。当前已帮助腾讯、携程、顺丰、Airbnb等超过110家大型用户构建全新的数据分析能力。

评论

发布
暂无评论
Airtable 如何用 StarRocks 构建数据验证系统_数据分析_StarRocks_InfoQ写作社区