写点什么

湖仓一体电商项目(十一):编写写入 DWS 层业务代码

作者:Lansonli
  • 2022-11-15
    广东
  • 本文字数:7102 字

    阅读完需:约 23 分钟

湖仓一体电商项目(十一):编写写入DWS层业务代码

编写写入 DWS 层业务代码

DWS 层主要是存放大宽表数据,此业务中主要是针对 Kafka topic “KAFKA-DWD-BROWSE-LOG-TOPIC”中用户浏览商品日志数据关联 HBase 中“ODS_PRODUCT_CATEGORY”商品分类表与“ODS_PRODUCT_INFO”商品表维度数据获取浏览商品主题大宽表。


Flink 在读取 Kafka 用户浏览商品数据与 HBase 中维度数据进行关联时采用了 Redis 做缓存,这样可以加快处理数据的速度。获取用户主题宽表之后,将数据写入到 Iceberg-DWS 层中,另外将宽表数据结果写入到 Kafka 中方便后期做实时统计分析。

一、代码编写

具体代码参照“ProduceBrowseLogToDWS.scala”,大体代码逻辑如下:


object ProduceBrowseLogToDWS {
private val hbaseDimProductCategoryTbl: String = ConfigUtil.HBASE_DIM_PRODUCT_CATEGORY private val hbaseDimProductInfoTbl: String = ConfigUtil.HBASE_DIM_PRODUCT_INFO private val kafkaDwsBrowseLogWideTopic: String = ConfigUtil.KAFKA_DWS_BROWSE_LOG_WIDE_TOPIC private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
def main(args: Array[String]): Unit = { //1.准备环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) env.enableCheckpointing(5000) import org.apache.flink.streaming.api.scala._
/** * 1.需要预先创建 Catalog * 创建Catalog,创建表需要在Hive中提前创建好,不在代码中创建,因为在Flink中创建iceberg表不支持create table if not exists ...语法 */ tblEnv.executeSql( """ |create catalog hadoop_iceberg with ( | 'type'='iceberg', | 'catalog-type'='hadoop', | 'warehouse'='hdfs://mycluster/lakehousedata' |) """.stripMargin)
/** * 2.创建 Kafka Connector,连接消费Kafka dwd中数据 * { * "browseProductCode": "BviQsxHtxC", * "browseProductTpCode": "282", * "userIp": "5.189.85.33", * "obtainPoints": "38", * "userId": "uid250775", * "frontProductUrl": "https:///swdOX/ruh", * "kafka_dwd_topic": "KAFKA-DWD-BROWSE-LOG-TOPIC", * "logTime": "1647067452241", * "browseProductUrl": "https:///57/zB4oF" * } */ tblEnv.executeSql( """ |create table kafka_dwd_browse_log_tbl ( | logTime string, | userId string, | userIp string, | frontProductUrl string, | browseProductUrl string, | browseProductTpCode string, | browseProductCode string, | obtainPoints string |) with ( | 'connector' = 'kafka', | 'topic' = 'KAFKA-DWD-BROWSE-LOG-TOPIC', | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092', | 'scan.startup.mode'='earliest-offset', --也可以指定 earliest-offset 、latest-offset | 'properties.group.id' = 'my-group-id', | 'format' = 'json' |) """.stripMargin)
val browseLogTbl:Table = tblEnv.sqlQuery( """ | select logTime,userId,userIp,frontProductUrl,browseProductUrl,browseProductTpCode,browseProductCode,obtainPoints from kafka_dwd_browse_log_tbl """.stripMargin)

//3.将Row 类型数据转换成对象类型操作,方便与维度数据进行关联 val browseLogDS: DataStream[BrowseLog] = tblEnv.toAppendStream[Row](browseLogTbl).map(row=>{ val logTime: String = row.getField(0).toString//浏览日志时间 val userId: String = row.getField(1).toString//用户编号 val userIp: String = row.getField(2).toString//浏览IP地址 val frontProductUrl: String = row.getField(3).toString//跳转前URL地址,有为null,有的不为null val browseProductUrl: String = row.getField(4).toString//浏览商品URL val browseProductTpCode: String = row.getField(5).toString//浏览商品二级分类 val browseProductCode: String = row.getField(6).toString//浏览商品编号 val obtainPointsstring: String = row.getField(7).toString//浏览商品所获积分 BrowseLog(logTime,userId,userIp,frontProductUrl,browseProductUrl,browseProductTpCode,browseProductCode,obtainPointsstring) })
//4.设置Sink 到Kafka 数据输出到侧输出流标记 val kafkaDataTag = new OutputTag[JSONObject]("kafka_data")
//5.连接phoenix 库查询HBase数据组织Browse宽表 val browseLogWideInfoDS: DataStream[BrowseLogWideInfo] = browseLogDS.process(new ProcessFunction[BrowseLog,BrowseLogWideInfo] {
var conn: Connection = _ var pst: PreparedStatement = _ var rs: ResultSet = _
//创建Phoenix 连接 override def open(parameters: Configuration): Unit = { //连接Phoenix println(s"连接Phoenix ... ...") conn = DriverManager.getConnection(ConfigUtil.PHOENIX_URL) }
override def processElement(browseLog: BrowseLog, context: ProcessFunction[BrowseLog, BrowseLogWideInfo]#Context, collector: Collector[BrowseLogWideInfo]): Unit ={ //最终返回的json 对象 val jsonObj = new JSONObject() jsonObj.put("log_time", browseLog.logTime) jsonObj.put("user_id", browseLog.userId) jsonObj.put("user_ip", browseLog.userIp) jsonObj.put("front_product_url", browseLog.frontProductUrl) jsonObj.put("browse_product_url", browseLog.browseProductUrl) jsonObj.put("browse_product_tpcode", browseLog.browseProductTpCode) //商品类型id jsonObj.put("browse_product_code", browseLog.browseProductCode)//商品id jsonObj.put("obtain_points", browseLog.obtainPoints)

//根据浏览商品类型id : browse_product_tpcode 从Redis缓存中读取 DIM_PRODUCT_CATEGORY - 商品类别表 val productCategoryRedisCacheInfo: String = MyRedisUtil.getInfoFromRedisCache(hbaseDimProductCategoryTbl, browseLog.browseProductTpCode)
//根据浏览商品id : browse_product_code 从Redis缓存中读取 DIM_PRODUCT_INFO - 商品基本信息表 val productInfoRedisCacheInfo: String = MyRedisUtil.getInfoFromRedisCache(hbaseDimProductInfoTbl, browseLog.browseProductCode)
//商品种类数据如果 Redis 缓存中没有则读取phoenix获取,有则直接从缓存中获取 if (MyStringUtil.isEmpty(productCategoryRedisCacheInfo)) { //说明缓存中没有数据,从phoenix中查询 println("连接Phoenix查询 DIM_PRODUCT_CATEGORY - 商品类别表 维度数据") val sql = s""" |SELECT | b.id as first_category_id, | b.name AS first_category_name, | a.id as second_category_id, | a.name AS second_category_name |FROM DIM_PRODUCT_CATEGORY a JOIN DIM_PRODUCT_CATEGORY b ON a.p_id = b.id where a.id = '${browseLog.browseProductTpCode}' """.stripMargin
println("phoenix 执行SQL 如下: "+sql) pst = conn.prepareStatement(sql) rs = pst.executeQuery()
//准备 向Redis 中写入 DIM_PRODUCT_CATEGORY - 商品类别表 的json对象 val dimProductCategroyRedisJsonObj = new JSONObject() while (rs.next()) { dimProductCategroyRedisJsonObj.put("first_category_id", rs.getString("first_category_id")) dimProductCategroyRedisJsonObj.put("first_category_name", rs.getString("first_category_name")) dimProductCategroyRedisJsonObj.put("second_category_id", rs.getString("second_category_id")) dimProductCategroyRedisJsonObj.put("second_category_name", rs.getString("second_category_name"))
//将商品种类信息存入Redis缓存,向Redis中设置数据缓存 MyRedisUtil.setRedisDimCache(hbaseDimProductCategoryTbl, browseLog.browseProductTpCode, dimProductCategroyRedisJsonObj.toString)
//将json 加入到总返回结果的Json中 CommonUtil.AddAttributeToJson(jsonObj, dimProductCategroyRedisJsonObj) }
}else{ //Redis中查询到了数据,从redis 中获取 json 信息设置在最终结果中 println("DIM_PRODUCT_CATEGORY - 商品类别表 从Redis中获取到缓存处理") CommonUtil.AddAttributeToJson(jsonObj, JSON.parseObject(productCategoryRedisCacheInfo))
}
//商品信息数据如果 Redis 缓存中没有则读取phoenix获取,有则直接从缓存中获取 if (MyStringUtil.isEmpty(productInfoRedisCacheInfo)) { //说明缓存中没有数据,从phoenix中查询 println("连接Phoenix查询 DIM_PRODUCT_INFO - 商品基本信息表 维度数据") val sql = s""" |SELECT | product_id, | product_name |FROM DIM_PRODUCT_INFO where product_id = '${browseLog.browseProductCode}' """.stripMargin
println("phoenix 执行SQL 如下: "+sql) pst = conn.prepareStatement(sql) rs = pst.executeQuery()
//准备 向Redis 中写入 DIM_PRODUCT_INFO - 商品基本信息表 的json对象 val dimProductInfoRedisJsonObj = new JSONObject() while (rs.next()) { dimProductInfoRedisJsonObj.put("product_id", rs.getString("product_id")) dimProductInfoRedisJsonObj.put("product_name", rs.getString("product_name"))
//将商品种类信息存入Redis缓存,向Redis中设置数据缓存 MyRedisUtil.setRedisDimCache(hbaseDimProductInfoTbl, browseLog.browseProductCode, dimProductInfoRedisJsonObj.toString)
//将json 加入到总返回结果的Json中 CommonUtil.AddAttributeToJson(jsonObj, dimProductInfoRedisJsonObj) }
}else{ //Redis中查询到了数据,从redis 中获取 json 信息设置在最终结果中 println("DIM_PRODUCT_INFO - 商品基本信息表 从Redis中获取到缓存处理") CommonUtil.AddAttributeToJson(jsonObj, JSON.parseObject(productInfoRedisCacheInfo)) }
//准备向Kafka 中存储的数据json 对象 context.output(kafkaDataTag,jsonObj)
//最终返回 jsonObj,此时jsonObj包含了所有json 信息 /** * { * "first_category_id": "30", * "user_ip": "195.134.35.113", * "obtain_points": "0", * "product_name": "扭扭车", * "log_time": "2022-03-17 16:22:09", * "browse_product_tpcode": "30000", * "front_product_url": "https://0BZ/7N/qVIap", * "first_category_name": "玩具乐器", * "user_id": "uid786601", * "browse_product_code": "xA4cfipkdl", * "product_id": "xA4cfipkdl", * "second_category_id": "30000", * "browse_product_url": "https://DU6S2wiT/n/l3E", * "second_category_name": "童车童床" * } */ collector.collect(BrowseLogWideInfo(jsonObj.getString("log_time").split(" ")(0),jsonObj.getString("user_id"),jsonObj.getString("user_ip"), jsonObj.getString("product_name"),jsonObj.getString("front_product_url"),jsonObj.getString("browse_product_url"),jsonObj.getString("first_category_name"), jsonObj.getString("second_category_name"),jsonObj.getString("obtain_points")))
}
override def close(): Unit = { rs.close() pst.close() conn.close() } })
/** * 6.将清洗完的数据存入Iceberg 表中 * 将宽表转换成表存储在 iceberg - DWS 层 DWS_BROWSE_INFO , */ val table: Table = tblEnv.fromDataStream(browseLogWideInfoDS) tblEnv.executeSql( s""" |insert into hadoop_iceberg.icebergdb.DWS_BROWSE_INFO |select | log_time, | user_id, | user_ip, | product_name, | front_product_url, | browse_product_url, | first_category_name, | second_category_name, | obtain_points | from ${table} """.stripMargin)

