写点什么

Flink 实践教程 - 进阶(11):SQL 关联:Regular Join

  • 2022 年 4 月 01 日
  • 本文字数:2588 字

    阅读完需:约 8 分钟

Flink 实践教程-进阶(11):SQL 关联:Regular Join

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介  

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

Flink SQL 提供了 Regular Joins、Interval Joins、Temporal Joins、Lookup Join、Array 展平和 Table Function 六种方式实现数据关联。

本文将为您介绍如何使用 Regualr Joins 实现数据关联。Regualr Joins 在使用时有一定的限制条件,比如只能在 Equi-Join 条件下使用。下面将以 Kafka 作为源表的左右表为例,将商品订单 order-source 中商品 ID 与 product-info 中商品 ID 进行左关联得到商品名称,最终将结果数据到 Logger Sink 中去。

教程链接:https://cloud.tencent.com/developer/video/30595?sharedUid=1077144

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 Kafka  Topic

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 实例的创建,并创建 2 个 Topic,order-sourceproduct-info

流计算 Oceanus 作业  

1. 上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传 Logger Sink[4]  JAR 包。

2. 创建作业  

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。

CREATE TABLE `order_source` (   `id` INT,   `user_id` INT,   `product_id` INT,   `create_time` TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'order-source',   'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种 'properties.bootstrap.servers' = 'x.x.x.x:9092',  -- 替换为您的 Kafka 连接地址 'properties.group.id' = 'testGroup0', -- 必选参数, 一定要指定 Group ID 'format' = 'json', 'json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。 'json.ignore-parse-errors' = 'true'    -- 如果设置为 true,则忽略任何解析报错。);​CREATE TABLE `product_info` (   `product_id` INT,   `product_name` STRING,   `update_time` TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'product-info',  -- 替换为您要消费的 Topic 'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种 'properties.bootstrap.servers' = 'x.x.x.x:9092',  -- 替换为您的 Kafka 连接地址 'properties.group.id' = 'testGroup0', -- 必选参数, 一定要指定 Group ID 'format' = 'json', 'json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。 'json.ignore-parse-errors' = 'true'    -- 如果设置为 true,则忽略任何解析报错。);​CREATE TABLE logger_sink_table (   `id` INT PRIMARY KEY NOT ENFORCED,   `user_id` INT,   `product_id` INT,   `product_name` STRING,   `create_time` TIMESTAMP(3)) WITH (   'connector' = 'logger',   'print-identifier' = 'DebugData');​INSERT INTO logger_sink_tableSELECT order_source.id, order_source.user_id, order_source.product_id, product_info.product_name, order_source.create_timeFROM order_source left join product_infoon order_source.product_id = product_info.product_id;
复制代码

 3. 运行作业  

点击【发布草稿】后启动作业,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

4. 模拟数据  

通过 Kafka Client 发送数据到关联的左表 order-source 和右表 product-info。  发送消息命令:

[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic order-source
复制代码

Topic order-source 模拟数据示例: 

{"id":1,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:47:00"}{"id":2,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:48:00"}{"id":3,"user_id":10,"product_id":1002,"create_time":"2022-03-17 16:49:00"}
复制代码

发送消息命令: 

[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic product-info
复制代码

Topic product-info 模拟数据示例: 

{"id":1,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:47:00"}{"id":2,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:48:00"}{"id":3,"user_id":10,"product_id":1002,"create_time":"2022-03-17 16:49:00"}
复制代码

更多接入方式请参考 CKafka 收发消息 [5]  

5. 查看运行结果  

在【日志】面板的 TaskManager 中查看收到的数据,可以看到已经关联到了 product_id 为 1001 的商品名称。



总结

Regular Joins 比较适合批量加载数据的场景,而当关联的右表为时常更新的维表时会出现关联不到的情况。此外,从上述运行结果可以看出:Regular Joins 关联的记录为 Retract Stream(回撤流)下游需为 Upsert 类型 Sink。

更多 SQL Join 详情请参考开源 Flink 官方文章 SQL Join 章节[5]。  更多 Flink 实践教程详见 流计算 Oceanus 教程[6]  

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview  

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298  

[3] Kafka 控制台:https://console.cloud.tencent.com/ckafka  

[4] Logger Sink 下载地址:https://cloud.tencent.com/document/product/849/58713  

[5] Flink SQL Join:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/joins  

[6] 流计算 Oceanus 教程:https://cloud.tencent.com/developer/tag/10509  

扫码加入 流计算 Oceanus 产品交流群👇




用户头像

还未添加个人签名 2020.06.19 加入

欢迎关注,邀您一起探索数据的无限潜能!

评论

发布
暂无评论
Flink 实践教程-进阶(11):SQL 关联:Regular Join_flink_腾讯云大数据_InfoQ写作平台