写点什么

Flink SQL Client 综合实战,深入理解 java 虚拟机百度云

  • 2021 年 11 月 09 日
  • 本文字数:2849 字

    阅读完需:约 9 分钟

本次实战主要是通过 Flink SQL Client 消费 kafka 的实时消息,再用各种 SQL 操作对数据进行查询统计,内容汇总如下:


  1. DDL 创建 Kafka 表

  2. 窗口统计;

  3. 数据写入 ElasticSearch

  4. 联表操作

版本信息

  1. Flink:1.10.0

  2. Flink 所在操作系统:CentOS Linux release 7.7.1908

  3. JDK:1.8.0_211

  4. Kafka:2.4.0(scala:2.12)

  5. Mysql:5.7.29

数据源准备

  1. 本次实战用的数据,来源是阿里云天池公开数据集的一份淘宝用户行为数据集,获取方式请参考《准备数据集用于flink学习》

  2. 获取到数据集文件后转成 kafka 消息发出,这样我们使用 Flink SQL 时就按照实时消费 kafka 消息的方式来操作,具体的操作方式请参考《将CSV的数据发送到kafka》

  3. 上述操作完成后,一百零四万条淘宝用户行为数据就会通过 kafka 消息顺序发出,咱们的实战就有不间断实时数据可用 了,消息内容如下:


{"user_id":1004080,"item_id":2258662,"category_id":79451,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}


{"user_id":100814,"item_id":5071478,"category_id":1107469,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}


{"user_id":114321,"item_id":4306269,"category_id":4756105,"behavior":"pv","ts":"2017-11-24T23:47:48Z"}


  1. 上述消息中每个字段的含义如下表:


| 列名称 | 说明 |


| --- | --- |


| 用户 ID | 整数类型,序列化后的用户 ID |


| 商品 ID | 整数类型,序列化后的商品 ID |


| 商品类目 ID | 整数类型,序列化后的商品所属类目 ID |


| 行为类型 | 字符串,枚举类型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’) |


| 时间戳 | 行为发生的时间戳 |


| 时间字符串 | 根据时间戳字段生成的时间字符串 |

jar 准备

实战过程中要用到下面这五个 jar 文件:


  1. flink-jdbc_2.11-1.10.0.jar

  2. flink-json-1.10.0.jar

  3. flink-sql-connector-elasticsearch6_2.11-1.10.0.jar

  4. flink-sql-connector-kafka_2.11-1.10.0.jar

  5. mysql-connector-java-5.1.48.jar


我已将这些文件打包上传到 GitHub,下载地址:https://raw.githubusercontent.com/zq2599/blog_download_files/master/files/sql_lib.zip


请在 flink 安装目录下新建文件夹 sql_lib,然后将这五个 jar 文件放进去;

Elasticsearch 准备

如果您装了 docker 和 docker-compose,那么下面的命令可以快速部署 elasticsearch 和 head 工具:


wget https://raw.githubusercontent.com/zq2599/blog_demos/master/elasticsearch_docker_compose/docker-compose.yml && \


docker-compose up -d


准备完毕,开始操作吧;

DDL 创建 Kafka 表

  1. 进入 flink 目录,启动 flink:bin/start-cluster.sh

  2. 启动 Flink SQL Client:bin/sql-client.sh embedded -l sql_lib

  3. 启动成功显示如下:



  1. 执行以下命令即可创建 kafka 表,请按照自己的信息调整参数:


CREATE TABLE user_behavior (


user_id BIGINT,


item_id BIGINT,


category_id BIGINT,


behavior STRING,


ts TIMESTAMP(3),


proctime as PROCTIME(), -- 处理时间列


WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在 ts 上定义 watermark,ts 成为事件时间列


) WITH (


'connector.type' = 'kafka', -- kafka connector


'connector.version' = 'universal', -- universal 支持 0.11 以上的版本


'connector.topic' = 'user_behavior', -- kafka topic


'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取


'connector.properties.zookeeper.connect' = '192.168.50.43:2181', -- zk 地址


'connector.properties.bootstrap.servers' = '192.168.50.43:9092', -- broker 地址


'format.type' = 'json' -- 数据源格式为 json


);


  1. 执行 SELECT * FROM user_behavior;看看原始数据,如果消息正常应该和下图类似:


![6.](https://img-blog.csdnimg.cn/2020051017112218.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6L


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


y9ibG9nLmNzZG4ubmV0L2JvbGluZ19jYXZhbHJ5,size_16,color_FFFFFF,t_70)

窗口统计

  1. 下面的 SQL 是以每十分钟为窗口,统计每个窗口内的总浏览数,TUMBLE_START 返回的数据格式是 timestamp,这里再调用 DATE_FORMAT 函数将其格式化成了字符串:


SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'),


DATE_FORMAT(TUMBLE_END(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'),


COUNT(*)


FROM user_behavior


WHERE behavior = 'pv'


GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);


  1. 得到数据如下所示:


数据写入 ElasticSearch

  1. 确保 elasticsearch 已部署好;

  2. 执行以下语句即可创建 es 表,请按照您自己的 es 信息调整下面的参数:


CREATE TABLE pv_per_minute (


start_time STRING,


end_time STRING,


pv_cnt BIGINT


) WITH (


'connector.type' = 'elasticsearch', -- 类型


'connector.version' = '6', -- elasticsearch 版本


'connector.hosts' = 'http://192.168.133.173:9200', -- elasticsearch 地址


'connector.index' = 'pv_per_minute', -- 索引名,相当于数据库表名


'connector.document-type' = 'user_behavior', -- type,相当于数据库库名


'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新


'format.type' = 'json', -- 输出数据格式 json


'update-mode' = 'append'


);


  1. 执行以下语句,就会将每分钟的 pv 总数写入 es 的 pv_per_minute 索引:


INSERT INTO pv_per_minute


SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time,


DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time,


COUNT(*) AS pv_cnt


FROM user_behavior


WHERE behavior = 'pv'


GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);


  1. 用 es-head 查看,发现数据已成功写入:


联表操作

  1. 当前 user_behavior 表的 category_id 表示商品类目,例如 11120 表示计算机书籍,61626 表示牛仔裤,本次实战的数据集中,这样的类目共有五千多种;

  2. 如果我们将这五千多种类目分成 6 个大类,例如 11120 属于教育类,61626 属于服装类,那么应该有个大类和类目的关系表;

  3. 这个大类和类目的关系表在 MySQL 创建,表名叫 category_info,建表语句如下:


CREATE TABLE category_info(


id int(11) unsigned NOT NULL AUTO_INCREMENT,


parent_id bigint ,


category_id bigint ,


PRIMARY KEY ( id )


) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;


  1. 表 category_info 所有数据来自对原始数据中 category_id 字段的提取,并且随机将它们划分为 6 个大类,该表的数据请在我的 GitHub 下载:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql

  2. 请在 MySQL 上建表 category_info,并将上述数据全部写进去;

  3. 在 Flink SQL Client 执行以下语句创建这个维表,mysql 信息请按您自己配置调整:


CREATE TABLE category_info (


parent_id BIGINT, -- 商品大类


category_id BIGINT -- 商品详细类目


) WITH (


'connector.type' = 'jdbc',


'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo',

评论

发布
暂无评论
Flink SQL Client综合实战,深入理解java虚拟机百度云