Data Lake (19): Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time

Author: Lansonli
  • 2023-01-16
    Guangdong

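Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time

To read data from Kafka in real time and write it into an Iceberg table, follow these steps:

1. First, create the corresponding Iceberg table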

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

// 1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

// 2. Create the Iceberg table flink_iceberg_tbl3
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

2. Write the code that reads Kafka data and writes it to Iceberg in real time

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        // The Iceberg sink commits data when a checkpoint completes, so checkpointing must be enabled
        env.enableCheckpointing(1000);

        // The catalog and the Iceberg table must exist before any data is written.
        // 1. Create the catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        // 2. The Iceberg table flink_iceberg_tbl3 was created in the previous section, so this call stays commented out
        // tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        // 3. Create the Kafka connector table that consumes data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        // 4. Enable table.dynamic-table-options.enabled so that the OPTIONS hint is accepted in SQL
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 5. Write the Kafka data into table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        // 6. Query the Iceberg table as a stream and print the results
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}


Start the program above, then produce the following records to the Kafka topic:


1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
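
To make the mapping between these CSV lines and the columns of kafka_input_table explicit, here is a minimal standalone sketch in plain Java. It is not how Flink's csv format is implemented. The class CsvRowSketch and its parse method are hypothetical names used only for illustration, and the sketch assumes plain comma-separated fields with no quoting, which holds for the sample records.

```java
// Hypothetical illustration only: mirrors the column order (id, name, age, loc)
// declared for kafka_input_table. It does not handle quoting or escaping.
final class CsvRowSketch {
    final int id;
    final String name;
    final int age;
    final String loc;

    CsvRowSketch(int id, String name, int age, String loc) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.loc = loc;
    }

    // Splits one unquoted CSV line into the four declared columns.
    static CsvRowSketch parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "expected 4 fields but got " + fields.length + ": " + line);
        }
        return new CsvRowSketch(
                Integer.parseInt(fields[0].trim()),
                fields[1].trim(),
                Integer.parseInt(fields[2].trim()),
                fields[3].trim());
    }

    @Override
    public String toString() {
        return "id=" + id + ", name=" + name + ", age=" + age + ", loc=" + loc;
    }

    public static void main(String[] args) {
        String[] messages = {
            "1,zs,18,beijing",
            "2,ls,19,shanghai",
            "3,ww,20,beijing",
            "4,ml,21,shanghai"
        };
        for (String message : messages) {
            System.out.println(parse(message));
        }
    }
}
```

Each Kafka message must carry exactly one record whose fields follow the declared column order. In this sketch a line with a missing field or a non-numeric id or age is rejected with an exception.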


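The console prints the corresponding records in real time, and the table's Iceberg directory on HDFS confirms that the data was written successfully.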
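
When checking HDFS it helps to know where to look. Because the table is partitioned by loc, Iceberg's default layout places data files under one loc=<value> directory per partition value inside the table's data directory, with table metadata in a sibling metadata directory. The sketch below is plain Java that only computes the directories the four sample records would be expected to land in, assuming that default layout and the warehouse path from the catalog definition. The class PartitionLayoutSketch is a hypothetical name, and the data file names themselves are generated by the writer and are not predicted here.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only: derives the expected partition directories
// for a Hadoop-catalog Iceberg table partitioned by the loc column.
final class PartitionLayoutSketch {
    static final String WAREHOUSE = "hdfs://mycluster/flink_iceberg";
    static final String DATABASE = "iceberg_db";
    static final String TABLE = "flink_iceberg_tbl3";

    // A Hadoop catalog stores a table at <warehouse>/<database>/<table>.
    static String tableLocation() {
        return WAREHOUSE + "/" + DATABASE + "/" + TABLE;
    }

    // loc is the fourth CSV field; identity partitions appear as loc=<value>.
    static String partitionDir(String csvLine) {
        String[] fields = csvLine.split(",", -1);
        return tableLocation() + "/data/loc=" + fields[3].trim();
    }

    // Collects the distinct partition directories in sorted order.
    static Set<String> partitionDirs(String... csvLines) {
        Set<String> dirs = new TreeSet<>();
        for (String line : csvLines) {
            dirs.add(partitionDir(line));
        }
        return dirs;
    }

    public static void main(String[] args) {
        Set<String> dirs = partitionDirs(
                "1,zs,18,beijing",
                "2,ls,19,shanghai",
                "3,ww,20,beijing",
                "4,ml,21,shanghai");
        for (String dir : dirs) {
            System.out.println(dir);
        }
    }
}
```

For the four sample records this prints two directories, one ending in data/loc=beijing and one ending in data/loc=shanghai. Listing those paths with hdfs dfs -ls is one way to confirm the write.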

