写点什么

数据湖(十七):Flink 与 Iceberg 整合 DataStream API 操作

作者:Lansonli
  • 2023-01-08
    广东
  • 本文字数:8081 字

    阅读完需:约 27 分钟

数据湖(十七):Flink与Iceberg整合DataStream API操作

Flink 与 Iceberg 整合 DataStream API 操作

目前 Flink 支持使用 DataStream API 和 SQL API 方式实时读取和写入 Iceberg 表,建议大家使用 SQL API 方式实时读取和写入 Iceberg 表。


Iceberg 支持的 Flink 版本为 1.11.x 版本以上,目前经过测试 Iceberg 版本与 Flink 的版本对应关系如下:


  • Flink1.11.x 版本与 Iceberg0.11.1 版本匹配。

  • Flink1.12.x~Flink1.1.x 版本与 Iceberg0.12.1 版本匹配,SQL API 有一些 bug。

  • Flink1.14.x 版本与 Iceberg0.12.1 版本能整合但是有一些小 bug,例如实时读取 Iceberg 中的数据有 bug。


以下 Flink 与 Iceberg 整合使用的 Flink 版本为 1.13.5,Iceberg 版本为 0.12.1 版本。后期使用 SQL API 操作时使用的 Flink 版本为 1.11.6,Iceberg 版本为 0.11.1 版本。

一、DataStream API 实时写入 Iceberg 表

DataStream Api 方式操作 Iceberg 方式目前仅支持 Java Api。使用 DataStream API 实时写入 Iceberg 表具体操作如下:

1、首先在 Maven 中导入以下依赖

<properties>  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  <maven.compiler.source>1.8</maven.compiler.source>  <maven.compiler.target>1.8</maven.compiler.target>  <!-- flink 1.12.x -1.13.x  版本与Iceberg 0.12.1 版本兼容 ,不能与Flink 1.14 兼容-->  <flink.version>1.13.5</flink.version>  <!--<flink.version>1.12.1</flink.version>-->  <!--<flink.version>1.14.2</flink.version>-->  <!-- flink 1.11.x 与Iceberg 0.11.1 合适-->  <!--<flink.version>1.11.6</flink.version>-->  <hadoop.version>3.2.2</hadoop.version></properties>
<dependencies> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-iceberg</artifactId> <version>1.13-vvr-4.0.7</version> </dependency> <!-- Flink 操作Iceberg 需要的Iceberg依赖 --> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-flink-runtime</artifactId> <version>0.12.1</version> <!--<version>0.11.1</version>--> </dependency>
<!-- java 开发Flink 所需依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency>
<!-- Flink Kafka连接器的依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency>
<!-- 读取hdfs文件需要jar包--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency>
<!-- Flink SQL & Table--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime-blink_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency>
<!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.5</version> </dependency></dependencies>
复制代码

2、编写代码使用 DataStream API 将 Kafka 数据写入到 Iceberg 表

import com.google.common.collect.ImmutableMap;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.data.GenericRowData;import org.apache.flink.table.data.RowData;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.*;import org.apache.iceberg.catalog.Catalog;import org.apache.iceberg.catalog.TableIdentifier;import org.apache.iceberg.flink.TableLoader;import org.apache.flink.table.data.StringData;import org.apache.iceberg.flink.sink.FlinkSink;import org.apache.iceberg.hadoop.HadoopCatalog;import org.apache.iceberg.types.Types;import java.util.Map;
/** * 使用DataStream Api 向Iceberg 表写入数据 */public class StreamAPIWriteIceberg { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //1.必须设置checkpoint ,Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据。 env.enableCheckpointing(5000);
//2.读取Kafka 中的topic 数据 KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("node1:9092,node2:9092,node3:9092") .setTopics("flink-iceberg-topic") .setGroupId("my-group-id") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
//3.对数据进行处理,包装成RowData 对象,方便保存到Iceberg表中。 SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() { @Override public RowData map(String s) throws Exception { System.out.println("s = "+s); String[] split = s.split(","); GenericRowData row = new GenericRowData(4); row.setField(0, Integer.valueOf(split[0])); row.setField(1, StringData.fromString(split[1])); row.setField(2, Integer.valueOf(split[2])); row.setField(3, StringData.fromString(split[3])); return row; } });
//4.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表 Configuration hadoopConf = new Configuration(); Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");
//配置iceberg 库名和表名 TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
//创建Icebeng表Schema Schema schema = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.required(2, "nane", Types.StringType.get()), Types.NestedField.required(3, "age", Types.IntegerType.get()), Types.NestedField.required(4, "loc", Types.StringType.get()));
//如果有分区指定对应分区,这里“loc”列为分区列,可以指定unpartitioned 方法不设置表分区// PartitionSpec spec = PartitionSpec.unpartitioned(); PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();
//指定Iceberg表数据格式化为Parquet存储 Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()); Table table = null;
// 通过catalog判断表是否存在,不存在就创建,存在就加载 if (!catalog.tableExists(name)) { table = catalog.createTable(name, schema, spec, props); }else { table = catalog.loadTable(name); }
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);
//5.通过DataStream Api 向Iceberg中写入数据 FlinkSink.forRowData(dataStream) //这个 .table 也可以不写,指定tableLoader 对应的路径就可以。 .table(table) .tableLoader(tableLoader) //默认为false,追加数据。如果设置为true 就是覆盖数据 .overwrite(false) .build();
env.execute("DataStream Api Write Data To Iceberg"); }}
复制代码