//7.同时将结果存储在Kafka KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC topic中 /** * 将以上数据写入到Kafka 各自DWD 层topic中,这里不再使用SQL方式,而是直接使用DataStream代码方式 Sink 到各自的DWD层代码中 */ val props = new Properties() props.setProperty("bootstrap.servers",kafkaBrokers)
browseLogWideInfoDS.getSideOutput(kafkaDataTag).addSink(new FlinkKafkaProducer[JSONObject](kafkaDwsBrowseLogWideTopic,new KafkaSerializationSchema[JSONObject] { override def serialize(jsonObj: JSONObject, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { new ProducerRecord[Array[Byte], Array[Byte]](kafkaDwsBrowseLogWideTopic,null,jsonObj.toString.getBytes()) } },props,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
env.execute() }}
复制代码

二、创建 Iceberg-DWS 层表

代码在执行之前需要在 Hive 中预先创建对应的 Iceberg 表,创建 Icebreg 表方式如下:

1、在 Hive 中添加 Iceberg 表格式需要的包

启动 HDFS 集群,node1 启动 Hive metastore 服务,在 Hive 客户端启动 Hive 添加 Iceberg 依赖包:


#node1节点启动Hive metastore服务[root@node1 ~]# hive --service metastore &
#在hive客户端node3节点加载两个jar包add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
复制代码

2、创建 Iceberg 表

这里创建 Iceberg-DWS 表有“DWS_BROWSE_INFO”,创建语句如下:


CREATE TABLE DWS_BROWSE_INFO (log_time string,user_id string,user_ip string,product_name string,front_product_url string,browse_product_url string,first_category_name string,second_category_name string,obtain_points string)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWS_BROWSE_INFO/' TBLPROPERTIES ('iceberg.catalog'='location_based_table','write.metadata.delete-after-commit.enabled'= 'true','write.metadata.previous-versions-max' = '3');
复制代码

三、代码测试

以上代码编写完成后,代码执行测试步骤如下:

1、在 Kafka 中创建对应的 topic

#在Kafka 中创建 KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC topic./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC --partitions 3 --replication-factor 3
#监控以上topic数据[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC
复制代码

2、将代码中消费 Kafka 数据改成从头开始消费

代码中 Kafka Connector 中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。


这里也可以不设置从头开始消费 Kafka 数据,而是直接启动向日志采集接口模拟生产日志代码“RTMockUserLogData.java”,需要启动日志采集接口及 Flume。

3、执行代码,查看对应结果

以上代码执行后在,在对应的 Kafka “KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC” topic 中都有对应的数据。在 Iceberg-DWS 层中对应的表中也有数据。


Kafka 中结果如下:



Iceberg-DWS 层表”DWS_BROWSE_INFO”中的数据如下:


四、架构图


发布于: 刚刚阅读数: 3
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
湖仓一体电商项目(十一):编写写入DWS层业务代码_湖仓一体电商项目_Lansonli_InfoQ写作社区