
Data Lake (18): Flink and Iceberg Integration with the SQL API

Author: Lansonli
2023-01-12


When Flink SQL is used with Iceberg, the matching versions are Flink 1.11.x and Iceberg 0.11.1. At present, Flink 1.14.2 and Iceberg 0.12.1 have compatibility problems at the SQL API level, so Flink 1.11.6 and Iceberg 0.11.1 are used here to demonstrate Flink SQL API operations on Iceberg.

I. Creating an Iceberg Table and Writing Data with the SQL API

1. Create a new project and add the following Maven dependencies:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <!-- Flink 1.11.x matches Iceberg 0.11.1 -->
  <flink.version>1.11.6</flink.version>
  <hadoop.version>3.2.2</hadoop.version>
</properties>

<dependencies>
  <!-- Iceberg dependency required for Flink to work with Iceberg -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime</artifactId>
    <version>0.11.1</version>
  </dependency>

  <!-- Dependencies for developing Flink applications in Java -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Flink Kafka connector -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Required for reading files from HDFS -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>

  <!-- Flink SQL & Table -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>

  <!-- log4j and slf4j; comment out the dependencies below if you do not want log output in the console -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.5</version>
  </dependency>
</dependencies>


2. Write Flink SQL that creates an Iceberg table and inserts data:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Use the catalog
tblEnv.useCatalog("hadoop_iceberg");

//3. Create a database
tblEnv.executeSql("create database iceberg_db");

//4. Use the database
tblEnv.useDatabase("iceberg_db");

//5. Create the Iceberg table flink_iceberg_tbl2
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

//6. Insert data into the table flink_iceberg_tbl2
tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");
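Note that executeSql submits the INSERT as an asynchronous Flink job, and the Iceberg Flink sink commits data files to the table when a checkpoint completes, which is why env.enableCheckpointing(1000) is set above; the inserted rows become readable once such a commit has produced a new snapshot.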


3. Map the Iceberg table in Hive and query it

Run the following statement in Hive to create the corresponding Iceberg table:


#Create the Iceberg table in Hive
CREATE TABLE flink_iceberg_tbl2 (
  id int,
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
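Note that Hive can only resolve 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' when the Iceberg Hive runtime jar is on its classpath; the required add jar statements are shown in section III below.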


#Query the data in the Iceberg table from Hive
hive> select * from flink_iceberg_tbl2;
OK
3  ww  20  guangzhou
1  zs  18  beijing
2  ls  19  shanghai


II. Batch Queries of Iceberg Table Data with the SQL API

To batch-query Iceberg table data with the Flink SQL API, simply run the query and print the result. The code is as follows:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read the table data as a batch query
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2");
tableResult.print();


The result is as follows:
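Based on the three rows written in section I, the query returns the following records (row order may differ):

1, zs, 18, beijing
2, ls, 19, shanghai
3, ww, 20, guangzhou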


III. Real-Time Queries of Iceberg Table Data with the SQL API

When querying Iceberg table data in real time with the Flink SQL API, set the parameter "table.dynamic-table-options.enabled" to true so that the OPTIONS hint syntax in SQL is supported. The code is as follows:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint syntax in SQL
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read all data from the current snapshot of the Iceberg table, then keep reading incrementally.
// 'streaming'='true' enables real-time reads; 'monitor-interval' is how often new data is checked for (default 1s).
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
tableResult.print();


After starting the code above, the data currently in the Iceberg table is read out first. If you then insert data into the corresponding Iceberg table from Hive, the new rows appear in the console in real time.


#Before inserting data into the Iceberg table from Hive, add the following two jars:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

#Insert two rows into the Iceberg table from Hive
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');


The newly inserted rows show up in the console in real time.
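Based on the two rows just inserted from Hive, the records that appear in the streaming output are:

4, ml, 30, shenzhen
5, tq, 31, beijing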


IV. Real-Time Incremental Reads from a Specified Snapshot with the SQL API

The Flink SQL API also supports continuing to read data in real time starting from a specific snapshot-id. The code is as follows:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint syntax in SQL
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Continue reading data in real time starting from the specified Iceberg snapshot.
// start-snapshot-id: the snapshot ID, taken from the table's metadata.
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();
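The snapshot ID used above has to be looked up in the table's metadata, for example in the v*.metadata.json files under the table's metadata directory on HDFS. As a minimal sketch (not part of the original steps), the available snapshot IDs can also be listed with Iceberg's Java API against the same Hadoop catalog, assuming the dependencies from section I are on the classpath; the class name ListSnapshots is only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class ListSnapshots {
    public static void main(String[] args) {
        // Open the same Hadoop catalog the Flink SQL examples use
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://mycluster/flink_iceberg");
        Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "flink_iceberg_tbl2"));
        // Print every snapshot ID with its commit time; pick one as start-snapshot-id
        for (Snapshot snapshot : table.snapshots()) {
            System.out.println(snapshot.snapshotId() + " committed at " + snapshot.timestampMillis());
        }
    }
}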

