数据湖(十七):Flink 与 Iceberg 整合 DataStream API 操作
- 2022-11-06  广东
- 本文字数:8082 字 - 阅读完需:约 27 分钟 

Flink 与 Iceberg 整合 DataStream API 操作
目前 Flink 支持使用 DataStream API 和 SQL API 方式实时读取和写入 Iceberg 表,建议大家使用 SQL API 方式实时读取和写入 Iceberg 表。
Iceberg 支持的 Flink 版本为 1.11.x 版本以上,目前经过测试 Iceberg 版本与 Flink 的版本对应关系如下:
- Flink1.11.x 版本与 Iceberg0.11.1 版本匹配。 
- Flink1.12.x~Flink1.1.x 版本与 Iceberg0.12.1 版本匹配,SQL API 有一些 bug。 
- Flink1.14.x 版本与 Iceberg0.12.1 版本能整合但是有一些小 bug,例如实时读取 Iceberg 中的数据有 bug。 
以下 Flink 与 Iceberg 整合使用的 Flink 版本为 1.13.5,Iceberg 版本为 0.12.1 版本。后期使用 SQL API 操作时使用的 Flink 版本为 1.11.6,Iceberg 版本为 0.11.1 版本。
一、DataStream API 实时写入 Iceberg 表
DataStream Api 方式操作 Iceberg 方式目前仅支持 Java Api。使用 DataStream API 实时写入 Iceberg 表具体操作如下:
1、首先在 Maven 中导入以下依赖
<properties>  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  <maven.compiler.source>1.8</maven.compiler.source>  <maven.compiler.target>1.8</maven.compiler.target>  <!-- flink 1.12.x -1.13.x  版本与Iceberg 0.12.1 版本兼容 ,不能与Flink 1.14 兼容-->  <flink.version>1.13.5</flink.version>  <!--<flink.version>1.12.1</flink.version>-->  <!--<flink.version>1.14.2</flink.version>-->  <!-- flink 1.11.x 与Iceberg 0.11.1 合适-->  <!--<flink.version>1.11.6</flink.version>-->  <hadoop.version>3.2.2</hadoop.version></properties>
<dependencies>    <dependency>        <groupId>com.alibaba.ververica</groupId>        <artifactId>ververica-connector-iceberg</artifactId>        <version>1.13-vvr-4.0.7</version>    </dependency>    <!-- Flink 操作Iceberg 需要的Iceberg依赖 -->  <dependency>    <groupId>org.apache.iceberg</groupId>    <artifactId>iceberg-flink-runtime</artifactId>    <version>0.12.1</version>    <!--<version>0.11.1</version>-->  </dependency>
  <!-- java 开发Flink 所需依赖 -->  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-java</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-streaming-java_2.11</artifactId>    <version>${flink.version}</version>  </dependency>
  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-clients_2.11</artifactId>    <version>${flink.version}</version>  </dependency>
  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-streaming-scala_2.11</artifactId>    <version>${flink.version}</version>  </dependency>
  <!-- Flink Kafka连接器的依赖 -->  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-kafka_2.11</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-base</artifactId>    <version>${flink.version}</version>  </dependency>
  <!-- 读取hdfs文件需要jar包-->  <dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-client</artifactId>    <version>${hadoop.version}</version>  </dependency>
  <!-- Flink SQL & Table-->  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-runtime-blink_2.11</artifactId>    <version>${flink.version}</version>  </dependency>
  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table</artifactId>    <version>${flink.version}</version>  </dependency>
  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-common</artifactId>    <version>${flink.version}</version>  </dependency>
  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-api-java</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-api-java-bridge_2.11</artifactId>    <version>${flink.version}</version>  </dependency>
  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-planner_2.11</artifactId>    <version>${flink.version}</version>  </dependency>
  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-planner-blink_2.11</artifactId>    <version>${flink.version}</version>  </dependency>
  <dependency>    <groupId>junit</groupId>    <artifactId>junit</artifactId>    <version>4.11</version>    <scope>test</scope>   </dependency>
  <!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->  <dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-log4j12</artifactId>  <version>1.7.25</version>  <scope>test</scope>  </dependency>  <dependency>  <groupId>log4j</groupId>  <artifactId>log4j</artifactId>  <version>1.2.17</version>  </dependency>  <dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-api</artifactId>  <version>1.7.25</version>  </dependency>  <dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-nop</artifactId>  <version>1.7.25</version>  <scope>test</scope>  </dependency>  <dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-simple</artifactId>  <version>1.7.5</version>  </dependency></dependencies>
