写点什么

湖仓一体电商项目(四):项目数据种类与采集

作者:Lansonli
  • 2022-11-09
    广东
  • 本文字数:3705 字

    阅读完需:约 12 分钟

湖仓一体电商项目(四):项目数据种类与采集

项目数据种类与采集

实时数仓项目中的数据分为两类,一类是业务系统产生的业务数据,这部分数据存储在 MySQL 数据库中,另一类是实时用户日志行为数据,这部分数据是用户登录系统产生的日志数据。


针对 MySQL 日志数据我们采用 maxwell 全量或者增量实时采集到大数据平台中,针对用户日志数据,通过 log4j 日志将数据采集到目录中,再通过 Flume 实时同步到大数据平台,总体数据采集思路如下图所示:



;


针对 MySQL 业务数据和用户日志数据构建离线+实时湖仓一体数据分析平台,我们暂时划分为会员主题和商品主题。下面了解下主题各类表情况。

一、MySQL 业务数据

1、配置 MySQL 支持 UTF8 编码

在 node2 节点上配“/etc/my.cnf”文件,在对应的标签下加入如下配置,更改 mysql 数据库编码格式为 utf-8:


[mysqld]character-set-server=utf8
[client]default-character-set = utf8
复制代码


修改完成之后重启 mysql 即可。

2、MySQL 数据表

MySQL 业务数据存储在库“lakehousedb”中,此数据库中的业务数据表如下:


2.1、会员基本信息表 : mc_member_info



2.2、 会员收货地址表 : mc_member_address



2.3、用户登录数据表 : mc_user_login



2.4、商品分类表 : pc_product_category



2.5、商品基本信息表 : pc_product


3、MySQL 业务数据采集

我们通过 maxwell 数据同步工具监控 MySQL binlog 日志将 MySQL 日志数据同步到 Kafka topic “KAFKA-DB-BUSSINESS-DATA”中,详细步骤如下:


3.1、配置 maxwell config.properties 文件


进入 node3“/software/maxwell-1.28.2”目录,配置 config.properties 文件,主要是配置监控 mysql 日志数据对应的 Kafka topic,配置详细内容如下:


producer=kafka kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092 kafka_topic=KAFKA-DB-BUSSINESS-DATA #设置根据表将 binlog 写入 Kafka 不同分区,还可指定:database, table, primary_key, transaction_id, thread_id, column producer_partition_by=table #mysql 节点 host=node2 #连接 mysql 用户名和密码 user=maxwell password=maxwell #指定 maxwell 当前连接 mysql 的实例 id,这里用于全量同步表数据使用 client_id=maxwell_first


3.2、启动 kafka,创建 Kafka topic,并监控 Kafka topic


启动 Zookeeper 集群、Kafka 集群,创建 topic“KAFKA-DB-BUSSINESS-DATA” topic:


#进入Kafka路径,创建对应topic[root@node1 ~]# cd /software/kafka_2.11-0.11.0.3/bin/[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DB-BUSSINESS-DATA --partitions 3 --replication-factor 3
#监控Kafak topic 中的数据[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DB-BUSSINESS-DATA
复制代码


3.3、启动 maxwell


#在node3节点上启动maxwell[root@node3 ~]# cd /software/maxwell-1.28.2/bin/[root@node3 bin]#  maxwell --config ../config.properties
复制代码


3.4、在 mysql 中创建“lakehousedb”并导入数据


#进入mysql ,创建数据库lakehousedb[root@node2 ~]# mysql -u root -p123456mysql> create database lakehousedb;
复制代码


打开“Navicat”工具,将资料中的“lakehousedb.sql”文件导入到 MySQL 数据库“lakehousedb”中,我们可以看到在对应的 kafka topic “KAFKA-DB-BUSSINESS-DATA”中会有数据被采集过来。


二、用户日志数据

1、用户日志数据

目前用户日志数据只有“会员浏览商品日志数据”,其详细信息如下:


  • 接口地址:/collector/common/browselog

  • 请求方式:post

  • 请求数据类型:application/json

  • 接口描述:用户登录系统后,会有当前登录时间信息及当前用户登录后浏览商品,跳转链接、浏览所获积分等信息

  • 请求示例:


{    "logTime": 1646393162044,    "userId": "uid53439497",    "userIp": "216.36.11.233",    "frontProductUrl": "https://fo0z7oZj/rInrtrb/ui",    "browseProductUrl": "https://2/5Rwwx/SqqwwwOUsK4",    "browseProductTpCode": "202",    "browseProductCode": "q6HCcpwfdgfgfxd2I",    "obtainPoints": 16,}
复制代码


  • 请求参数解释如下:



2、用户日志数据采集

