数据湖(十七):Flink 与 Iceberg 整合 DataStream API 操作
- 2023-01-08 广东
本文字数:8081 字
阅读完需:约 27 分钟
Flink 与 Iceberg 整合 DataStream API 操作
目前 Flink 支持使用 DataStream API 和 SQL API 方式实时读取和写入 Iceberg 表,建议大家使用 SQL API 方式实时读取和写入 Iceberg 表。
Iceberg 支持的 Flink 版本为 1.11.x 版本以上,目前经过测试 Iceberg 版本与 Flink 的版本对应关系如下:
Flink1.11.x 版本与 Iceberg0.11.1 版本匹配。
Flink1.12.x~Flink1.1.x 版本与 Iceberg0.12.1 版本匹配,SQL API 有一些 bug。
Flink1.14.x 版本与 Iceberg0.12.1 版本能整合但是有一些小 bug,例如实时读取 Iceberg 中的数据有 bug。
以下 Flink 与 Iceberg 整合使用的 Flink 版本为 1.13.5,Iceberg 版本为 0.12.1 版本。后期使用 SQL API 操作时使用的 Flink 版本为 1.11.6,Iceberg 版本为 0.11.1 版本。
一、DataStream API 实时写入 Iceberg 表
DataStream Api 方式操作 Iceberg 方式目前仅支持 Java Api。使用 DataStream API 实时写入 Iceberg 表具体操作如下:
1、首先在 Maven 中导入以下依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- flink 1.12.x -1.13.x 版本与Iceberg 0.12.1 版本兼容 ,不能与Flink 1.14 兼容-->
<flink.version>1.13.5</flink.version>
<!--<flink.version>1.12.1</flink.version>-->
<!--<flink.version>1.14.2</flink.version>-->
<!-- flink 1.11.x 与Iceberg 0.11.1 合适-->
<!--<flink.version>1.11.6</flink.version>-->
<hadoop.version>3.2.2</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-iceberg</artifactId>
<version>1.13-vvr-4.0.7</version>
</dependency>
<!-- Flink 操作Iceberg 需要的Iceberg依赖 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.12.1</version>
<!--<version>0.11.1</version>-->
</dependency>
<!-- java 开发Flink 所需依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka连接器的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 读取hdfs文件需要jar包-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Flink SQL & Table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
2、编写代码使用 DataStream API 将 Kafka 数据写入到 Iceberg 表
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import java.util.Map;
/**
* 使用DataStream Api 向Iceberg 表写入数据
*/
public class StreamAPIWriteIceberg {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.必须设置checkpoint ,Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据。
env.enableCheckpointing(5000);
//2.读取Kafka 中的topic 数据
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("node1:9092,node2:9092,node3:9092")
.setTopics("flink-iceberg-topic")
.setGroupId("my-group-id")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
//3.对数据进行处理,包装成RowData 对象,方便保存到Iceberg表中。
SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String s) throws Exception {
System.out.println("s = "+s);
String[] split = s.split(",");
GenericRowData row = new GenericRowData(4);
row.setField(0, Integer.valueOf(split[0]));
row.setField(1, StringData.fromString(split[1]));
row.setField(2, Integer.valueOf(split[2]));
row.setField(3, StringData.fromString(split[3]));
return row;
}
});
//4.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表
Configuration hadoopConf = new Configuration();
Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");
//配置iceberg 库名和表名
TableIdentifier name =
TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
//创建Icebeng表Schema
Schema schema = new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "nane", Types.StringType.get()),
Types.NestedField.required(3, "age", Types.IntegerType.get()),
Types.NestedField.required(4, "loc", Types.StringType.get()));
//如果有分区指定对应分区,这里“loc”列为分区列,可以指定unpartitioned 方法不设置表分区
// PartitionSpec spec = PartitionSpec.unpartitioned();
PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();
//指定Iceberg表数据格式化为Parquet存储
Map<String, String> props =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
Table table = null;
// 通过catalog判断表是否存在,不存在就创建,存在就加载
if (!catalog.tableExists(name)) {
table = catalog.createTable(name, schema, spec, props);
}else {
table = catalog.loadTable(name);
}
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);
//5.通过DataStream Api 向Iceberg中写入数据
FlinkSink.forRowData(dataStream)
//这个 .table 也可以不写,指定tableLoader 对应的路径就可以。
.table(table)
.tableLoader(tableLoader)
//默认为false,追加数据。如果设置为true 就是覆盖数据
.overwrite(false)
.build();
env.execute("DataStream Api Write Data To Iceberg");
}
}
以上代码有如下几个注意点:
需要设置 Checkpoint,Flink 向 Iceberg 中写入 Commit 数据时,只有 Checkpoint 成功之后才会 Commit 数据,否则后期在 Hive 中查询不到数据。
读取 Kafka 数据后需要包装成 RowData 或者 Row 对象,才能向 Iceberg 表中写出数据。写出数据时默认是追加数据,如果指定 overwrite 就是全部覆盖数据。
在向 Iceberg 表中写数据之前需要创建对应的 Catalog、表 Schema,否则写出时只指定对应的路径会报错找不到对应的 Iceberg 表。
不建议使用 DataStream API 向 Iceberg 中写数据,建议使用 SQL API。
3、在 Kafka 中创建代码中指定的“flink-iceberg-topic”并启动代码生产数据
# 在Kafka 中创建 flink-iceberg-topic topic
[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic flink-iceberg-topic --partitions 3 --replication-factor 3
创建好以上 topic 之后,启动代码,然后向 topic 中生产以下数据:
[root@node1 bin]#./kafka-console-producer.sh --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:9092
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
可以看到在 HDFS 对应的路径中保存了对应的数据:
4、通过 Hive 查看保存到 Iceberg 中的数据
启动 Hive、Hive Metastore 在 Hive 中创建映射 Iceberg 的外表:
CREATE TABLE flink_iceberg_tbl (
id int,
name string,
age int,
loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
注意:虽然 loc 是分区列,创建时忽略分区列就可以,此外映射表的路径要保持与保存 Iceberg 数据路径一致。
通过 Hive 查询对应的 Iceberg 表中的数据,结果如下:
hive> select * from flink_iceberg_tbl;
OK
2 ls 19 shanghai
3 ww 20 beijing
1 zs 18 beijing
4 ml 21 shanghai
二、DataStream API 批量/实时读取 Iceberg 表
DataStream API 读取 Iceberg 表又分为批量读取和实时读取。通过方法“streaming(true/false)”来控制。
1、批量/全量读取
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;
/**
* 使用DataStream Api 批量/实时 读取Iceberg 数据
*/
public class StreamAPIReadIceberg {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.配置TableLoader
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);
//2.从Iceberg中读取全量/增量读取数据
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
//默认为false,整批次读取,设置为true 为流式读取
.streaming(false)
.build();
batchData.map(new MapFunction<RowData, String>() {
@Override
public String map(RowData rowData) throws Exception {
int id = rowData.getInt(0);
String name = rowData.getString(1).toString();
int age = rowData.getInt(2);
String loc = rowData.getString(3).toString();
return id+","+name+","+age+","+loc;
}
}).print();
env.execute("DataStream Api Read Data From Iceberg");
}
}
结果如下:
2、实时读取
//当配置 streaming参数为true时就是实时读取
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
//默认为false,整批次读取,设置为true 为流式读取
.streaming(true)
.build();
修改以上代码并启动,向 Hive 对应的 Iceberg 表“flink_iceberg_tbl”中插入 2 条数据:
在向 Hive 的 Iceberg 表中插入数据之前需要加入以下两个包:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
向 Hive 中 Iceberg 表插入两条数据
hive> insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
插入完成之后,可以看到 Flink 控制台实时读取到对应数据
三、指定基于快照实时增量读取数据
以上案例我们发现 Flink 将表中所有数据都读取出来,我们也可以指定对应的 snapshot-id 决定基于哪些数据增量读取数据。
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
//基于某个快照实时增量读取数据,快照需要从元数据中获取
.startSnapshotId(4226332606322964975L)
//默认为false,整批次读取,设置为true 为流式读取
.streaming(true)
.build();
结果只读取到指定快照往后的数据,如下:
四、合并 data files
Iceberg 提供 Api 将小文件合并成大文件,可以通过 Flink 批任务来执行。Flink 中合并小文件与 Spark 中小文件合并完全一样。
代码如下:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;
/**
* 可以通过提交Flink批量任务来合并Data Files 文件。
*/
public class RewrietDataFiles {
public static void main(String[] args) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1.配置TableLoader
Configuration hadoopConf = new Configuration();
//2.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表
Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");
//3.配置iceberg 库名和表名并加载表
TableIdentifier name =
TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
Table table = catalog.loadTable(name);
//4..合并 data files 小文件
RewriteDataFilesActionResult result = Actions.forTable(table)
.rewriteDataFiles()
//默认 512M ,可以手动通过以下指定合并文件大小,与Spark中一样。
.targetSizeInBytes(536870912L)
.execute();
}
}
版权声明: 本文为 InfoQ 作者【Lansonli】的原创文章。
原文链接:【http://xie.infoq.cn/article/95de999e89eb7f05f6982bc1b】。文章转载请联系作者。
Lansonli
微信公众号:三帮大数据 2022-07-12 加入
CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师
评论