湖仓一体电商项目(十六):业务实现之编写写入 ODS 层业务代码
- 2022-11-30 广东
本文字数:5110 字
阅读完需:约 17 分钟
业务实现之编写写入 ODS 层业务代码
由于本业务涉及到 MySQL 业务数据和用户日志数据,两类数据是分别采集存储在不同的 Kafka Topic 中的,所以这里写入 ODS 层代码由两个代码组成。
一、代码编写
处理 MySQL 业务库 binlog 数据的代码复用第一个业务代码只需要在”ProduceKafkaDBDataToODS.scala” 代码中写入存入 Icebeg-ODS 层表的代码即可,“ProduceKafkaDBDataToODS.scala”代码文件中加入代码如下:
//向Iceberg ods 层 ODS_PRODUCT_CATEGORY 表插入数据
tblEnv.executeSql(
"""
|insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_CATEGORY
|select
| data['id'] as id ,
| data['p_id'] as p_id,
| data['name'] as name,
| data['pic_url'] as pic_url,
| data['gmt_create'] as gmt_create
| from kafka_db_bussiness_tbl where `table` = 'pc_product_category'
""".stripMargin)
//向Iceberg ods 层 ODS_PRODUCT_INFO 表插入数据
tblEnv.executeSql(
"""
|insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_INFO
|select
| data['product_id'] as product_id ,
| data['category_id'] as category_id,
| data['product_name'] as product_name,
| data['gmt_create'] as gmt_create
| from kafka_db_bussiness_tbl where `table` = 'pc_product'
""".stripMargin)
处理用户日志的代码需要自己编写,代码中的业务逻辑主要是读取存储用户浏览日志数据topic “KAFKA-USER-LOG-DATA”中的数据,通过Flink代码处理将不同类型用户日志处理成json类型数据,将该json结果后续除了存储在Iceberg-ODS层对应的表之外还要将数据存储在Kafka topic “KAFKA-ODS-TOPIC” 中方便后续的业务处理。具体代码参照“ProduceKafkaLogDataToODS.scala”,主要代码逻辑如下:
object ProduceKafkaLogDataToODS {
private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
private val kafkaOdsTopic: String = ConfigUtil.KAFKA_ODS_TOPIC
private val kafkaDwdBrowseLogTopic: String = ConfigUtil.KAFKA_DWD_BROWSELOG_TOPIC
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
env.enableCheckpointing(5000)
import org.apache.flink.streaming.api.scala._
/**
* 1.需要预先创建 Catalog
* 创建Catalog,创建表需要在Hive中提前创建好,不在代码中创建,因为在Flink中创建iceberg表不支持create table if not exists ...语法
*/
tblEnv.executeSql(
"""
|create catalog hadoop_iceberg with (
| 'type'='iceberg',
| 'catalog-type'='hadoop',
| 'warehouse'='hdfs://mycluster/lakehousedata'
|)
""".stripMargin)
/**
* {
* "logtype": "browselog",
* "data": {
* "browseProductCode": "eSHd1sFat9",
* "browseProductTpCode": "242",
* "userIp": "251.100.236.37",
* "obtainPoints": 32,
* "userId": "uid208600",
* "frontProductUrl": "https://f/dcjp/nVnE",
* "logTime": 1646980514321,
* "browseProductUrl": "https://kI/DXSNBeP/"
* }
* }
*/
/**
* 2.创建 Kafka Connector,连接消费Kafka中数据
* 注意:1).关键字要使用 " 飘"符号引起来 2).对于json对象使用 map < String,String>来接收
*/
tblEnv.executeSql(
"""
|create table kafka_log_data_tbl(
| logtype string,
| data map<string,string>
|) with (
| 'connector' = 'kafka',
| 'topic' = 'KAFKA-USER-LOG-DATA',
| 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
| 'scan.startup.mode'='earliest-offset', --也可以指定 earliest-offset 、latest-offset
| 'properties.group.id' = 'my-group-id',
| 'format' = 'json'
|)
""".stripMargin)
/**
* 3.将不同的业务库数据存入各自的Iceberg表
*/
tblEnv.executeSql(
"""
|insert into hadoop_iceberg.icebergdb.ODS_BROWSELOG
|select
| data['logTime'] as log_time ,
| data['userId'] as user_id,
| data['userIp'] as user_ip,
| data['frontProductUrl'] as front_product_url,
| data['browseProductUrl'] as browse_product_url,
| data['browseProductTpCode'] as browse_product_tpcode,
| data['browseProductCode'] as browse_product_code,
| data['obtainPoints'] as obtain_points
| from kafka_log_data_tbl where `logtype` = 'browselog'
""".stripMargin)
//4.将用户所有日志数据组装成Json数据存入 kafka topic ODS-TOPIC 中
//读取 Kafka 中的数据,将维度数据另外存储到 Kafka 中
val kafkaLogTbl: Table = tblEnv.sqlQuery("select logtype,data from kafka_log_data_tbl")
//将 kafkaLogTbl Table 转换成 DataStream 数据
val userLogDS: DataStream[Row] = tblEnv.toAppendStream[Row](kafkaLogTbl)
//将 userLogDS 数据转换成JSON 数据写出到 kafka topic ODS-TOPIC
val odsSinkDS: DataStream[String] = userLogDS.map(row => {
//最后返回给Kafka 日志数据的json对象
val returnJsonObj = new JSONObject()
val logType: String = row.getField(0).toString
val data: String = row.getField(1).toString
val nObject = new JSONObject()
val arr: Array[String] = data.stripPrefix("{").stripSuffix("}").split(",")
for (elem <- arr) {
//有些数据 “data”中属性没有值,就没有“=”
if (elem.contains("=") && elem.split("=").length == 2) {
val split: Array[String] = elem.split("=")
nObject.put(split(0).trim, split(1).trim)
} else {
nObject.put(elem.stripSuffix("=").trim, "")
}
}
if ("browselog".equals(logType)) {
returnJsonObj.put("iceberg_ods_tbl_name", "ODS_BROWSELOG")
returnJsonObj.put("kafka_dwd_topic",kafkaDwdBrowseLogTopic)
returnJsonObj.put("data",nObject.toString)
} else {
//其他日志,这里目前没有
}
returnJsonObj.toJSONString
})
val props = new Properties()
props.setProperty("bootstrap.servers",kafkaBrokers)
odsSinkDS.addSink(new FlinkKafkaProducer[String](kafkaOdsTopic,new KafkaSerializationSchema[String] {
override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte],Array[Byte]](kafkaOdsTopic,null,element.getBytes())
}
},props,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
env.execute()
}
}
二、创建 Iceberg-ODS 层表
代码在执行之前需要在 Hive 中预先创建对应的 Iceberg 表,创建 Icebreg 表方式如下:
1、在 Hive 中添加 Iceberg 表格式需要的包
启动 HDFS 集群,node1 启动 Hive metastore 服务,在 Hive 客户端启动 Hive 添加 Iceberg 依赖包:
#node1节点启动Hive metastore服务
[root@node1 ~]# hive --service metastore &
#在hive客户端node3节点加载两个jar包
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
2、创建 Iceberg 表
这里创建 Iceberg 表有“ODS_PRODUCT_CATEGORY”、“ODS_PRODUCT_INFO”,创建语句如下:
CREATE TABLE ODS_PRODUCT_CATEGORY (
id string,
p_id string,
name string,
pic_url string,
gmt_create string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_CATEGORY/'
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);
CREATE TABLE ODS_PRODUCT_INFO (
product_id string,
category_id string,
product_name string,
gmt_create string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_INFO/'
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);
CREATE TABLE ODS_BROWSELOG (
log_time string,
user_id string,
user_ip string,
front_product_url string,
browse_product_url string,
browse_product_tpcode string,
browse_product_code string,
obtain_points string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_BROWSELOG/'
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);
以上语句在 Hive 客户端执行完成之后,在 HDFS 中可以看到对应的 Iceberg 数据目录:
三、代码测试
以上代码编写完成后,代码执行测试步骤如下:
1、在 Kafka 中创建对应的 topic
#在Kafka 中创建 KAFKA-USER-LOG-DATA topic
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3
#在Kafka 中创建 KAFKA-ODS-TOPIC topic(第一个业务已创建可忽略)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3
#在Kafka 中创建 KAFKA-DIM-TOPIC topic(第一个业务已创建可忽略)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3
#监控以上两个topic数据
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-ODS-TOPIC
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DIM-TOPIC
2、将代码中消费 Kafka 数据改成从头开始消费
代码中 Kafka Connector 中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。
这里也可以不设置从头开始消费 Kafka 数据,而是直接启动实时向 MySQL 表中写入数据代码“RTMockDBData.java”代码,实时向 MySQL 对应的表中写入数据,这里需要启动 maxwell 监控数据,代码才能实时监控到写入 MySQL 的业务数据。
针对用户日志数据可以启动代码“RTMockUserLogData.java”,实时向日志采集接口写入数据。
3、启动日志采集接口,启动 Flume 监控
如果上个步骤中设置从“earliest-offset”消费 kafka 数据,可以暂时不启动日志采集接口和 Flume
#在node5节点上启动日志采集接口
[root@node5 ~]# cd /software/
[root@node5 software]# java -jar logcollector-0.0.1-SNAPSHOT.jar
#在node5节点上启动Flume
[root@node5 software]# flume-ng agent --name a -f /software/a.properties -Dflume.root.logger=INFO,console
4、执行代码,查看对应 topic 中的结果
以上代码执行后在,在对应的 Kafka “KAFKA-DIM-TOPIC”和“KAFKA-ODS-TOPIC”中都有对应的数据。在 Iceberg-ODS 层中对应的表中也有数据。
5、执行模拟生产用户日志代码,查看对应 topic 中的结果
执行模拟产生用户日志数据代码:RTMockUserLogData.java,观察对应的 Kafak “KAFKA-ODS-TOPIC”中有实时数据被采集。
版权声明: 本文为 InfoQ 作者【Lansonli】的原创文章。
原文链接:【http://xie.infoq.cn/article/c94f847e081aa3713bc364dc0】。文章转载请联系作者。
Lansonli
微信公众号:三帮大数据 2022-07-12 加入
CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师
评论