Lakehouse E-commerce Project (14): Real-Time Task Execution Flow

Author: Lansonli · 2022-11-26

Real-Time Task Execution Flow

For now, the project is executed locally. The steps are as follows:

1. Prepare the Environment

This assumes the HDFS, Hive, HBase, and Kafka environments are already prepared. Start the Maxwell component to monitor data changes in the MySQL business database:

# Create the corresponding Kafka topics (topics that already exist can be skipped to avoid duplicate creation)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DB-BUSSINESS-DATA --partitions 3 --replication-factor 3
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-USER-LOGIN-TOPIC --partitions 3 --replication-factor 3
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWS-USER-LOGIN-WIDE-TOPIC --partitions 3 --replication-factor 3
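# (Optional sanity check, not part of the original steps) List the topics to
# confirm they were all created:
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --list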
# Start Maxwell
[root@node3 ~]# cd /software/maxwell-1.28.2/bin
[root@node3 bin]# maxwell --config ../config.properties
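# A rough sketch of what ../config.properties is assumed to contain; the
# values below are illustrative, not the project's actual settings:
#   producer=kafka
#   kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092
#   kafka_topic=KAFKA-DB-BUSSINESS-DATA
#   host=node2        # MySQL host being monitored (assumption)
#   user=maxwell
#   password=maxwell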
# Create the required Iceberg tables for each layer in Hive
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
CREATE TABLE ODS_MEMBER_INFO (
  id string, user_id string, member_growth_score string, member_level string,
  balance string, gmt_create string, gmt_modified string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_INFO/'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);

CREATE TABLE ODS_MEMBER_ADDRESS (
  id string, user_id string, province string, city string, area string,
  address string, log string, lat string, phone_number string,
  consignee_name string, gmt_create string, gmt_modified string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_ADDRESS/'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);

CREATE TABLE ODS_USER_LOGIN (
  id string, user_id string, ip string, login_tm string, logout_tm string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_USER_LOGIN/'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);

CREATE TABLE DWD_USER_LOGIN (
  id string, user_id string, ip string, login_tm string, logout_tm string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_USER_LOGIN/'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);

CREATE TABLE DWS_USER_LOGIN (
  user_id string, ip string, gmt_create string, login_tm string,
  logout_tm string, member_level string, province string, city string,
  area string, address string, member_points string, balance string,
  member_growth_score string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWS_USER_LOGIN/'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);
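# (Optional smoke test, an assumption rather than an original step) After the
# DDL above, verify that Hive can read one of the Iceberg tables:
SELECT * FROM ODS_USER_LOGIN LIMIT 10;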

# Start ClickHouse
[root@node1 ~]# service clickhouse-server start
# Create the corresponding table in ClickHouse
create table dm_user_login_info
(
  dt String,
  province String,
  city String,
  user_id String,
  login_tm String,
  gmt_create String
) engine = MergeTree() order by dt;
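# (Illustrative only, not in the original flow) Once data is flowing, results
# can be spot-checked in ClickHouse, for example per-day login users by city:
select dt, province, city, count(distinct user_id) as login_users
from dm_user_login_info
group by dt, province, city;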


2. Start the Flink Jobs

Start the following Flink programs in order: ProduceKafkaDBDataToODS.scala, DimDataToHBase.scala, ProduceKafkaODSDataToDWD.scala, ProduceUserLogInToDWS.scala, and ProcessUserLoginInfoToDM.scala. In each program, the Kafka Connector property scan.startup.mode is set to latest-offset, so data is consumed from the latest position.
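For reference, the sketch below shows how a Flink SQL Kafka source can set scan.startup.mode to latest-offset. This is not the project's actual source code: the table name, schema, broker list, and consumer group below are assumptions for illustration.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Minimal sketch of a Kafka source that starts consuming from the latest offset.
object LatestOffsetSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Table name, schema, brokers, and group id are illustrative assumptions.
    tableEnv.executeSql(
      """
        |CREATE TABLE kafka_ods_source (
        |  data STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'KAFKA-ODS-TOPIC',
        |  'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',
        |  'properties.group.id' = 'ods-group',
        |  'scan.startup.mode' = 'latest-offset',
        |  'format' = 'json'
        |)""".stripMargin)

    // A trivial continuous query just to exercise the source.
    tableEnv.executeSql("SELECT data FROM kafka_ods_source").print()
  }
}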

Note: when running the programs you can cap JVM memory with the options -Xmx300m -Xms300m.

3. Start the Data Collection Interface Code

Start the LakeHouseDataPublish project to publish data.

4. Start the Mock Data Code

In the LakeHouseMockData project, run RTMockDBData.java, which simulates producing data into the business database.
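As a rough illustration only (this is not RTMockDBData.java's actual logic; the connection settings, table name, and columns below are hypothetical), a mock generator of this kind boils down to periodic JDBC inserts into the MySQL business database, which Maxwell then picks up as change events:

import java.sql.DriverManager
import java.util.UUID

// Hypothetical sketch of a mock data producer: insert random login rows
// into the MySQL business database so Maxwell captures the changes.
object MockDBDataSketch {
  def main(args: Array[String]): Unit = {
    // Connection URL, credentials, and table are assumptions, not project config.
    val conn = DriverManager.getConnection(
      "jdbc:mysql://node2:3306/lakehousedb?useSSL=false", "root", "123456")
    val stmt = conn.prepareStatement(
      "insert into mc_user_login (id, user_id, ip, login_tm) values (?, ?, ?, ?)")
    for (_ <- 1 to 100) {
      stmt.setString(1, UUID.randomUUID().toString)
      stmt.setString(2, s"uid${scala.util.Random.nextInt(10000)}")
      stmt.setString(3, s"192.168.1.${scala.util.Random.nextInt(255)}")
      stmt.setString(4, System.currentTimeMillis().toString)
      stmt.executeUpdate()
      Thread.sleep(200) // pace the inserts roughly like a live stream
    }
    stmt.close()
    conn.close()
  }
}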
