写点什么

使用 Flink 读写 Iceberg 表

作者:Joseph295
  • 2025-07-06
    北京
  • 本文字数:3143 字

    阅读完需:约 10 分钟

1. catalog

要在 Apache Iceberg 中同时支持 S3 和 Hadoop(HDFS)存储,并兼顾生产环境的稳定性与扩展性,推荐使用 Hive Catalog 或 REST Catalog。以下从核心需求、方案对比、选型依据及配置实践展开分析:

1.1. 核心挑战

  1. 存储兼容性

  • 需同时支持 HDFS(本地/私有云)和 S3(公有云对象存储)。

  • Hadoop Catalog 虽原生兼容两者,但存在原子性缺陷(S3 的 rename 非原子操作)和扩展性问题(海量表时列表性能差)。

  1. 生产环境要求

  • 需支持 ACID 事务、并发写入冲突解决(乐观锁)、元数据一致性。

  • Hadoop Catalog 不推荐用于生产,因其无法保证 S3 写入原子性,且缺乏多引擎协同能力


1.2.可行方案对比与选型建议


Hadoop Catalog

  • 支持 HDFS/S3

  • 不推荐生产环境

  • 运维复杂度低

  • 关键优势:无外部依赖,部署简单


Hive Catalog

  • 支持 HDFS/S3(需配置)

  • 推荐生产环境

  • 运维复杂度中等

  • 关键优势:生态广泛(Spark/Flink/Trino 兼容),元数据通过 HMS 统一管理


Rest Catalog

  • 支持任意后端存储

  • 推荐生产环境

  • 运维复杂度中高

  • 关键优势:存储与元数据解耦,灵活性最强;避免绑定特定云厂商


推荐方案 1: Hive Catalog(平衡型)

  • 原理:通过 Hive Metastore(HMS)管理 Iceberg 表的元数据路径(metadata_location),物理数据存储与 HMS 解耦,可指向 HDFS 或 S3

  • 配置式例:

CREATE CATALOG hive_catalog PROPERTIES (  "type" = "iceberg",  "catalog-type" = "hive",  "uri" = "thrift://hms-host:9083",  -- Hive Metastore 地址  "warehouse" = "s3a://bucket/warehouse"  -- 或 hdfs://namenode/warehouse);
复制代码
  • 优势:

  • 存储无关性:同一 Catalog 可管理 HDFS 或 S3 上的表,通过 warehouse 路径动态切换。

  • 生态成熟:Spark/Flink/Trino 等引擎无缝集成,无需额外适配

  • 局限:需维护 HMS 服务,但不涉及存储层绑定


推荐方案 2:REST Catalog(高阶灵活型)

  • 原理:通过 REST API 抽象元数据存取,后端存储(HDFS/S3)由服务层动态适配。物理存储路径在 API 请求中指定

  • 架构式例:

Client → REST Catalog Server → 元数据存储(RDBMS/DynamoDB)                数据存储(S3/HDFS)
复制代码
  • 优势:

  • 彻底解藕:更换存储无需修改 Catalog 配置,适合混合云/多云场景。

  • 扩展性强:支持自定义认证、审计逻辑

  • 局限:需自建 REST 服务(如使用 Nessie 或 Tabular 的托管方案)


1.3.选型核心依据

  1. 原子性与一致性

  • Hive Catalog 利用 HMS 的锁机制保证元数据更新原子性;REST Catalog 通过服务层事务控制解决 S3 的 rename 非原子问题。

  • Hadoop Catalog 直接依赖文件系统操作,在 S3 上可能因非原子提交导致元数据损坏。

  1. 运维成本

  • Hive Catalog:需维护 HMS,但技术栈成熟,适合已有 Hadoop 生态的用户。

  • REST Catalog:需部署 REST 服务,但可统一管理多存储类型,长期运维成本更低。

  1. 性能与扩展性

  • Hive Catalog 在海量表场景(>10 万分区)可能受限于 HMS 的元数据查询性能。

  • REST Catalog 可通过水平扩展服务节点应对高并发,更适合超大规模集群


1.4.配置实践要点

场景 1:Hive Catalog 同时读写 HDFS/S3 表

  1. 统一 HMS 配置:在 hive-site.xml 中指定默认存储路径(如 s3a://bucket/warehouse

  2. 动态路径覆盖:建表时显式指定存储位置:

-- 创建 S3 表CREATE TABLE hive_catalog.db.s3_table (...) WITH ('location' = 's3a://bucket/table_path');
-- 创建 HDFS 表CREATE TABLE hive_catalog.db.hdfs_table (...) WITH ('location' = 'hdfs://namenode/table_path');
复制代码
  1. 引擎配置:在 Spark/Flink 中传递 S3/HDFS 认证信息(如 core-site.xml 或 IAM Role)

  2. hive-site.xml

在 Flink 中读写基于 Hive Metastore 的 Iceberg 表时,hive-site.xml 不是绝对必需的,但在某些场景下会显著简化配置并解决特定问题。


需要 hive-site.xml 的场景

  • Kerberos 认证

当 Hive Metastore 启用 Kerberos 认证时,必须提供包含以下配置的 hive-site.xml

<property>  <name>hive.metastore.sasl.enabled</name>  <value>true</value></property><property>  <name>hive.metastore.kerberos.principal</name>  <value>hive/_HOST@REALM.COM</value></property>
复制代码
  • 自定义 Hive 配置

如果 Metastore 使用了非标准配置(如 MySQL 后端、自定义连接池等):

<!-- 示例:MySQL 后端配置 --><property>  <name>javax.jdo.option.ConnectionURL</name>  <value>jdbc:mysql://metastore-db:3306/hive</value></property>
复制代码
  • HA Metastore 配置

使用 Hive Metastore 高可用时:

<property>  <name>hive.metastore.uris</name>  <value>thrift://hms1:9083,thrift://hms2:9083</value></property>
复制代码
  • S3/OSS 存储集成

当 Iceberg 表存储在对象存储时(需传递 AWS 凭证):

<property>  <name>fs.s3a.access.key</name>  <value>AKIAxxx</value></property><property>  <name>fs.s3a.secret.key</name>  <value>secret</value></property>
复制代码


不需要 hive-site.xml 的场景

  1. 基础连接

如果 Metastore 使用默认配置(无认证、单节点):

// 直接在 Flink SQL 中指定 URI 即可CREATE CATALOG iceberg_catalog WITH (  'catalog-type'='hive',  'uri'='thrift://hms:9083', // 直接提供 URI  'warehouse'='hdfs:///iceberg/warehouse');
复制代码
  1. 配置通过代码传递

所有 Hive 参数可通过 Flink Catalog 配置直接覆盖:

CREATE CATALOG iceberg_catalog WITH (  ...  'hive.metastore.uris'='thrift://hms:9083',  'fs.s3a.access.key'='AKIAxxx',  'fs.s3a.secret.key'='secret');
复制代码


最佳实践

场景一:基础开发/测试环境

  • 无需 hive-site.xml,直接在 Flink SQL 中配置 uri 和 warehouse


场景二:生产环境(安全认证/复杂配置)

  • 必须提供 hive-site.xml,通过 hive-conf-dir 指定路径


场景三:云原生环境(K8S)

  • 将 hive-site.xml 挂载为 ConfigMap,路径通过 hive-conf-dir 传递


如何指定 hive-site.xml

在 Catalog 创建时添加 hive-conf-dir 参数:

CREATE CATALOG iceberg_catalog WITH (  'type'='iceberg',  'catalog-type'='hive',  'uri'='thrift://hms:9083',  'warehouse'='s3a://my-bucket/iceberg',  'hive-conf-dir'='/etc/hive/conf' // 指向包含 hive-site.xml 的目录);
复制代码


替代方案:直接传递配置

如果不想依赖文件,可逐项传递关键参数:

CREATE CATALOG iceberg_catalog WITH (  ...  'hive.metastore.uris'='thrift://hms:9083',  'fs.s3a.access.key'='AKIAxxx',  'fs.s3a.secret.key'='secret',  'hive.metastore.sasl.enabled'='true');
复制代码

关键结论hive-site.xml 主要用于传递复杂安全配置和存储集成参数。简单环境可直接通过 Flink SQL 配置,生产环境建议使用该文件统一管理配置。


场景 2: REST Catalog 集成 S3 与 HDFS

  1. 部署 REST 服务:使用开源实现(如 Nessie 或自研。

  2. 配置存储适配器:服务端根据请求路径动态选择存储类型:

# 伪代码:REST Handler 解析请求def create_table(request):  path = request.json["location"]  if path.startswith("s3://"):      io = S3FileIO(aws_credentials)  elif path.startswith("hdfs://"):      io = HadoopFileIO(config)  catalog.create_table(..., io=io)
复制代码
  1. 客户端统一访问:所有引擎通过 REST API 地址访问 Catalog,屏蔽底层存储差异


总结建议

  • 优先 Hive Catalog:若已有 HMS 且规模中等,配置简单且兼容性强,适合多数企业

  • 选择 REST Catalog:若需多云架构、超大规模表管理或自定义元数据逻辑,牺牲初期复杂度换取长期灵活性

  • 避免 Hadoop Catalog:仅适合测试环境,生产环境可能面临数据一致性和性能风险

附加优化:对 S3 存储启用 S3FileIO(优于 Hadoop S3A),通过分片上传和校验和提升性能。混合存储场景下,建议用 MinIO 统一抽象 HDFS 和 S3 API,简化访问逻辑


用户头像

Joseph295

关注

三脚猫的技术 2018-03-14 加入

coder

评论

发布
暂无评论
使用 Flink 读写 Iceberg 表_Joseph295_InfoQ写作社区