写点什么

湖仓一体电商项目(十六):业务实现之编写写入 ODS 层业务代码

作者:Lansonli
  • 2022-11-30
    广东
  • 本文字数:5110 字

    阅读完需:约 17 分钟

湖仓一体电商项目(十六):业务实现之编写写入ODS层业务代码

业务实现之编写写入 ODS 层业务代码

由于本业务涉及到 MySQL 业务数据和用户日志数据,两类数据是分别采集存储在不同的 Kafka Topic 中的,所以这里写入 ODS 层代码由两个代码组成。

一、代码编写

处理 MySQL 业务库 binlog 数据的代码复用第一个业务代码只需要在”ProduceKafkaDBDataToODS.scala” 代码中写入存入 Icebeg-ODS 层表的代码即可,“ProduceKafkaDBDataToODS.scala”代码文件中加入代码如下:


//向Iceberg ods 层 ODS_PRODUCT_CATEGORY 表插入数据tblEnv.executeSql(  """    |insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_CATEGORY    |select    |   data['id'] as id ,    |   data['p_id'] as p_id,    |   data['name'] as name,    |   data['pic_url'] as pic_url,    |   data['gmt_create'] as gmt_create    | from kafka_db_bussiness_tbl where `table` = 'pc_product_category'  """.stripMargin)
//向Iceberg ods 层 ODS_PRODUCT_INFO 表插入数据tblEnv.executeSql( """ |insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_INFO |select | data['product_id'] as product_id , | data['category_id'] as category_id, | data['product_name'] as product_name, | data['gmt_create'] as gmt_create | from kafka_db_bussiness_tbl where `table` = 'pc_product' """.stripMargin)处理用户日志的代码需要自己编写,代码中的业务逻辑主要是读取存储用户浏览日志数据topic “KAFKA-USER-LOG-DATA”中的数据,通过Flink代码处理将不同类型用户日志处理成json类型数据,将该json结果后续除了存储在Iceberg-ODS层对应的表之外还要将数据存储在Kafka topic “KAFKA-ODS-TOPIC” 中方便后续的业务处理。具体代码参照“ProduceKafkaLogDataToODS.scala”,主要代码逻辑如下:object ProduceKafkaLogDataToODS { private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS private val kafkaOdsTopic: String = ConfigUtil.KAFKA_ODS_TOPIC private val kafkaDwdBrowseLogTopic: String = ConfigUtil.KAFKA_DWD_BROWSELOG_TOPIC
def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) env.enableCheckpointing(5000)
import org.apache.flink.streaming.api.scala._
/** * 1.需要预先创建 Catalog * 创建Catalog,创建表需要在Hive中提前创建好,不在代码中创建,因为在Flink中创建iceberg表不支持create table if not exists ...语法 */ tblEnv.executeSql( """ |create catalog hadoop_iceberg with ( | 'type'='iceberg', | 'catalog-type'='hadoop', | 'warehouse'='hdfs://mycluster/lakehousedata' |) """.stripMargin)
/** * { * "logtype": "browselog", * "data": { * "browseProductCode": "eSHd1sFat9", * "browseProductTpCode": "242", * "userIp": "251.100.236.37", * "obtainPoints": 32, * "userId": "uid208600", * "frontProductUrl": "https://f/dcjp/nVnE", * "logTime": 1646980514321, * "browseProductUrl": "https://kI/DXSNBeP/" * } * } */
/** * 2.创建 Kafka Connector,连接消费Kafka中数据 * 注意:1).关键字要使用 " 飘"符号引起来 2).对于json对象使用 map < String,String>来接收 */ tblEnv.executeSql( """ |create table kafka_log_data_tbl( | logtype string, | data map<string,string> |) with ( | 'connector' = 'kafka', | 'topic' = 'KAFKA-USER-LOG-DATA', | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092', | 'scan.startup.mode'='earliest-offset', --也可以指定 earliest-offset 、latest-offset | 'properties.group.id' = 'my-group-id', | 'format' = 'json' |) """.stripMargin)
/** * 3.将不同的业务库数据存入各自的Iceberg表 */ tblEnv.executeSql( """ |insert into hadoop_iceberg.icebergdb.ODS_BROWSELOG |select | data['logTime'] as log_time , | data['userId'] as user_id, | data['userIp'] as user_ip, | data['frontProductUrl'] as front_product_url, | data['browseProductUrl'] as browse_product_url, | data['browseProductTpCode'] as browse_product_tpcode, | data['browseProductCode'] as browse_product_code, | data['obtainPoints'] as obtain_points | from kafka_log_data_tbl where `logtype` = 'browselog' """.stripMargin)