2、编写代码使用 DataStream API 将 Kafka 数据写入到 Iceberg 表
import com.google.common.collect.ImmutableMap;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.data.GenericRowData;import org.apache.flink.table.data.RowData;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.*;import org.apache.iceberg.catalog.Catalog;import org.apache.iceberg.catalog.TableIdentifier;import org.apache.iceberg.flink.TableLoader;import org.apache.flink.table.data.StringData;import org.apache.iceberg.flink.sink.FlinkSink;import org.apache.iceberg.hadoop.HadoopCatalog;import org.apache.iceberg.types.Types;import java.util.Map;
/** * 使用DataStream Api 向Iceberg 表写入数据 */public class StreamAPIWriteIceberg {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //1.必须设置checkpoint ,Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据。        env.enableCheckpointing(5000);
        //2.读取Kafka 中的topic 数据        KafkaSource<String> source = KafkaSource.<String>builder()                .setBootstrapServers("node1:9092,node2:9092,node3:9092")                .setTopics("flink-iceberg-topic")                .setGroupId("my-group-id")                .setStartingOffsets(OffsetsInitializer.latest())                .setValueOnlyDeserializer(new SimpleStringSchema())                .build();        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        //3.对数据进行处理,包装成RowData 对象,方便保存到Iceberg表中。        SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {            @Override            public RowData map(String s) throws Exception {                System.out.println("s = "+s);                String[] split = s.split(",");                GenericRowData row = new GenericRowData(4);                row.setField(0, Integer.valueOf(split[0]));                row.setField(1, StringData.fromString(split[1]));                row.setField(2, Integer.valueOf(split[2]));                row.setField(3, StringData.fromString(split[3]));                return row;            }        });
        //4.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表        Configuration hadoopConf = new Configuration();        Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");
        //配置iceberg 库名和表名        TableIdentifier name =                TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
        //创建Icebeng表Schema        Schema schema = new Schema(                Types.NestedField.required(1, "id", Types.IntegerType.get()),                Types.NestedField.required(2, "nane", Types.StringType.get()),                Types.NestedField.required(3, "age", Types.IntegerType.get()),                Types.NestedField.required(4, "loc", Types.StringType.get()));
        //如果有分区指定对应分区,这里“loc”列为分区列,可以指定unpartitioned 方法不设置表分区//        PartitionSpec spec = PartitionSpec.unpartitioned();        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();
        //指定Iceberg表数据格式化为Parquet存储        Map<String, String> props =                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());        Table table = null;
        // 通过catalog判断表是否存在,不存在就创建,存在就加载        if (!catalog.tableExists(name)) {            table = catalog.createTable(name, schema, spec, props);        }else {            table = catalog.loadTable(name);        }
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);
        //5.通过DataStream Api 向Iceberg中写入数据        FlinkSink.forRowData(dataStream)                //这个 .table 也可以不写,指定tableLoader 对应的路径就可以。                .table(table)                .tableLoader(tableLoader)                //默认为false,追加数据。如果设置为true 就是覆盖数据                .overwrite(false)                .build();
        env.execute("DataStream Api Write Data To Iceberg");    }}
以上代码有如下几个注意点:
- 需要设置 Checkpoint,Flink 向 Iceberg 中写入 Commit 数据时,只有 Checkpoint 成功之后才会 Commit 数据,否则后期在 Hive 中查询不到数据。 
- 读取 Kafka 数据后需要包装成 RowData 或者 Row 对象,才能向 Iceberg 表中写出数据。写出数据时默认是追加数据,如果指定 overwrite 就是全部覆盖数据。 
- 在向 Iceberg 表中写数据之前需要创建对应的 Catalog、表 Schema,否则写出时只指定对应的路径会报错找不到对应的 Iceberg 表。 
- 不建议使用 DataStream API 向 Iceberg 中写数据,建议使用 SQL API。 
3、在 Kafka 中创建代码中指定的“flink-iceberg-topic”并启动代码生产数据
# 在Kafka 中创建 flink-iceberg-topic topic[root@node1 bin]# ./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic flink-iceberg-topic  --partitions 3 --replication-factor 3
创建好以上 topic 之后,启动代码,然后向 topic 中生产以下数据:
[root@node1 bin]#./kafka-console-producer.sh  --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:90921,zs,18,beijing2,ls,19,shanghai3,ww,20,beijing4,ml,21,shanghai
可以看到在 HDFS 对应的路径中保存了对应的数据:
 
 ;
