写点什么

Flink 实践教程 - 进阶(3):窗口操作

  • 2021 年 12 月 22 日
  • 本文字数:2678 字

    阅读完需:约 9 分钟

Flink 实践教程-进阶(3):窗口操作

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。


本文将为您详细介绍如何实时获取 CKafka 中的 JSON 格式数据,经过 HOP WINDOW(滑动窗口)函数聚合分析后存入 ClickHouse 中。


操作视频

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。


创建 Topic:


进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。


数据准备:


进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。


# Kafka 客户端启动生产者命令bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic oceanus_advanced3_input --producer.config ../config/producer.properties
复制代码


// 数据格式{  "id": 1,  "amount": 10,  "times": "2021-10-01 10:00:00"}
复制代码

创建 ClickHouse 集群

进入 ClickHouse 控制台 [7],点击左上角【新建集群】,完成 ClickHouse 集群的创建。具体可参考 ClickHouse 快速入门 [8]。


注意:创建 Oceanus 集群和 ClickHouse 集群时所选的 VPC 建议相同。

创建 ClickHouse 表:

  1. 进入与 ClickHouse 集群同 VPC 的某一台 CVM 下,安装 ClickHouse 客户端(下载该客户端需连通外网),具体操作步骤参考 ClickHouse 快速入门 [8]。


# 下载 ClickHouse-Client 命令wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpmwget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm
# 安装客户端rpm -ivh *.rpm
# 使用 tcp 端口登陆 ClickHouse 集群,IP 地址可通过控制台查看clickhouse-client -hxx.xx.xx.xx --port 9000 -m
复制代码


  1. 登陆 ClickHouse 集群,建表。


CREATE TABLE default.oceanus_advanced3_output1 on cluster default_cluster (    win_start   TIMESTAMP,    win_end     TIMESTAMP,    id          Int8,    amount_all  Int16,    Sign        Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/oceanus_advanced3_output1', '{replica}',Sign) ORDER BY (win_start,win_end,id);
复制代码

Oceanus 作业

1. 创建 Source

CREATE TABLE `kafka_json_source_table` (  `id`      INT,  `amount`  INT,  `times`   TIMESTAMP(3),    WATERMARK FOR times AS times - INTERVAL '3' SECOND ) WITH (  'connector' = 'kafka',  'topic' = 'oceanus_advanced3_input',      -- 替换为您要消费的 Topic  'scan.startup.mode' = 'earliest-offset',  -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址  'properties.group.id' = 'testGroup',      -- 必选参数, 一定要指定 Group ID  'format' = 'json',  'json.fail-on-missing-field' = 'false',   -- 如果设置为 false, 则遇到缺失字段不会报错。  'json.ignore-parse-errors' = 'true'       -- 如果设置为 true,则忽略任何解析报错。);
复制代码

2. 创建 Sink

CREATE TABLE `clickhouse_sink` (    `win_start`  TIMESTAMP(3),    `win_end`    TIMESTAMP(3),    `id`         INT,    `amount_all` INT,     PRIMARY KEY (win_start,win_end,id) NOT ENFORCED) WITH (    'connector' = 'clickhouse',    'url' = 'clickhouse://10.0.0.178:8123',    'database-name' = 'default',    'table-name' = 'oceanus_advanced3_output1',    'table.collapsing.field' = 'Sign'   -- CollapsingMergeTree 类型列字段的名称);
复制代码

3. 编写业务 SQL

INSERT INTO clickhouse_sinkSELECTHOP_START(times,INTERVAL '30' SECOND,INTERVAL '1' MINUTE) AS win_start,  -- 滑动窗口的开始时间HOP_END(times,INTERVAL '30' SECOND,INTERVAL '1' MINUTE)   AS win_end,    -- 滑动窗口的结束时间id,SUM(amount)  AS amount_allFROM kafka_json_source_table-- 这里使用滑动窗口函数和用户 id 进行分组聚合,统计了每分钟各用户的视频点击量,每30s更新一次。GROUP BY HOP(times,INTERVAL '30' SECOND,INTERVAL '1' MINUTE),id;
复制代码


新版 Flink 1.13 集群 SQL 作业不需要用户自己选择内置 Connector

总结

HOP WINDOW(滑动窗口)将元素分配到固定长度的窗口中,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。HOP WINDOW(滑动窗口)保持窗口大小(Size:INTERVAL '1' MINUTE)不变,每次滑动指定的时间周期(Slide:INTERVAL '30' SECOND),因而允许窗口之间的相互重叠。


Slide 的大小决定了 Flink 创建新窗口的频率。


  • 当 Slide 小于 Size 时,相邻窗口会重叠,一个时间会被分配到多个窗口。

  • 当 Slide 大于 Size 时,可能会导致有些事件被丢弃。

  • 当 Slide 等于 Size 时,等于是 TUMBLE WINDOW(滚动窗口)。


更多时间窗口函数请参考 Oceanus 官方文档 [9]。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview


[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298


[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1


[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839


[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854


[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840


[7] ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou


[8] ClickHouse 快速入门:https://cloud.tencent.com/document/product/1299/49824


[9] 时间窗口函数:https://cloud.tencent.com/document/product/849/18077


流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓



关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站 Get~

用户头像

还未添加个人签名 2020.06.19 加入

欢迎关注,邀您一起探索数据的无限潜能!

评论

发布
暂无评论
Flink 实践教程-进阶(3):窗口操作