写点什么

Flink 实践教程 - 进阶(5):排序(乱序调整)

  • 2021 年 12 月 29 日
  • 本文字数:2904 字

    阅读完需:约 10 分钟

Flink 实践教程-进阶(5):排序(乱序调整)

作者:腾讯云流计算 Oceanus 团队


流计算 Oceanus 简介流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。本文将为您详细介绍如何使用 Windowing TVF 配合聚合函数,实时调整乱序数据,经过聚合分析后存入 MySQL 中。


前置准备创建流计算 Oceanus 集群进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。创建消息队列 CKafka 进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。创建 Topic: 进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。数据准备: 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。

启动 Kafka 生产者命令

bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic oceanus_advanced5_input --producer.config ../config/producer.properties// 按顺序插入如下数据,注意这里数据时间是乱序的{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:16"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:30"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:50"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:59"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:43"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:09"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:01"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:50"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:15"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:50"}{"order_id":"10000","num":1,"event_time":"2021-12-22 14:31:15"}创建 MySQL 实例进入 MySQL 控制台 [7],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [8]。-- 建表语句 CREATE TABLE oceanus_advanced5_output (window_start datetime NOT NULL,window_end datetime NOT NULL,num int(11) DEFAULT NULL,PRIMARY KEY (window_start,window_end)) ENGINE=InnoDB DEFAULT CHARSET=utf8


流计算 Oceanus 作业


  1. 创建 SourceCREATE TABLE kafka_json_source_table (order_id VARCHAR,num INT,event_time TIMESTAMP(3),-- 根据事件时间 event_time 设置 10s 的延迟水印 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND) WITH ('connector' = 'kafka','topic' = 'oceanus_advanced5_input', -- 替换为您要消费的 Topic'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID'format' = 'json','json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。);

  2. 创建 SinkCREATE TABLE jdbc_upsert_sink_table (window_start TIMESTAMP(3),window_end TIMESTAMP(3),num INT,PRIMARY KEY(window_start,window_end) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'oceanus_advanced5_output', -- 需要写入的数据表'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'Tencent123$', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔);

  3. 编写业务 SQLINSERT INTO jdbc_upsert_sink_tableSELECTwindow_start,window_end,SUM(num) AS numFROM TABLE(-- Windowing TVFTUMBLE(TABLE kafka_json_source_table,DESCRIPTOR(event_time),INTERVAL '1' MINUTES)) GROUP BY window_start,window_end;

  4. 查询数据进入 MySQL 控制台 [7],单击右侧【登陆】快速登陆数据库,选择相应的库表查询数据。


笔者这里设置的 10s 的延迟水印,可以看到在 29~30、30~31 时间段的数据统计是正确,并没有因为数据延时而出现漏统计的现象。31~32 时间段的数据并没有统计出来,这是因为我们最后一条数据时间是 2021-12-22 14:31:15,其水印时间为 2021-12-22 14:31:05,小于窗口关闭时间,导致这段时间窗口还未关闭、未计算。


总结 WARTERMARK 是跟随在每条数据上的一条特殊标签,而且只增不减(可以相等)。WARTERMARK 并不能影响数据出现在哪个窗口(本例中由 event_time 决定),其主要决定窗口是否关闭(当水印时间大于窗口结束时间时,窗口关闭并计算)。


如果数据延时过大,例如小时级别,可以配合 allowedLateness 算子合理性使用 WARTERMARK,当达到水印结束时间时,窗口并不关闭,只进行计算操作,当时间到达 allowedLateness 算子设置的时间后,窗口才真正关闭,并在原先的基础上再次进行计算。如在 allowedLateness 算子设置的时间后才达到的数据,我们可以使用 sideOutputLateData 算子将迟到的数据输出到侧输出流进行计算。这里需要注意 allowedLateness 和 sideOutputLateData 算子目前只能使用 Stream API 实现。


目前 flink 1.13 的 Windowing TVF 函数并不能单独使用,需配合 AGGREGATE、JOIN、TOPN 使用。建议优先使用 Windowing TVF 实现窗口聚合等功能,因为 Windowing TVF 更符合 SQL 书写规范,底层优化逻辑也更好。


参考链接[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview


[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298


[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1


[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839


[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854


[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840


[7] MySQL 控制台:https://console.cloud.tencent.com/cdb


[8] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433


流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓


点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~腾讯云大数据https://image.ipaiban.com/upload-ueditor-image-20200619-1592556685554099336.jpg长按二维码关注我们

用户头像

还未添加个人签名 2020.06.19 加入

欢迎关注,邀您一起探索数据的无限潜能!

评论

发布
暂无评论
Flink 实践教程-进阶(5):排序(乱序调整)