4、通过 Hive 查看保存到 Iceberg 中的数据
启动 Hive、Hive Metastore 在 Hive 中创建映射 Iceberg 的外表:
CREATE TABLE flink_iceberg_tbl  (  id int,   name string,  age int,  loc string)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl' TBLPROPERTIES ('iceberg.catalog'='location_based_table');
注意:虽然 loc 是分区列,创建时忽略分区列就可以,此外映射表的路径要保持与保存 Iceberg 数据路径一致。
通过 Hive 查询对应的 Iceberg 表中的数据,结果如下:
hive> select * from flink_iceberg_tbl;OK2  ls  19  shanghai3  ww  20  beijing1  zs  18  beijing4  ml  21  shanghai
二、DataStream API 批量/实时读取 Iceberg 表
DataStream API 读取 Iceberg 表又分为批量读取和实时读取。通过方法“streaming(true/false)”来控制。
1、批量/全量读取
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.data.RowData;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.flink.TableLoader;import org.apache.iceberg.flink.source.FlinkSource;
/** * 使用DataStream Api 批量/实时 读取Iceberg 数据 */public class StreamAPIReadIceberg {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.配置TableLoader        Configuration hadoopConf = new Configuration();        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);
        //2.从Iceberg中读取全量/增量读取数据        DataStream<RowData> batchData = FlinkSource.forRowData().env(env)                .tableLoader(tableLoader)                //默认为false,整批次读取,设置为true 为流式读取                .streaming(false)                .build();
        batchData.map(new MapFunction<RowData, String>() {            @Override            public String map(RowData rowData) throws Exception {                int id = rowData.getInt(0);                String name = rowData.getString(1).toString();                int age = rowData.getInt(2);                String loc = rowData.getString(3).toString();                return id+","+name+","+age+","+loc;            }        }).print();
        env.execute("DataStream Api Read Data From Iceberg");
    }}
结果如下:
 
 2、实时读取
//当配置 streaming参数为true时就是实时读取DataStream<RowData> batchData = FlinkSource.forRowData().env(env)        .tableLoader(tableLoader)        //默认为false,整批次读取,设置为true 为流式读取        .streaming(true)        .build();
修改以上代码并启动,向 Hive 对应的 Iceberg 表“flink_iceberg_tbl”中插入 2 条数据:
在向 Hive 的 Iceberg 表中插入数据之前需要加入以下两个包:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
向 Hive 中 Iceberg 表插入两条数据
hive> insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
插入完成之后,可以看到 Flink 控制台实时读取到对应数据
 
 三、指定基于快照实时增量读取数据
以上案例我们发现 Flink 将表中所有数据都读取出来,我们也可以指定对应的 snapshot-id 决定基于哪些数据增量读取数据。
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)        .tableLoader(tableLoader)        //基于某个快照实时增量读取数据,快照需要从元数据中获取        .startSnapshotId(4226332606322964975L)        //默认为false,整批次读取,设置为true 为流式读取        .streaming(true)        .build();
结果只读取到指定快照往后的数据,如下:
 
 四、合并 data files
Iceberg 提供 Api 将小文件合并成大文件,可以通过 Flink 批任务来执行。Flink 中合并小文件与 Spark 中小文件合并完全一样。
代码如下:
import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.Table;import org.apache.iceberg.actions.RewriteDataFilesActionResult;import org.apache.iceberg.catalog.Catalog;import org.apache.iceberg.catalog.TableIdentifier;import org.apache.iceberg.flink.TableLoader;import org.apache.iceberg.flink.actions.Actions;import org.apache.iceberg.hadoop.HadoopCatalog;
/** *  可以通过提交Flink批量任务来合并Data Files 文件。 */public class RewrietDataFiles {    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //1.配置TableLoader        Configuration hadoopConf = new Configuration();
        //2.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表        Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");
        //3.配置iceberg 库名和表名并加载表        TableIdentifier name =                TableIdentifier.of("icebergdb", "flink_iceberg_tbl");        Table table = catalog.loadTable(name);
        //4..合并 data files 小文件        RewriteDataFilesActionResult result = Actions.forTable(table)                .rewriteDataFiles()                //默认 512M ,可以手动通过以下指定合并文件大小,与Spark中一样。                .targetSizeInBytes(536870912L)                .execute();    }}
版权声明: 本文为 InfoQ 作者【Lansonli】的原创文章。
原文链接:【http://xie.infoq.cn/article/68285f7bb6008d226728ebfe5】。文章转载请联系作者。


Lansonli
微信公众号:三帮大数据 2022-07-12 加入
CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师









 
    
评论