//4.将用户所有日志数据组装成Json数据存入 kafka topic ODS-TOPIC 中 //读取 Kafka 中的数据,将维度数据另外存储到 Kafka 中 val kafkaLogTbl: Table = tblEnv.sqlQuery("select logtype,data from kafka_log_data_tbl")
//将 kafkaLogTbl Table 转换成 DataStream 数据 val userLogDS: DataStream[Row] = tblEnv.toAppendStream[Row](kafkaLogTbl) //将 userLogDS 数据转换成JSON 数据写出到 kafka topic ODS-TOPIC val odsSinkDS: DataStream[String] = userLogDS.map(row => { //最后返回给Kafka 日志数据的json对象 val returnJsonObj = new JSONObject() val logType: String = row.getField(0).toString
val data: String = row.getField(1).toString val nObject = new JSONObject() val arr: Array[String] = data.stripPrefix("{").stripSuffix("}").split(",") for (elem <- arr) { //有些数据 “data”中属性没有值,就没有“=” if (elem.contains("=") && elem.split("=").length == 2) { val split: Array[String] = elem.split("=") nObject.put(split(0).trim, split(1).trim) } else { nObject.put(elem.stripSuffix("=").trim, "") } }
if ("browselog".equals(logType)) { returnJsonObj.put("iceberg_ods_tbl_name", "ODS_BROWSELOG") returnJsonObj.put("kafka_dwd_topic",kafkaDwdBrowseLogTopic) returnJsonObj.put("data",nObject.toString) } else { //其他日志,这里目前没有 }
returnJsonObj.toJSONString })
val props = new Properties() props.setProperty("bootstrap.servers",kafkaBrokers)
odsSinkDS.addSink(new FlinkKafkaProducer[String](kafkaOdsTopic,new KafkaSerializationSchema[String] { override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { new ProducerRecord[Array[Byte],Array[Byte]](kafkaOdsTopic,null,element.getBytes()) } },props,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
env.execute()
}}
复制代码

二、创建 Iceberg-ODS 层表

代码在执行之前需要在 Hive 中预先创建对应的 Iceberg 表,创建 Icebreg 表方式如下:

1、在 Hive 中添加 Iceberg 表格式需要的包

启动 HDFS 集群,node1 启动 Hive metastore 服务,在 Hive 客户端启动 Hive 添加 Iceberg 依赖包:


#node1节点启动Hive metastore服务[root@node1 ~]# hive --service metastore &
#在hive客户端node3节点加载两个jar包add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
复制代码

2、创建 Iceberg 表

这里创建 Iceberg 表有“ODS_PRODUCT_CATEGORY”、“ODS_PRODUCT_INFO”,创建语句如下:


CREATE TABLE ODS_PRODUCT_CATEGORY (id string,p_id string,name string,pic_url string,gmt_create string)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_CATEGORY/' TBLPROPERTIES ('iceberg.catalog'='location_based_table','write.metadata.delete-after-commit.enabled'= 'true','write.metadata.previous-versions-max' = '3');
CREATE TABLE ODS_PRODUCT_INFO (product_id string,category_id string,product_name string,gmt_create string)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_INFO/' TBLPROPERTIES ('iceberg.catalog'='location_based_table','write.metadata.delete-after-commit.enabled'= 'true','write.metadata.previous-versions-max' = '3');
CREATE TABLE ODS_BROWSELOG ( log_time string, user_id string, user_ip string, front_product_url string, browse_product_url string, browse_product_tpcode string, browse_product_code string, obtain_points string)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_BROWSELOG/' TBLPROPERTIES ('iceberg.catalog'='location_based_table','write.metadata.delete-after-commit.enabled'= 'true','write.metadata.previous-versions-max' = '3');
复制代码


以上语句在 Hive 客户端执行完成之后,在 HDFS 中可以看到对应的 Iceberg 数据目录:


三、代码测试

以上代码编写完成后,代码执行测试步骤如下:

1、在 Kafka 中创建对应的 topic

#在Kafka 中创建 KAFKA-USER-LOG-DATA topic./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3
#在Kafka 中创建 KAFKA-ODS-TOPIC topic(第一个业务已创建可忽略)./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3
#在Kafka 中创建 KAFKA-DIM-TOPIC topic(第一个业务已创建可忽略)./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3
#监控以上两个topic数据[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-ODS-TOPIC
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DIM-TOPIC
复制代码

2、将代码中消费 Kafka 数据改成从头开始消费

代码中 Kafka Connector 中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。


这里也可以不设置从头开始消费 Kafka 数据,而是直接启动实时向 MySQL 表中写入数据代码“RTMockDBData.java”代码,实时向 MySQL 对应的表中写入数据,这里需要启动 maxwell 监控数据,代码才能实时监控到写入 MySQL 的业务数据。


针对用户日志数据可以启动代码“RTMockUserLogData.java”,实时向日志采集接口写入数据。

3、启动日志采集接口,启动 Flume 监控

如果上个步骤中设置从“earliest-offset”消费 kafka 数据,可以暂时不启动日志采集接口和 Flume


#在node5节点上启动日志采集接口[root@node5 ~]# cd /software/[root@node5 software]# java -jar logcollector-0.0.1-SNAPSHOT.jar

#在node5节点上启动Flume[root@node5 software]# flume-ng agent --name a -f /software/a.properties -Dflume.root.logger=INFO,console
复制代码

4、执行代码,查看对应 topic 中的结果

以上代码执行后在,在对应的 Kafka “KAFKA-DIM-TOPIC”和“KAFKA-ODS-TOPIC”中都有对应的数据。在 Iceberg-ODS 层中对应的表中也有数据。

5、执行模拟生产用户日志代码,查看对应 topic 中的结果

执行模拟产生用户日志数据代码:RTMockUserLogData.java,观察对应的 Kafak “KAFKA-ODS-TOPIC”中有实时数据被采集。


发布于: 2022-11-30阅读数: 20
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
湖仓一体电商项目(十六):业务实现之编写写入ODS层业务代码_湖仓一体电商项目_Lansonli_InfoQ写作社区