日志数据采集是通过 log4j 日志配置来将用户的日志数据集中获取,这里我们编写日志采集接口项目“LogCollector”来采集用户日志数据。


当用户浏览网站触发对应的接口时,日志采集接口根据配合的 log4j 将用户浏览信息写入对应的目录中,然后通过 Flume 监控对应的日志目录,将用户日志数据采集到 Kafka topic “KAFKA-USER-LOG-DATA”中。


这里我们自己模拟用户浏览日志数据,将用户浏览日志数据采集到 Kafka 中,详细步骤如下:


2.1、将日志采集接口项目打包,上传到 node5 节点


将日志采集接口项目“LogCollector”项目配置成生产环境 prod,打包,上传到 node5 节点目录/software 下。


2.2、编写 Flume 配置文件 a.properties


将 a.properties 存放在 node5 节点/software 目录下,文件配置内容如下:


#设置source名称a.sources = r1#设置channel的名称a.channels = c1#设置sink的名称a.sinks = k1
# For each one of the sources, the type is defined#设置source类型为TAILDIR,监控目录下的文件#Taildir Source可实时监控目录一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题a.sources.r1.type = TAILDIR#文件的组,可以定义多种a.sources.r1.filegroups = f1#第一组监控的是对应文件夹中的什么文件:.log文件a.sources.r1.filegroups.f1 = /software/lakehouselogs/userbrowse/.*log
# The channel can be defined as follows.#设置source的channel名称a.sources.r1.channels = c1a.sources.r1.max-line-length = 1000000#a.sources.r1.eventSize = 512000000
# Each channel's type is defined.#设置channel的类型a.channels.c1.type = memory# Other config values specific to each type of channel(sink or source)# can be defined as well# In this case, it specifies the capacity of the memory channel#设置channel道中最大可以存储的event数量a.channels.c1.capacity = 1000#每次最大从source获取或者发送到sink中的数据量a.channels.c1.transcationCapacity=100
# Each sink's type must be defined#设置Kafka接收器a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink#设置Kafka的broker地址和端口号a.sinks.k1.brokerList=node1:9092,node2:9092,node3:9092#设置Kafka的Topica.sinks.k1.topic=KAFKA-USER-LOG-DATA#设置序列化方式a.sinks.k1.serializer.class=kafka.serializer.StringEncoder #Specify the channel the sink should use#设置sink的channel名称a.sinks.k1.channel = c1
复制代码


2.3、在 Kafka 中创建对应的 topic 并监控


#进入Kafka路径,创建对应topic[root@node1 ~]# cd /software/kafka_2.11-0.11.0.3/bin/[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3
#监控Kafak topic 中的数据[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-USER-LOG-DATA
复制代码


2.4、启动日志采集接口


在 node5 节点上启动日志采集接口,启动命令如下:


[root@node5 ~]# cd /software/[root@node5 software]# java -jar ./logcollector-0.0.1-SNAPSHOT.jar 
复制代码


启动之后,根据日志采集接口配置会在“/software/lakehouselogs/userbrowse”目录中汇集用户浏览商品日志数据。


2.5、 启动 Flume,监控用户日志数据到 Kafka


在 node5 节点上启动 Flume,监控用户浏览日志数据到 Kafka “KAFKA-USER-LOG-DATA” topic。


[root@node5 ~]# cd /software/[root@node5 software]# flume-ng agent --name a -f /software/a.properties -Dflume.root.logger=INFO,console
复制代码


2.6、启动模拟用户浏览日志代码,向日志采集接口生产数据


在 window 本地启动“LakeHouseMockData”项目下的“RTMockUserLogData”代码,向日志采集接口中生产用户浏览商品日志数据。


启动代码后,我们会在 Kafka “KAFKA-USER-LOG-DATA” topic 中看到监控到的用户日志数据。


三、错误解决

如果在向 mysql 中创建库及表时有如下错误:


Err 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and contains nonaggregated column 'information_schema.PROFILING.SEQ' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by


以上错误是由于 MySQL sql_mode 引起,对于 group by 聚合操作,如果在 select 中的列没有在 group by 中出现,那么这个 SQL 是不合法的。按照以下步骤来处理。

1、首先停止 mysql,然后在 mysql 节点配置 my.ini 文件

[root@node2 ~]# service mysqld stop
复制代码


打开/etc/my.cnf 文件,在 mysqld 标签下配置如下内容:


mysqld sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

2、重启 mysql 即可解决

[root@node2 ~]# service mysqld start
复制代码


发布于: 2022-11-09阅读数: 22
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
湖仓一体电商项目(四):项目数据种类与采集_湖仓一体_Lansonli_InfoQ写作社区