Iceberg 在基于 Flink 的流式数据入库场景中的应用
简介: 本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点。
本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点。
应用场景
流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。
上述的应用场景通常有如下的痛点,需要整个流程不断的优化:
支持流式数据写入,并保证端到端的不重不丢(即 exactly-once);
尽量减少中间环节,能支持更实时(甚至是 T+0)的读取或导出,给下游提供更实时更准确的基础数据;
支持 ACID,避免脏读等错误发生;
支持修改已落地的数据,虽然大数据和数据湖长于处理静态的或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。
引入 Iceberg 作为 Flink sink
为了解决上述痛点,我们引入了 Iceberg 作为数据落地的格式。Iceberg 支持 ACID 事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。
同时,为了支持流式数据的写入,我们引入 Flink 作为流式处理框架,并将 Iceberg 作为 Flink sink。
下文主要介绍 Flink Iceberg sink 的实现框架和要点。但在这之前,需要先介绍一些实现中用到的 Flink 基本概念。
Flink 基本概念
从 Flink 的角度如何理解"流"和"批"
Flink 使用 DataFrame API 来统一的处理流和批数据。
Stream, Transformation 和 Operator
一个 Flink 程序由 stream 和 transformation 组成:
Stream: Transformation 之间的中间结果数据;
Transformation:对(一个或多个)输入 stream 进行操作,输出(一个或多个)结果 stream。
当 Flink 程序执行时,其被映射成 Streaming Dataflow,由如下的部分组成:
Source (operator):接收外部输入给 Flink;
Transformation (operator):中间对 stream 做的任何操作;
Sink (operator):Flink 输出给外部。
下图为 Flink 官网的示例,展示了一个以 Kafka 作为输入 Source,经过中间两个 transformation,最终通过 sink 输出到 Flink 之外的过程。
State, Checkpoint and Snapshot
Flink 依靠 checkpoint 和基于 snapshot 的恢复机制,保证程序 state 的一致性,实现容错。
Checkpoint 是对分布式的数据流,以及所有 operator 的 state,打 snapshot 的过程。
■ State
一个 operator 的 state,即它包含的所有用于恢复当前状态的信息,可分为两类:
系统 state:如 operator 中对数据的缓存。
用户自定义 state:和用户逻辑相关,可以利用 Flink 提供的 managed state,如 ValueState、ListState,来存储。
State 的存储位置,可以分为:
Local:内存,或者本地磁盘
State backend:远端的持久化存储,如 HDFS。
如下图所示:
■ Checkpoint
Flink 做 checkpoint 的过程如下:
Checkpoint coordinator 首先发送 barrier 给 source。
Source 做 snapshot,完成后向 coordinator 确认。
Source 向下游发送 barrier。
下游 operator 收到所有上游的 barrier 后,做 snapshot,完成后向 coordinator 确认。
继续往下游发送 barrier,直到 sink。
Sink 通知 coordinator 自己完成 checkpoint。
Coordinator 确认本周期 snapshot 做完。
如下图所示:
■ Barrier
Barrier 是 Flink 做分布式 snapshot 的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。
由于每个 barrier 唯一对应 checkpoint id,所以数据流中的 record 实际被 barrier 分组,如下图所示,barrier n 和 barrier n-1 之间的 record,属于 checkpoint n。
Barrier 的作用是在分布式的数据流中,将 operator 的多个输入流按照 checkpoint对齐(align),如下图所示:
Flink Iceberg sink
了解了上述 Flink 的基本概念,这些概念又是如何被应用和映射到 Flink Iceberg sink 当中的呢?
总体框架
如图,Flink Iceberg sink 有两个主要模块和两个辅助模块组成:
https://qr.dingtalk.com/action/joingroup?code=v1,k1,sX+rQjO7HUrD0gh+2OoGZWUBOG71dGhOJvJ4xGoEO0g=<br>http://weixin.qq.com/r/_y7l-d-EbIITrZue93vp (二维码自动识别)
实现要点
■ Writer
在当前的实现中,Java 的 Map 作为每条记录,输入给 writer。内部逻辑先将其转化为作为中间格式的 Avro IndexedRecord,而后通过 Iceberg 里的 Parquet 相关 API,累积的写入 DataFile。
使用 Avro 作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过 ISSUE-870 来去掉 Avro,进而使用 Iceberg 内建的数据类型作为输入,同时也需要加入一个到 Flink 内建数据类型的转换器。
在做 checkpoint 的过程中,发送 writer 自己的 barrier 到下游的 committer 之前,关闭单个 Parquet 文件,构建 DataFile,并发送 DataFile 的信息给下游。
■ Committer
全局唯一的 Committer 在收到上游所有 writer 的 barrier 以后,将收到的 DataFile 的信息填入 manifest file,并使用 ListState 把 manifest file 作为用户自定义的 state,保存于 snapshot 中。
当 checkpoint 完成以后,通过 merge append 将 manifest file 提交给 Iceberg。Iceberg 内部通过后续的一系列操作完成 commit。最终让新加入的数据对其他的读任务可见。
试用 Flink Iceberg sink
社区上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以试用的原型代码。下载该 patch 放入 master 分支,编译并构建即可。如下的程序展示了如何将该 sink 嵌入到 Flink 数据流中:
// Configurate catalog org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); hadoopConf.set( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname, META_STORE_URIS); hadoopConf.set( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname, META_STORE_WAREHOUSE); Catalog icebergCatalog = new HiveCatalog(hadoopConf); // Create Iceberg table Schema schema = new Schema( ... ); PartitionSpec partitionSpec = builderFor(schema)... TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE_NAME, TABLE_NAME); // If needed, check the existence of table by loadTable() and drop it // before creating it icebergCatalog.createTable(tableIdentifier, schema, partitionSpec); // Obtain an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Enable checkpointingenv.enableCheckpointing(...); // Add Source DataStream<Map<String, Object>> dataStream = env.addSource(source, typeInformation); // Configure Ieberg sink Configuration conf = new Configuration(); conf.setString( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname, META_STORE_URIS); conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME); conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME); // Append Iceberg sink to data stream IcebergSinkAppender<Map<String, Object>> appender = new IcebergSinkAppender<Map<String, Object>>(conf, "test") .withSerializer(MapAvroSerializer.getInstance()) .withWriterParallelism(1); appender.append(dataStream); // Trigger the execution env.execute("Sink Test");
后续规划
Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作为中间格式;以及在各种失败的情况下是否仍能保证端到端的 exactly-once;按固定时长做 checkpoint,在高低峰时生成不同大小的 DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。
参考资料:
[1] Iceberg 官网:
[2] Flink 1.10文 档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 设计文档:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容错机制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html
# 社区活动推荐 #普惠全球开发者,这一次,格外与众不同!首个 Apache 顶级项目在线会议 Flink Forward 全球直播中文精华版来啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海内外一线厂商,经典 Flink 应用场景,最新功能、未来规划一览无余。点击下方链接可了解更多大会详情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx
存储 消息中间件 缓存 大数据 Java BI API Apache 流计算 UED
版权声明:本文中所有内容均属于阿里云开发者社区所有,任何媒体、网站或个人未经阿里云开发者社区协议授权不得转载、链接、转贴或以其他方式复制发布/发表。申请授权请邮件developerteam@list.alibaba-inc.com,已获得阿里云开发者社区协议授权的媒体、网站,在转载使用时必须注明"稿件来源:阿里云开发者社区,原文作者姓名",违者本社区将依法追究责任。 如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
评论