
Lakehouse E-commerce Project (8): Implementing the Code that Writes to the ODS Layer

Author: Lansonli
2022-11-12


I. Code Logic and Architecture

In the lakehouse architecture, the ODS layer mainly stores the raw data. Here we read the data in the Kafka topic "KAFKA-DB-BUSSINESS-DATA" and implement the following two functions:


  • Store the MySQL business data unchanged in the Iceberg-ODS layer, so it is available for ad-hoc business needs of the project.

  • Separate the fact data from the dimension data and write each to its own Kafka topic.


For the first function, the corresponding Iceberg tables need to be created in Hive in advance before the job can write to them. For the second, there is no way to tell from the topic "KAFKA-DB-BUSSINESS-DATA" alone which binlog records are fact data and which are dimension data, so we register each dimension table in the MySQL configuration table "lakehousedb.dim_tbl_config_info". Flink loads this table, broadcasts it, and joins the broadcast with the real-time Kafka stream to separate fact data from dimension data; a sketch of such a configuration source follows.
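The "MySQLUtil.getMySQLData" call used in step 7 of the code below returns this configuration source. Its implementation ships with the project and is not listed in this article, so the following is only a minimal sketch of what it could look like; the class name "MySQLConfigSource" is made up here, and the column list is assumed from the fields the broadcast logic later reads (tbl_db, tbl_name, pk_col, cols, phoenix_tbl_name):

import java.sql.{Connection, DriverManager, ResultSet}

import com.alibaba.fastjson.JSONObject
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Hypothetical sketch of the MySQL config source -- the real project helper is
// MySQLUtil.getMySQLData; the column names are assumed from how the broadcast
// state is read in processElement in the main job.
class MySQLConfigSource(url: String, user: String, password: String)
  extends RichSourceFunction[JSONObject] {

  override def run(ctx: SourceFunction.SourceContext[JSONObject]): Unit = {
    val conn: Connection = DriverManager.getConnection(url, user, password)
    val rst: ResultSet = conn.createStatement().executeQuery(
      "select tbl_db,tbl_name,pk_col,cols,phoenix_tbl_name from lakehousedb.dim_tbl_config_info")
    while (rst.next()) {
      val json = new JSONObject()
      json.put("tbl_db", rst.getString("tbl_db"))
      json.put("tbl_name", rst.getString("tbl_name"))
      json.put("pk_col", rst.getString("pk_col"))
      json.put("cols", rst.getString("cols"))
      json.put("phoenix_tbl_name", rst.getString("phoenix_tbl_name"))
      ctx.collect(json) // each config row becomes one broadcast element
    }
    rst.close()
    conn.close()
  }

  override def cancel(): Unit = {}
}

Each emitted JSONObject is later keyed by "tbl_db:tbl_name" in the broadcast state, which is exactly the key the event stream builds from its "database" and "table" fields.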





II. Writing the Code

The code that writes data into the ODS layer lives in "ProduceKafkaDBDataToODS.scala"; its main logic is implemented as follows:


import java.util.Properties

import com.alibaba.fastjson.JSONObject
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.ProducerRecord

object ProduceKafkaDBDataToODS {
  private val mysqlUrl: String = ConfigUtil.MYSQL_URL
  private val mysqlUser: String = ConfigUtil.MYSQL_USER
  private val mysqlPassWord: String = ConfigUtil.MYSQL_PASSWORD
  private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
  private val kafkaDimTopic: String = ConfigUtil.KAFKA_DIM_TOPIC
  private val kafkaOdsTopic: String = ConfigUtil.KAFKA_ODS_TOPIC
  private val kafkaDwdUserLogTopic: String = ConfigUtil.KAFKA_DWD_USERLOG_TOPIC

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    env.enableCheckpointing(5000)

    /**
      * 1. The catalog has to exist before the job runs. Only the catalog is
      *    created here; the tables themselves are created in Hive ahead of time
      *    (see section III), because creating Iceberg tables from Flink does
      *    not support the "create table if not exists ..." syntax.
      */
    tblEnv.executeSql(
      """
        |create catalog hadoop_iceberg with (
        | 'type'='iceberg',
        | 'catalog-type'='hadoop',
        | 'warehouse'='hdfs://mycluster/lakehousedata'
        |)
      """.stripMargin)

    /**
      * 2. Create the Kafka connector table that consumes the binlog data.
      *    Note: 1) reserved words such as `table` and `commit` must be quoted
      *             with backticks;
      *          2) the nested json object is received as map<string,string>.
      */
    tblEnv.executeSql(
      """
        |create table kafka_db_bussiness_tbl(
        | database string,
        | `table` string,
        | type string,
        | ts string,
        | xid string,
        | `commit` string,
        | data map<string,string>
        |) with (
        | 'connector' = 'kafka',
        | 'topic' = 'KAFKA-DB-BUSSINESS-DATA',
        | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
        | 'scan.startup.mode'='latest-offset', -- can also be set to earliest-offset
        | 'properties.group.id' = 'my-group-id',
        | 'format' = 'json'
        |)
      """.stripMargin)

    /**
      * 3. Write the data of each business table into its own Iceberg-ODS table.
      */
    tblEnv.executeSql(
      """
        |insert into hadoop_iceberg.icebergdb.ODS_MEMBER_INFO
        |select
        | data['id'] as id,
        | data['user_id'] as user_id,
        | data['member_growth_score'] as member_growth_score,
        | data['member_level'] as member_level,
        | data['balance'] as balance,
        | data['gmt_create'] as gmt_create,
        | data['gmt_modified'] as gmt_modified
        |from kafka_db_bussiness_tbl where `table` = 'mc_member_info'
      """.stripMargin)

    tblEnv.executeSql(
      """
        |insert into hadoop_iceberg.icebergdb.ODS_MEMBER_ADDRESS
        |select
        | data['id'] as id,
        | data['user_id'] as user_id,
        | data['province'] as province,
        | data['city'] as city,
        | data['area'] as area,
        | data['address'] as address,
        | data['log'] as log,
        | data['lat'] as lat,
        | data['phone_number'] as phone_number,
        | data['consignee_name'] as consignee_name,
        | data['gmt_create'] as gmt_create,
        | data['gmt_modified'] as gmt_modified
        |from kafka_db_bussiness_tbl where `table` = 'mc_member_address'
      """.stripMargin)

    tblEnv.executeSql(
      """
        |insert into hadoop_iceberg.icebergdb.ODS_USER_LOGIN
        |select
        | data['id'] as id,
        | data['user_id'] as user_id,
        | data['ip'] as ip,
        | data['login_tm'] as login_tm,
        | data['logout_tm'] as logout_tm
        |from kafka_db_bussiness_tbl where `table` = 'mc_user_login'
      """.stripMargin)

    // 4. Read the data back from Kafka so the dimension data can be routed to
    //    its own Kafka topic.
    val kafkaTbl: Table = tblEnv.sqlQuery("select database,`table`,type,ts,xid,`commit`,data from kafka_db_bussiness_tbl")

    // 5. Convert the kafkaTbl Table into a DataStream so it can be connected
    //    with the configuration data from MySQL.
    val kafkaDS: DataStream[Row] = tblEnv.toAppendStream[Row](kafkaTbl)

    // 6. MapStateDescriptor used for the broadcast state.
    val mapStateDescriptor = new MapStateDescriptor[String, JSONObject]("mapStateDescriptor", classOf[String], classOf[JSONObject])

    // 7. Read the configuration info from MySQL and broadcast it.
    val bcConfigDs: BroadcastStream[JSONObject] = env.addSource(MySQLUtil.getMySQLData(mysqlUrl, mysqlUser, mysqlPassWord)).broadcast(mapStateDescriptor)

    // 8. Side-output tag that marks dimension data.
    val dimDataTag = new OutputTag[String]("dim_data")

    // 9. Only binlog events from the MySQL database "lakehousedb" are kept;
    //    events from other databases are ignored. Connect the event stream
    //    with the broadcast stream and process them together.
    val factMainDs: DataStream[String] = kafkaDS.filter(row => "lakehousedb".equals(row.getField(0).toString))
      .connect(bcConfigDs)
      .process(new BroadcastProcessFunction[Row, JSONObject, String] {

        override def processElement(row: Row, ctx: BroadcastProcessFunction[Row, JSONObject, String]#ReadOnlyContext, out: Collector[String]): Unit = {
          // json object with the fact data that is finally sent to Kafka
          val returnJsonObj = new JSONObject()
          // get the broadcast state
          val robcs: ReadOnlyBroadcastState[String, JSONObject] = ctx.getBroadcastState(mapStateDescriptor)
          // parse the event-stream data
          val nObject: JSONObject = CommonUtil.rowToJsonObj(row)
          // get the database and table this event comes from; sample data:
          // lakehousedb,pc_product,insert,1646659263,21603,null,{gmt_create=1645493074001, category_id=220, product_name=黄金, product_id=npfSpLHB8U}
          val dbName: String = nObject.getString("database")
          val tableName: String = nObject.getString("table")
          val key = dbName + ":" + tableName
          if (robcs.contains(key)) {
            // dimension data: copy the config info from the broadcast state
            // onto the stream event and emit it through the side output
            val jsonValue: JSONObject = robcs.get(key)
            nObject.put("tbl_name", jsonValue.getString("tbl_name"))
            nObject.put("tbl_db", jsonValue.getString("tbl_db"))
            nObject.put("pk_col", jsonValue.getString("pk_col"))
            nObject.put("cols", jsonValue.getString("cols"))
            nObject.put("phoenix_tbl_name", jsonValue.getString("phoenix_tbl_name"))
            ctx.output(dimDataTag, nObject.toString)
          } else {
            // fact data: attach the Iceberg table name and the downstream DWD
            // topic, then write it to the Kafka ODS topic (only mc_user_login
            // is routed in this part of the project)
            if ("mc_user_login".equals(tableName)) {
              returnJsonObj.put("iceberg_ods_tbl_name", "ODS_USER_LOGIN")
              returnJsonObj.put("kafka_dwd_topic", kafkaDwdUserLogTopic)
              returnJsonObj.put("data", nObject.toString)
            }
            out.collect(returnJsonObj.toJSONString)
          }
        }

        override def processBroadcastElement(jsonObject: JSONObject, ctx: BroadcastProcessFunction[Row, JSONObject, String]#Context, out: Collector[String]): Unit = {
          val tblDB: String = jsonObject.getString("tbl_db")
          val tblName: String = jsonObject.getString("tbl_name")
          // update the broadcast state with the config data
          val bcs: BroadcastState[String, JSONObject] = ctx.getBroadcastState(mapStateDescriptor)
          bcs.put(tblDB + ":" + tblName, jsonObject)
          println("Broadcast config stream updated...")
        }
      })

    // 10. Write the fact data to the Kafka ODS topic and the dimension side
    //     output to the Kafka DIM topic.
    val props = new Properties()
    props.setProperty("bootstrap.servers", kafkaBrokers)

    factMainDs.addSink(new FlinkKafkaProducer[String](kafkaOdsTopic, new KafkaSerializationSchema[String] {
      override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord[Array[Byte], Array[Byte]](kafkaOdsTopic, null, element.getBytes())
      }
    }, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)) // at-least-once for now; exactly-once still has some issues

    factMainDs.getSideOutput(dimDataTag).addSink(new FlinkKafkaProducer[String](kafkaDimTopic, new KafkaSerializationSchema[String] {
      override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord[Array[Byte], Array[Byte]](kafkaDimTopic, null, element.getBytes())
      }
    }, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)) // at-least-once for now; exactly-once still has some issues

    env.execute()
  }
}
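The listing uses two small project helpers that are not shown in the article: the ConfigUtil constants and "CommonUtil.rowToJsonObj", which turns a Flink Row into a JSONObject. As an illustration, a minimal version of the Row conversion could look like the sketch below; it assumes the field order matches the query in step 4 (database, table, type, ts, xid, commit, data):

import com.alibaba.fastjson.JSONObject
import org.apache.flink.types.Row

// Hypothetical sketch of CommonUtil.rowToJsonObj -- the field names mirror the
// columns selected from kafka_db_bussiness_tbl in step 4.
object CommonUtil {
  private val fieldNames = Array("database", "table", "type", "ts", "xid", "commit", "data")

  def rowToJsonObj(row: Row): JSONObject = {
    val json = new JSONObject()
    for (i <- fieldNames.indices) {
      val v = row.getField(i)
      json.put(fieldNames(i), if (v == null) null else v.toString)
    }
    json
  }
}

Note that "data" arrives as a map<string,string>, so its toString form is what shows up in the sample comment in step 9 ({gmt_create=..., category_id=..., ...}).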

III. Creating the Iceberg-ODS Layer Tables

Before the code can run, the corresponding Iceberg tables must be created in Hive. The Iceberg tables are created as follows:

1. Add the jars required by the Iceberg table format to Hive

Start the HDFS cluster and the Hive metastore service on node1, then start the Hive client and add the Iceberg dependency jars:


# Start the Hive metastore service on node1
[root@node1 ~]# hive --service metastore &

# Load the two jars in the Hive client on node3
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

2. Create the Iceberg tables

Three Iceberg tables are created here: "ODS_MEMBER_INFO", "ODS_MEMBER_ADDRESS", and "ODS_USER_LOGIN". The DDL statements are as follows:


# Run the following DDL statements in the Hive client
CREATE TABLE ODS_MEMBER_INFO (
  id string,
  user_id string,
  member_growth_score string,
  member_level string,
  balance string,
  gmt_create string,
  gmt_modified string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_INFO/'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);

CREATE TABLE ODS_MEMBER_ADDRESS (
  id string,
  user_id string,
  province string,
  city string,
  area string,
  address string,
  log string,
  lat string,
  phone_number string,
  consignee_name string,
  gmt_create string,
  gmt_modified string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_ADDRESS/'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);

CREATE TABLE ODS_USER_LOGIN (
  id string,
  user_id string,
  ip string,
  login_tm string,
  logout_tm string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_USER_LOGIN/'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);


After these statements finish in the Hive client, the corresponding Iceberg data directories are visible in HDFS.
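A quick listing confirms the layout; the path below comes from the LOCATION clauses above:

# List the Iceberg table directories under the warehouse path
[root@node1 ~]# hdfs dfs -ls hdfs://mycluster/lakehousedata/icebergdb/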


IV. Testing the Code

With the code above in place, test it with the following steps:

1. Create the corresponding topics in Kafka

# Create the KAFKA-ODS-TOPIC topic in Kafka
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3

# Create the KAFKA-DIM-TOPIC topic in Kafka
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3

# Watch both topics
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-ODS-TOPIC
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DIM-TOPIC

2. Change the Kafka consumption in the code to start from the beginning

Set the "scan.startup.mode" property of the Kafka connector in the code to "earliest-offset" so the topic is consumed from the beginning, as shown below.
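The change is a single property in the WITH clause of "kafka_db_bussiness_tbl" from step 2 of the code:

 'scan.startup.mode' = 'earliest-offset',  -- was 'latest-offset'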


Alternatively, instead of re-consuming Kafka from the beginning, you can run the "RTMockDBData.java" code, which writes data into the MySQL tables in real time. In that case Maxwell must be running to capture the binlog, so that the job can see the business data as it is written to MySQL.

3. Run the code and check the results in the topics

After the code runs, both Kafka topics "KAFKA-DIM-TOPIC" and "KAFKA-ODS-TOPIC" receive data, and the corresponding Iceberg-ODS tables contain data as well.
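For fact events from the "mc_user_login" table, the messages landing in "KAFKA-ODS-TOPIC" carry the three fields set in step 9 of the code: "iceberg_ods_tbl_name", "kafka_dwd_topic" (its value comes from ConfigUtil.KAFKA_DWD_USERLOG_TOPIC), and "data". To double-check the Iceberg side, the ODS tables can be queried through the same catalog the job writes to. The following is a minimal sketch of such a check, reusing the catalog definition from the main job; it reads the current snapshot of one table and prints it:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Small verification job: register the same Hadoop catalog the write job uses
// and scan the current snapshot of one Iceberg-ODS table.
object CheckOdsTables {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    tblEnv.executeSql(
      """
        |create catalog hadoop_iceberg with (
        | 'type'='iceberg',
        | 'catalog-type'='hadoop',
        | 'warehouse'='hdfs://mycluster/lakehousedata'
        |)
      """.stripMargin)

    // print the rows currently in ODS_USER_LOGIN
    tblEnv.executeSql("select * from hadoop_iceberg.icebergdb.ODS_USER_LOGIN").print()
  }
}

An empty result here would mean the insert statements in step 3 are not receiving matching binlog events.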

