用户指南 | 如何使用 Flow 功能实现持续聚合,赋能实时计算和查询
什么是持续聚合
持续聚合功能在实际应用中有许多落地场景,比如 Streaming ETL、实时分析、监控报警等。其中一个最常见的应用是降采样(Downsampling),使用窗口函数,可以把一个毫秒级输出频率的信号降采样到秒级(比如通过计算一秒内的平均值),这样就可以节省存储和计算成本。
进一步地,例如一个速度传感器高频输入大量数据,持续聚合功能可以对这些输入数据进行过滤,过滤掉速度低于或高于一定数值的数据点,并且计算每五分钟内的平均速度,最后将结果输出到结果表中。
持续聚合功能由 Flow Engine 提供。Flow 是 GreptimeDB 内置的一个轻量级流处理引擎,为用户提供了持续聚合、窗口计算等功能。用户可以直接使用 SQL 语句来创建一个 Flow 任务进行持续聚合,无需额外编写业务代码。Flow 任务可用于实时数据处理、实时计算等场景。
应用示例
持续聚合功能可以完全通过 SQL 来定义和使用,本文将演示从创建 Flow 任务,接受数据进行流处理,到删除该 Flow 任务的全部流程。
我们以一个速度传感器读入左右轮的瞬时速度,并且过滤掉较高或较低的异常值,并计算五秒内的平均速度为例。
首先,创建一个源数据表作为输入:
以及作为 Flow 任务输出的结果表:
接下来就可以创建 Flow 任务了,这里需要使用我们提供的 SQL 方言语法 CREATE FLOW
,可以参考如下例子:
上述 SQL 语句的意思是:创建一个名为 calc_avg_speed
的 Flow 任务,它会把结果输出到 avg_speed
表上。其上运行的查询由 AS
之后的 SELECT
语句定义。
首先,过滤掉左右轮速度值中过小或过大的值(小于等于 0.5 或大于等于 60);然后,基于输入表 velocity
,以 ts
列的每五秒的间隔作为窗口,计算窗口内的平均速度。Flow 作业当中的查询完全基于 SQL 语法,并根据需求实现了相关扩展。
现在 Flow 任务已经创建,想要观察 avg_speed
中的持续聚合的结果,只需要向源数据表 velocity
中插入数据:
注意其中前两行都因为不符合条件被过滤掉了,只留下第三行被用于计算,查询输出表就可以得到计算结果:
尝试向 velocity
表中插入更多数据:
结果表 avg_speed
现在包含两行:分别表示两个 5 秒窗口的平均值,1.5 和 3.35(=(4.5+2.2)/2)
avg_speed
表中的列解释如下:
avg_speed
:窗口中计算得到的平均速度;start_window
:窗口的开始时间;end_window
:窗口的结束时间;update_at
:更新行数据的时间。
其中 start_window
和 end_window
是 Flow 引擎的时间窗口函数 tumble
自动添加的。update_at
则是 Flow 引擎对 Flow 任务输出表自动添加的一列,用于标记这一行数据的最新更新时间,以便了解 Flow 任务的运行情况。
最后,使用 DROP FLOW
删除这个 Flow 任务:
Flow 管理及高级特性
创建或更新 Flow
创建 Flow 的语法是:
上述创建 Flow 任务的语法的解释如下:
flow-name
是全局唯一的标识符。sink-table-name
是存储聚合数据的表名。它可以是一个现有的表或一个新表。如果目标表不存在,Flow 将自动创建目标表。EXPIRE AFTER
是一个可选的时间间隔(使用 SQL 的 INTERVAL 语法表示),用于从 Flow 引擎中清除过期的中间状态。COMMENT
是 Flow 任务的注释性描述。<SQL>
部分对应具体的持续聚合查询。Flow 计算引擎会从中提取引用到的表名并且作为 Flow 任务的源表。
一个简单的示例:
其中 EXPIRE AFTER
项可能需要进一步解释。简单来说,像所有现代的流处理系统一样,Flow 计算引擎有两个重要概念:系统时间和事件时间。
系统时间:也叫处理时间,就是进行流处理计算的机器的系统时间。
事件时间:某一行数据代表的事件发生的时间,一般也会记录在该行数据的某一列中,Flow 将
TIME INDEX
列视为事件时间。
EXPIRE AFTER
过期机制利用系统时间和事件时间之间的差值,清除掉 Flow 中间状态中过于古老的行。,上面示例 SQL 中,事件时间老于系统时间一小时以上的行就会被清除掉,不再参与运算。
🌟 注意,EXPIRE AFTER
只作用于新到达的数据。因此输出表中的结果不会单纯因为时间流逝而产生变化,只是不再会有更老的数据被更新到结果表上了。
另外, Flow 的中间状态目前也没有进行任何持久化,而是纯内存的,之后会添加持久化功能以使其可以在重启后也能保证数据正确。
删除 Flow
使用如下语句即可删除一个 Flow 任务:
Flow 目前支持的聚合函数
除了 count
、sum
、avg
、min
、max
这几种聚合函数,Flow 目前还支持了加减乘除、比较和逻辑运算几种标量函数,以及固定窗口的 tumble
函数。
未来,我们计划在持续聚合中支持更多的聚合函数、标量函数和窗口函数。
总结
本文介绍了 GreptimeDB 中持续聚合功能的基本用法和特性,并且举例说明了创建、使用和删除 Flow 任务的流程。使用持续聚合可以随时、低延时(秒级/亚秒级)地获取用户关心的信息,同时避免了额外的内存和计算上的开销。
未来,除了支持更多函数之外,我们还会支持流处理中间状态的持久化和诸如 Temporal Filter 等高级功能,更详细的信息可以参考相关的用户文档和开发指南。
关于 Greptime
Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。
欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~
Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb
Twitter: https://twitter.com/Greptime
Slack: https://greptime.com/slack
评论