写点什么

用户指南 | 如何使用 Flow 功能实现持续聚合,赋能实时计算和查询

  • 2024-11-12
    北京
  • 本文字数:3278 字

    阅读完需:约 11 分钟

用户指南 | 如何使用 Flow 功能实现持续聚合,赋能实时计算和查询

什么是持续聚合

持续聚合功能在实际应用中有许多落地场景,比如 Streaming ETL、实时分析、监控报警等。其中一个最常见的应用是降采样(Downsampling),使用窗口函数,可以把一个毫秒级输出频率的信号降采样到秒级(比如通过计算一秒内的平均值),这样就可以节省存储和计算成本。


进一步地,例如一个速度传感器高频输入大量数据,持续聚合功能可以对这些输入数据进行过滤,过滤掉速度低于或高于一定数值的数据点,并且计算每五分钟内的平均速度,最后将结果输出到结果表中。


持续聚合功能由 Flow Engine 提供。Flow 是 GreptimeDB 内置的一个轻量级流处理引擎,为用户提供了持续聚合、窗口计算等功能。用户可以直接使用 SQL 语句来创建一个 Flow 任务进行持续聚合,无需额外编写业务代码。Flow 任务可用于实时数据处理、实时计算等场景。

应用示例

持续聚合功能可以完全通过 SQL 来定义和使用,本文将演示从创建 Flow 任务,接受数据进行流处理,到删除该 Flow 任务的全部流程


我们以一个速度传感器读入左右轮的瞬时速度,并且过滤掉较高或较低的异常值,并计算五秒内的平均速度为例。


首先,创建一个源数据表作为输入:


CREATE TABLE velocity (    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,    left_wheel FLOAT,    right_wheel FLOAT,    TIME INDEX(ts));
复制代码


以及作为 Flow 任务输出的结果表:


CREATE TABLE avg_speed (    avg_speed FLOAT,    start_window TIMESTAMP TIME INDEX,    end_window TIMESTAMP,    update_at TIMESTAMP,);
复制代码


接下来就可以创建 Flow 任务了,这里需要使用我们提供的 SQL 方言语法 CREATE FLOW,可以参考如下例子:


CREATE FLOW calc_avg_speedSINK TO avg_speedASSELECT avg((left_wheel+right_wheel)/2)FROM velocityWHERE left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60GROUP BY tumble(ts, '5 second');
复制代码


上述 SQL 语句的意思是:创建一个名为 calc_avg_speed 的 Flow 任务,它会把结果输出到 avg_speed 表上。其上运行的查询由 AS 之后的 SELECT 语句定义。


首先,过滤掉左右轮速度值中过小或过大的值(小于等于 0.5 或大于等于 60);然后,基于输入表 velocity,以 ts 列的每五秒的间隔作为窗口,计算窗口内的平均速度。Flow 作业当中的查询完全基于 SQL 语法,并根据需求实现了相关扩展。


现在 Flow 任务已经创建,想要观察 avg_speed 中的持续聚合的结果,只需要向源数据表 velocity 中插入数据:


INSERT INTO velocity VALUES    ("2021-07-01 00:00:00.200", 0.0, 0.7),    ("2021-07-01 00:00:00.200", 0.0, 61.0),    ("2021-07-01 00:00:02.500", 2.0, 1.0,);
复制代码


注意其中前两行都因为不符合条件被过滤掉了,只留下第三行被用于计算,查询输出表就可以得到计算结果:


SELECT * FROM avg_speed;
复制代码


 avg_speed |        start_window        |         end_window         |         update_at          -----------+----------------------------+----------------------------+----------------------------       1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000(1 row)
复制代码


尝试向 velocity 表中插入更多数据:


INSERT INTO velocity VALUES    ("2021-07-01 00:00:05.100", 5.0, 4.0),    ("2021-07-01 00:00:09.600", 2.3, 2.1);
复制代码


结果表 avg_speed 现在包含两行:分别表示两个 5 秒窗口的平均值,1.5 和 3.35(=(4.5+2.2)/2)


SELECT * FROM avg_speed;
复制代码


 avg_speed |        start_window        |         end_window         |         update_at          -----------+----------------------------+----------------------------+----------------------------       1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000      3.35 | 2021-07-01 00:00:05.000000 | 2021-07-01 00:00:10.000000 | 2024-06-04 03:35:34.693000
复制代码