以上代码有如下几个注意点:


  • 需要设置 Checkpoint,Flink 向 Iceberg 中写入 Commit 数据时,只有 Checkpoint 成功之后才会 Commit 数据,否则后期在 Hive 中查询不到数据。

  • 读取 Kafka 数据后需要包装成 RowData 或者 Row 对象,才能向 Iceberg 表中写出数据。写出数据时默认是追加数据,如果指定 overwrite 就是全部覆盖数据。

  • 在向 Iceberg 表中写数据之前需要创建对应的 Catalog、表 Schema,否则写出时只指定对应的路径会报错找不到对应的 Iceberg 表。

  • 不建议使用 DataStream API 向 Iceberg 中写数据,建议使用 SQL API。

3、在 Kafka 中创建代码中指定的“flink-iceberg-topic”并启动代码生产数据

# 在Kafka 中创建 flink-iceberg-topic topic[root@node1 bin]# ./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic flink-iceberg-topic  --partitions 3 --replication-factor 3
复制代码


创建好以上 topic 之后,启动代码,然后向 topic 中生产以下数据:


[root@node1 bin]#./kafka-console-producer.sh  --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:90921,zs,18,beijing2,ls,19,shanghai3,ww,20,beijing4,ml,21,shanghai
复制代码


可以看到在 HDFS 对应的路径中保存了对应的数据:


4、通过 Hive 查看保存到 Iceberg 中的数据

启动 Hive、Hive Metastore 在 Hive 中创建映射 Iceberg 的外表:


CREATE TABLE flink_iceberg_tbl  (  id int,   name string,  age int,  loc string)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl' TBLPROPERTIES ('iceberg.catalog'='location_based_table');
复制代码


注意:虽然 loc 是分区列,创建时忽略分区列就可以,此外映射表的路径要保持与保存 Iceberg 数据路径一致。


通过 Hive 查询对应的 Iceberg 表中的数据,结果如下:


hive> select * from flink_iceberg_tbl;OK2  ls  19  shanghai3  ww  20  beijing1  zs  18  beijing4  ml  21  shanghai
复制代码

二、DataStream API 批量/实时读取 Iceberg 表

DataStream API 读取 Iceberg 表又分为批量读取和实时读取。通过方法“streaming(true/false)”来控制。

1、批量/全量读取

import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.data.RowData;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.flink.TableLoader;import org.apache.iceberg.flink.source.FlinkSource;
/** * 使用DataStream Api 批量/实时 读取Iceberg 数据 */public class StreamAPIReadIceberg { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.配置TableLoader Configuration hadoopConf = new Configuration(); TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);
//2.从Iceberg中读取全量/增量读取数据 DataStream<RowData> batchData = FlinkSource.forRowData().env(env) .tableLoader(tableLoader) //默认为false,整批次读取,设置为true 为流式读取 .streaming(false) .build();
batchData.map(new MapFunction<RowData, String>() { @Override public String map(RowData rowData) throws Exception { int id = rowData.getInt(0); String name = rowData.getString(1).toString(); int age = rowData.getInt(2); String loc = rowData.getString(3).toString(); return id+","+name+","+age+","+loc; } }).print();
env.execute("DataStream Api Read Data From Iceberg");
}}
复制代码


结果如下:


2、实时读取

//当配置 streaming参数为true时就是实时读取DataStream<RowData> batchData = FlinkSource.forRowData().env(env)        .tableLoader(tableLoader)        //默认为false,整批次读取,设置为true 为流式读取        .streaming(true)        .build();
复制代码


修改以上代码并启动,向 Hive 对应的 Iceberg 表“flink_iceberg_tbl”中插入 2 条数据:


在向 Hive 的 Iceberg 表中插入数据之前需要加入以下两个包:


add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
复制代码


向 Hive 中 Iceberg 表插入两条数据


hive> insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
复制代码


插入完成之后,可以看到 Flink 控制台实时读取到对应数据


三、指定基于快照实时增量读取数据

以上案例我们发现 Flink 将表中所有数据都读取出来,我们也可以指定对应的 snapshot-id 决定基于哪些数据增量读取数据。


DataStream<RowData> batchData = FlinkSource.forRowData().env(env)        .tableLoader(tableLoader)        //基于某个快照实时增量读取数据,快照需要从元数据中获取        .startSnapshotId(4226332606322964975L)        //默认为false,整批次读取,设置为true 为流式读取        .streaming(true)        .build();
复制代码


结果只读取到指定快照往后的数据,如下:


四、合并 data files

Iceberg 提供 Api 将小文件合并成大文件,可以通过 Flink 批任务来执行。Flink 中合并小文件与 Spark 中小文件合并完全一样。


代码如下:


import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.Table;import org.apache.iceberg.actions.RewriteDataFilesActionResult;import org.apache.iceberg.catalog.Catalog;import org.apache.iceberg.catalog.TableIdentifier;import org.apache.iceberg.flink.TableLoader;import org.apache.iceberg.flink.actions.Actions;import org.apache.iceberg.hadoop.HadoopCatalog;
/** * 可以通过提交Flink批量任务来合并Data Files 文件。 */public class RewrietDataFiles { public static void main(String[] args) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1.配置TableLoader Configuration hadoopConf = new Configuration();
//2.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表 Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");
//3.配置iceberg 库名和表名并加载表 TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl"); Table table = catalog.loadTable(name);
//4..合并 data files 小文件 RewriteDataFilesActionResult result = Actions.forTable(table) .rewriteDataFiles() //默认 512M ,可以手动通过以下指定合并文件大小,与Spark中一样。 .targetSizeInBytes(536870912L) .execute(); }}
复制代码


发布于: 2023-01-08阅读数: 26
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
数据湖(十七):Flink与Iceberg整合DataStream API操作_数据湖_Lansonli_InfoQ写作社区