Flink SQL Client 综合实战
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(), -- 处理时间列
WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在 ts 上定义 watermark,ts 成为事件时间列
) WITH (
'connector.type' = 'kafka', -- kafka connector
'connector.version' = 'universal', -- universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = '192.168.50.43:2181', -- zk 地址
'connector.properties.bootstrap.servers' = '192.168.50.43:9092', -- broker 地址
'format.type' = 'json' -- 数据源格式为 json
);
执行 SELECT * FROM user_behavior;看看原始数据,如果消息正常应该和下图类似:
[](()窗口统计
下面的 SQL 是以每十分钟为窗口,统计每个窗口内的总浏览数,TUMBLE_START 返回的数据格式是 timestamp,这里再调用 DATE_FORMAT 函数将其格式化成了字符串:
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'),
DATE_FORMAT(TUMBLE_END(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'),
COUNT(*)
FROM user_behavior
WHERE behavior = 'pv'
GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);
得到数据如下所示:
[](()数据写入 ElasticSearch
确保 elasticsearch 已部署好;
执行以下语句即可创建 es 表,请按照您自己的 es 信息调整下面的参数:
CREATE TABLE pv_per_minute (
start_time STRING,
end_time STRING,
pv_cnt BIGINT
) WITH (
'connector.type' = 'elasticsearch', -- 类型
'connector.version' = '6', -- elasticsearch 版本
'connector.hosts' = 'http://192.168.133.173:9200', -- elasticsearch 地址
'connector.index' = 'pv_per_minute', -- 索引名,相当于数据库表名
'connector.document-type' = 'user_behavior', -- type,相当于数据库库名
'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新
'format.type' = 'json', -- 输出数据格式 json
'update-mode' = 'append'
);
执行以下语句,就会将每分钟的 pv 总数写入 es 的 pv_per_minute 索引:
INSERT INTO pv_per_minute
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time,
DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time,
COUNT(*) AS pv_cnt
FROM user_behavior
WHERE behavior = 'pv'
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
用 es-head 查看,发现数据已成功写入:
![在这里插入图片描述](https://img-blog.csdnimg.cn/20200510171435853.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 0L2JvbGluZ19jYXZhbHJ5,size_16,color_FFFFFF,t_70)
[](()联表操作
当前 user_behavior 表的 category_id 表示商品类目,例如 11120 表示计算机书籍,61626 表示牛仔裤,本次实战的数据集中,这样的类目共有五千多种;
如果我们将这五千多种类目分成 6 个大类,例如 11120 属于教育类,61626 属于服装类,那么应该有个大类和类目的关系表;
这个大类和类目的关系表在 MySQL 创建,表名叫 category_info,建表语句如下:
CREATE TABLE category_info
(
id
int(11) unsigned NOT NULL AUTO_INCREMENT,
parent_id
bigint ,
category_id
bigint ,
PRIMARY KEY ( id
)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
表 category_info 所有数据来自对原始数据中 category_id 字段的提取,并且随机将它们划分为 6 个大类,该表的数据请在我的 GitHub 下载:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql
请在 MySQL 上建表 category_info,并将上述数据全部写进去;
在 Flink SQL Client 执行以下语句创建这个维表,mysql 信息请按您自己配置调整:
CREATE TABLE category_info (
parent_id BIGINT, -- 商品大类
category_id BIGINT -- 商品详细类目
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo',
'connector.table' = 'category_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);
尝试联表查询:
SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id;
如下图,联表查询成功,每条记录都能对应大类:
再试试联表统计,每个大类的总浏览量:
SELECT C.parent_id, COUNT(*) AS pv_count
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id
WHERE behavior = 'pv'
GROUP BY C.parent_id;
如下图,数据是动态更新的:
执行以下语句,可以在统计时将大类 ID 转成中文名:
SELECT CASE C.parent_id
WHEN 1 THEN '服饰鞋包'
WHEN 2 THEN '家装家饰'
WHEN 3 THEN '家电'
WHEN 4 THEN '美妆'
WHEN 5 THEN '母婴'
WHEN 6 THEN '3C 数码'
ELSE '其他'
END AS category_name,
COUNT(*) AS pv_count
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id
WHERE behavior = 'pv'
GROUP BY C.parent_id;
评论