avg_speed 表中的列解释如下:


  • avg_speed:窗口中计算得到的平均速度;

  • start_window:窗口的开始时间;

  • end_window:窗口的结束时间;

  • update_at:更新行数据的时间。


其中 start_windowend_window 是 Flow 引擎的时间窗口函数 tumble 自动添加的。update_at 则是 Flow 引擎对 Flow 任务输出表自动添加的一列,用于标记这一行数据的最新更新时间,以便了解 Flow 任务的运行情况。


最后,使用 DROP FLOW 删除这个 Flow 任务:


DROP FLOW calc_avg_speed;
复制代码

Flow 管理及高级特性

创建或更新 Flow

创建 Flow 的语法是:


CREATE FLOW [ IF NOT EXISTS ] <flow-name>SINK TO <sink-table-name>[ EXPIRE AFTER <expr> ][ COMMENT = "<string>" ]AS <SQL>;
复制代码


上述创建 Flow 任务的语法的解释如下:


  • flow-name 是全局唯一的标识符。

  • sink-table-name 是存储聚合数据的表名。它可以是一个现有的表或一个新表。如果目标表不存在,Flow 将自动创建目标表。

  • EXPIRE AFTER 是一个可选的时间间隔(使用 SQL 的 INTERVAL 语法表示),用于从 Flow 引擎中清除过期的中间状态。

  • COMMENT 是 Flow 任务的注释性描述。

  • <SQL>部分对应具体的持续聚合查询。Flow 计算引擎会从中提取引用到的表名并且作为 Flow 任务的源表。


一个简单的示例:


CREATE FLOW IF NOT EXISTS my_flowSINK TO my_sink_tableEXPIRE AFTER INTERVAL '1 hour'COMMENT = "My first flow in GreptimeDB"ASSELECT count(item) from my_source_table GROUP BY tumble(time_index, '5 minutes');
复制代码


其中 EXPIRE AFTER 项可能需要进一步解释。简单来说,像所有现代的流处理系统一样,Flow 计算引擎有两个重要概念:系统时间和事件时间


  • 系统时间:也叫处理时间,就是进行流处理计算的机器的系统时间。

  • 事件时间:某一行数据代表的事件发生的时间,一般也会记录在该行数据的某一列中,Flow 将 TIME INDEX 列视为事件时间。


EXPIRE AFTER 过期机制利用系统时间和事件时间之间的差值,清除掉 Flow 中间状态中过于古老的行。,上面示例 SQL 中,事件时间老于系统时间一小时以上的行就会被清除掉,不再参与运算。


🌟 注意,EXPIRE AFTER 只作用于新到达的数据。因此输出表中的结果不会单纯因为时间流逝而产生变化,只是不再会有更老的数据被更新到结果表上了。


另外, Flow 的中间状态目前也没有进行任何持久化,而是纯内存的,之后会添加持久化功能以使其可以在重启后也能保证数据正确。

删除 Flow

使用如下语句即可删除一个 Flow 任务:


DROP FLOW [IF EXISTS] <name>
复制代码

Flow 目前支持的聚合函数

除了 countsumavgminmax 这几种聚合函数,Flow 目前还支持了加减乘除、比较和逻辑运算几种标量函数,以及固定窗口的 tumble 函数。


未来,我们计划在持续聚合中支持更多的聚合函数、标量函数和窗口函数。

总结

本文介绍了 GreptimeDB 中持续聚合功能的基本用法和特性,并且举例说明了创建、使用和删除 Flow 任务的流程。使用持续聚合可以随时、低延时(秒级/亚秒级)地获取用户关心的信息,同时避免了额外的内存和计算上的开销。


未来,除了支持更多函数之外,我们还会支持流处理中间状态的持久化和诸如 Temporal Filter 等高级功能,更详细的信息可以参考相关的用户文档开发指南


关于 Greptime

Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。

欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~

Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb

官网:https://greptime.cn/

文档:https://docs.greptime.cn/

Twitter: https://twitter.com/Greptime

Slack: https://greptime.com/slack

LinkedIn: https://www.linkedin.com/company/greptime/

用户头像

专注于 Infra 技术分享 2022-09-23 加入

分布式、高性能、存储计算分离的开源云原生时序数据库

评论

发布
暂无评论
用户指南 | 如何使用 Flow 功能实现持续聚合,赋能实时计算和查询_数据分析_Greptime 格睿科技_InfoQ写作社区