写点什么

湖仓一体电商项目(十八):业务实现之编写写入 DWD 层业务代码

作者:Lansonli
  • 2023-02-04
    广东
  • 本文字数:1354 字

    阅读完需:约 4 分钟

湖仓一体电商项目(十八):业务实现之编写写入DWD层业务代码

业务实现之编写写入 DWD 层业务代码

一、代码编写

Flink 读取 Kafka topic “KAFKA-ODS-TOPIC” 数据写入 Iceberg-DWD 层也是复用第一个业务代码,这里只需要在代码中加入写入 Iceberg-DWD 层代码即可,代码如下:

//插入 iceberg - dwd 层 会员浏览商品日志信息 :DWD_BROWSELOGtblEnv.executeSql(  s"""    |insert into hadoop_iceberg.icebergdb.DWD_BROWSELOG    |select    | log_time,    | user_id2,    | user_ip,    | front_product_url,    | browse_product_url,    | browse_product_tpcode,    | browse_product_code,    | obtain_points    | from ${table} where iceberg_ods_tbl_name = 'ODS_BROWSELOG'  """.stripMargin)
复制代码


另外,在 Flink 处理此 topic 中每条数据时都有获取对应写入后续 Kafka topic 信息,本业务对应的每条用户日志数据写入的 kafka topic 为“KAFKA-DWD-BROWSE-LOG-TOPIC”,所以代码可以复用。

二、创建 Iceberg-DWD 层表

代码在执行之前需要在 Hive 中预先创建对应的 Iceberg 表,创建 Icebreg 表方式如下:

1、在 Hive 中添加 Iceberg 表格式需要的包

启动 HDFS 集群,node1 启动 Hive metastore 服务,在 Hive 客户端启动 Hive 添加 Iceberg 依赖包:

#node1节点启动Hive metastore服务[root@node1 ~]# hive --service metastore &
#在hive客户端node3节点加载两个jar包add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
复制代码


2、创建 Iceberg 表

这里创建 Iceberg-DWD 表有“DWD_BROWSELOG”,创建语句如下:

CREATE TABLE DWD_BROWSELOG  ( log_time string, user_id string, user_ip string, front_product_url string, browse_product_url string, browse_product_tpcode string, browse_product_code string, obtain_points string)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_BROWSELOG/' TBLPROPERTIES ('iceberg.catalog'='location_based_table','write.metadata.delete-after-commit.enabled'= 'true','write.metadata.previous-versions-max' = '3');
复制代码


三、代码测试

以上代码编写完成后,代码执行测试步骤如下:

1、在 Kafka 中创建对应的 topic

#在Kafka 中创建 KAFKA-DWD-BROWSE-LOG-TOPIC topic./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-BROWSE-LOG-TOPIC --partitions 3 --replication-factor 3
#监控以上topic数据[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DWD-BROWSE-LOG-TOPIC
复制代码


2、将代码中消费 Kafka 数据改成从头开始消费

代码中 Kafka Connector 中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。

这里也可以不设置从头开始消费 Kafka 数据,而是直接启动向日志采集接口模拟生产日志代码“RTMockUserLogData.java”,需要启动日志采集接口及 Flume。

3、执行代码,查看对应结果

以上代码执行后在,在对应的 Kafka “KAFKA-DWD-BROWSE-LOG-TOPIC” topic 中都有对应的数据。在 Iceberg-DWD 层中对应的表中也有数据。

Kafka 中结果如下:

Iceberg-DWD 层表”DWD_BROWSELOG”中的数据如下:


发布于: 刚刚阅读数: 6
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网十万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
湖仓一体电商项目(十八):业务实现之编写写入DWD层业务代码_数据湖_Lansonli_InfoQ写作社区