【FlinkSQL】Flink SQL Query 语法(四)- Rattern Recognition
操作符
Pattern Recognition
只适用流处理模式
搜索一组事件模式(Event pattern)是一种常见的用例,尤其是在数据流情景中。Flink 提供复杂事件处理(CEP)库,允许在事件流中进行模式检测。
Flink 的 SQL API 提供了一种关系式的查询表达方式,其中包含大量内置函数和基于规则的优化,可以开箱即用。
2016 年 12 月,国际标准化组织(ISO)发布了新版本的 SQL 标准,其中包括在 SQL 中的行模式识别(Row Pattern Recognition in SQL)(ISO/IEC TR 19075-5:2016)。允许 Flink 使用 MATCH_RECOGNIZE 子句融合 CEP 和 SQL API,以便在 SQL 中进行复杂事件处理。
MATCH_RECOGNIZE 子句启用以下任务:
使用 PARTITION BY 和 ORDER BY 子句对数据进行逻辑分区和排序。
使用 PATTERN 子句定义要查找的行模式。这些模式使用类似于正则表达式的语法。
在 DEFINE 子句中指定行模式变量的逻辑组合。
在 MEASURES 子句中定义表达式,这些表达式可用于 SQL 查询中的其他部分。
介绍和示例
安装指南
模式识别特性使用 Apache Flink 内部的 CEP 库。为了能够使用 MATCH_RECOGNIZE 子句,需要将库作为依赖项添加到 Maven 项目中。
在 SQL Client 中使用 MATCH_RECOGNIZE 子句,因为默认情况下包含所有依赖项。
SQL 语义
每个 MATCH_RECOGNIZE 查询都包含以下子句:
PARTITION BY - 定义表的逻辑分区;类似于 GROUP BY 操作。
ORDER BY - 指定传入行的排序方式;这是必须的,因为模式依赖于顺序。
MEASURES - 定义子句的输出;类似于 SELECT 子句。
ONE ROW PER MATCH - 输出方式,定义每个匹配项应产生多少行。
AFTER MATCH SKIP - 指定下一个匹配的开始位置;这也是控制单个事件可以属于多少个不同匹配项的方法。
PATTERN - 允许使用类似于正则表达式的语法构造搜索的模式。
DEFINE - 本部分定义了模式变量必须满足的条件。
目前,MATCH_RECOGNIZE 子句只能应用于 Append-only 表,也总是生成一个 Append-only 表。
示例
假设已经注册了一个表 Ticker
。该表包含特定时间点的股票价格。
为了简化,只考虑单个股票(ACME
)的传入数据。Ticker
记录可以类似于下表,其中的行是连续追加的。
现在的任务是找出一个单一股票价格不断下降的时间段。为此,可以编写如下查询:
此查询为股票价格持续下跌的每个期间生成摘要行。
该行结果描述了从 01-APR-11 10:00:04
开始的价格下跌期,在 01-APR-11 10:00:07
达到最低价格,到 01-APR-11 10:00:08
再次上涨。
分区
可以在分区数据中寻找模式,例如单个股票行情或特定用户的趋势。这可以用 PARTITION BY 子句来表示。该子句类似于对聚合分组。
强烈建议对传入的数据进行分区,否则 MATCH_RECOGNIZE 子句将被转换为非并行算子,以确保全局排序。
事件顺序
Apache Flink 可以根据时间(处理时间或者事件时间)进行模式搜索。
如果是事件时间,则在将事件传递到内部模式状态机之前对其进行排序。所以,无论行添加到表的顺序如何,生成的输出都是正确的。模式是按照每行中所包含的时间指定顺序计算的。
MATCH_RECOGNIZE 子句要求升序的时间属性是 ORDER BY 子句的第一个参数。对于示例 Ticker
表,诸如 ORDER BY rowtime ASC, price DESC
的定义是有效的,但 ORDER BY price, rowtime
或者 ORDER BY rowtime DESC, price ASC
是无效的。
Define & Measures
DEFINE 和 MEASURES 关键字与 WHERE 和 SELECT 子句具有相近的含义。
MEASURES 子句定义匹配模式的输出中要包含哪些内容。产生的行数取决于输出方式设置。
DEFINE 子句指定行必须满足的条件才能被分类到相应的模式变量。如果没有为模式变量定义条件,则将对每一行使用计算结果为 TRUE 的默认条件。
有关可使用的表达式的更详细的说明,查看下文 Pattern Navigation 的部分。
Aggregations
聚合可以在 DEFINE 和 MEASURES 子句中使用,支持内置函数和用户自定义函数。
匹配的行可以使用聚合函数,使用方式查看下文 Pattern Navigation 的部分
下面的示例找出股票平均价格没有低于某个阈值的最长时间段。展示了 MATCH_RECOGNIZE 在聚合中的可表达性。
给定此查询和以下输入值:
只要事件的平均价格不超过 15,查询就会将事件作为模式变量 A 的一部分进行累积。 例如,这种限制发生在 01-Apr-11 10:00:04
。接下来的时间段在 01-Apr-11 10:00:11
再次超过平均价格 15。因此,所述查询的结果将是:
聚合函数可以应用于模式表达式,但只能引用单个模式变量。因此,SUM(A.price * A.tax)
是有效的,而 AVG(A.price * B.tax)
则是无效的。
不支持 DISTINCT 聚合。
定义模式
MATCH_RECOGNIZE 子句允许用户在事件流中使用功能强大、表达力强的语法搜索模式,这种语法与广泛使用的正则表达式语法有些相似。
每个模式都是由基本的构建块构造的,称为 模式变量,可以应用算子(量词和其他修饰符)到这些模块中。整个模式必须用括号括起来。示例模式如下所示:
(A B) 表示 Row 必须满足开始于 A 结束于 B
* —— 0 或者多行
+ —— 1 或者多行
? —— 0 或者 1 行
{ n } —— 严格 n 行(n > 0)
{ n, } —— n 或者更多行(n ≥ 0)
{ n, m } —— 在 n 到 m(包含)行之间(0 ≤ n ≤ m,0 < m)
{ , m } —— 在 0 到 m(包含)行之间(m > 0)
不支持可能产生空匹配的模式。此类模式的示例如 PATTERN (A*)
,PATTERN (A? B*)
,PATTERN (A{0,} B{0,} C*)
等。
Greedy 量词和 Reluctant 量词
每一个量词可以是贪婪(Greedy,默认行为)的或者勉强的(Reluctant)。贪婪的量词尝试匹配尽可能多的行,而勉强的量词则尝试匹配尽可能少的行。
为了说明区别,可以通过查询查看以下示例,其中贪婪量词应用于 B 变量:
假设我们有以下输入:
上面的模式将产生以下输出:
模式变量 B 匹配 price
是 12、13、14 的行(A_11 | B_12 | B_13 | B_14 | C_16)
将 B* 修改为 B*? 的同一查询,这意味着 B* 应该是勉强的,将产生:
模式变量 B 只匹配 price
为 12 的行(A_11 | B_12 | C_13)(A_13 | B_14 | C_16)
模式的最后一个变量不能使用贪婪量词。因此,不允许使用类似 (A B*)
的模式。可以使用 NOT B 作为结尾。
目前不支持可选的勉强量词(A??
或者 A{0,1}?
)
时间约束
特别是对于流的使用场景,通常需要在给定的时间内完成模式。这要求限制住 Flink 在内部必须保持的状态总体大小(即已经过期的状态就不需要再维护了),即使在贪婪的量词的情况下也是如此。
因此,Flink SQL 支持附加的(非标准 SQL)WITHIN 子句来定义模式的时间约束。子句可以在 PATTERN 子句之后定义,并以毫秒为间隔进行解析。
如果潜在匹配的第一个和最后一个事件之间的时间长于给定值,则不会将这种匹配追加到结果表中。
通常鼓励使用 WITHIN 子句,因为有助于 Flink 进行有效的内存管理。一旦达到阈值,即可修剪基础状态。然而,WITHIN 子句不是 SQL 标准的一部分。时间约束处理的方法已被提议将来可能会改变。
下面的示例查询说明了 WITHIN 子句的用法:
该查询检测到在 1 小时的间隔内价格下降了 10。
假设该查询用于分析以下股票数据:
查询将生成以下结果:
结果行代表价格从 15(在 01-Apr-11 12:00:00
)下降到 1(在 01-Apr-11 13:00:00
)。dropDiff
列包含价格差异。
请注意,价格在 01-Apr-11 10:00:00
和 01-Apr-11 11:40:00
之间,下降了 11,因为两个事件之间的时间差大于 1 小时,因此,它们不会产生匹配。
输出方式
输出方式描述每个找到的匹配项应该输出多少行。SQL 标准描述了两种方式:
ALL ROWS PER MATCH
ONE ROW PER MATCH
目前,唯一支持的输出方式是 ONE ROW PER MATCH,始终为每个找到的匹配项生成一个输出行。输出行的 schema 将是按特定顺序连接 [partitioning columns] + [measures columns]
。以下示例显示了所定义的查询的输出:
对于以下输入行:
该查询将生成以下输出:
该模式识别由 symbol
列分区。即使在 MEASURES 子句中未明确提及,分区列仍会添加到结果的开头。
模式导航
DEFINE 和 MEASURES 子句允许在(可能)匹配模式的行列表中进行导航(用于声明条件或产生输出结果)。
引用模式变量
允许引用一组映射到 DEFINE 或 MEASURES 子句中特定模式变量的行。
例如,如果我们尝试将当前行与 A 进行匹配,则表达式 A.price
描述了目前为止已映射到 A 的一组行加上当前行。如果 DEFINE/MEASURES 子句中的表达式需要一行(例如 A.price
或 A.price > 10
),将选择属于相应集合的最后一个值。
如果没有指定模式变量(例如 SUM(price)
),则表达式引用默认模式变量 *
,该变量引用模式中的所有变量。换句话说,创建了一个列表,其中列出了迄今为止映射到任何变量的所有行以及当前行。
对于更全面的示例,可以查看以下模式和相应的条件:
下表描述了如何为每个传入事件计算这些条件。
该表由以下列组成:
# —— 行标识符,用于唯一标识列表中的传入行
[A.price]/[B.price]/[price]
。price —— 传入行的价格。
[A.price]/[B.price]/[price] —— 描述 DEFINE 子句中用于计算条件的行列表。
Classifier —— 当前行的分类器,指示该行映射到的模式变量。
A.price/B.price/SUM(price)/SUM(B.price) —— 描述了这些表达式求值后的结果。
从表中可以看出,第一行映射到模式变量 A,随后的行映射到模式变量 B。但是,最后一行不满足 B 条件,因为所有映射行 SUM(price)
的总和与 B 中所有行的总和都超过了指定的阈值。
Logical Offsets
在映射到指定模式变量的事件启用导航。这可以用两个相应的函数表示:
text LAST(variable.field, n)
返回映射到变量最后 n 个元素的事件中的字段值。计数从映射的最后一个元素开始。
text FIRST(variable.field, n)
返回映射到变量的第 n 个元素的事件中的字段值。计数从映射的第一个元素开始。
对于更全面的示例,可以参考以下模式和相应的条件:
下表描述了如何为每个传入事件计算这些条件。
该表包括以下列:
price —— 传入行的价格。
Classifier —— 当前行的分类器,指示该行映射到的模式变量。
LAST(B.price, 1)/LAST(B.price, 2) —— 描述对这些表达式求值后的结果。
将默认模式变量与 Logical offsets 一起使用也可能很有意义。
在这种情况下,offset 会包含到目前为止映射的所有行:
如果第二行没有映射到 B 变量,则会得到以下结果:
也可以在 FIRST/LAST 函数的第一个参数中使用多个模式变量引用。这样,可以编写访问多个列的表达式。但是,它们都必须使用相同的模式变量。换句话说,必须在一行中计算 LAST/FIRST 函数的值。
因此,可以使用 LAST(A.price * A.tax)
,但不允许使用类似 LAST(A.price * B.tax)
的表达式。
匹配后的策略
AFTER MATCH SKIP 子句指定在找到完全匹配后从何处开始新的匹配过程。
有四种不同的策略:
SKIP PAST LAST ROW —— 在当前匹配的最后一行之后的下一行继续模式匹配。
SKIP TO NEXT ROW —— 继续从匹配项开始行后的下一行开始搜索新匹配项。
SKIP TO LAST variable —— 恢复映射到指定模式变量的最后一行的模式匹配。
SKIP TO FIRST variable —— 在映射到指定模式变量的第一行继续模式匹配。
这也是一种指定单个事件可以属于多少个匹配项的方法。例如,使用 SKIP PAST LAST ROW 策略,每个事件最多只能属于一个匹配项。
为了更好地理解这些策略之间的差异,我们可以看看下面的例子。
对于以下输入行:
我们使用不同的策略评估以下查询:
该查询返回映射到 A 的总体匹配的第一个和最后一个时间戳所有行的价格之和。
查询将根据使用的 AFTER MATCH 策略产生不同的结果:
AFTER MATCH SKIP PAST LAST ROW
第一个结果与 #1,#2,#3,#4 行匹配。
第二个结果与 #5,#6,#7 行匹配。
AFTER MATCH SKIP TO NEXT ROW
同样,第一个结果与 #1,#2,#3,#4 行匹配。
与上一个策略相比,下一个匹配再次包含 #2 行匹配。因此,第二个结果与 #2,#3,#4,#5 行匹配。
第三个结果与 #3,#4,#5,#6 行匹配。
第四个结果与 #4,#5,#6,#7 行匹配。
最后一个结果与 #5,#6,#7 行匹配。
AFTER MATCH SKIP TO LAST A
同样,第一个结果与 #1,#2,#3,#4 行匹配。
与前一个策略相比,下一个匹配只包含 #3 行(对应 A)用于下一个匹配。因此,第二个结果与 #3,#4,#5,#6 行匹配。
最后一个结果与 #5,#6,#7 行匹配。
AFTER MATCH SKIP TO FIRST A
这种组合将产生一个运行时异常,因为人们总是试图在上一个开始的地方开始一个新的匹配。这将产生一个无限循环,因此是禁止的。
必须记住,在 SKIP TO FIRST/LAST variable
策略的场景下,可能没有映射到该变量的行(例如,对于模式 A*)。在这种情况下,将抛出一个运行时异常,因为标准要求一个有效的行来继续匹配。
时间属性
为了在 MATCH_RECOGNIZE 之上应用一些后续查询,可能需要使用时间属性。有两个函数可供选择:
MATCH_ROWTIME()
返回映射到给定模式的最后一行的时间戳。
MATCH_PROCTIME()
返回处理时间属性。
控制内存消耗
在编写 MATCH_RECOGNIZE 查询时,内存消耗是一个重要的考虑因素,因为潜在匹配的空间是以宽度优先的方式构建的。为了确保模式能够完成,最好使用映射到匹配项的合理数量的行。
例如,该模式不能有接受每一行的量词表达。这种模式可以是这样的:
查询将每个传入行映射到 B 变量,因此永远不会完成。可以纠正此查询,例如,通过否定 C 的条件:
或者使用 Reluctant 语法:
已知的局限
Flink 对 MATCH_RECOGNIZE 子句实现是一项长期持续的工作,目前尚不支持 SQL 标准的某些功能。
不支持的功能包括:
模式表达式:
Pattern groups —— 这意味着量词不能应用于模式的子序列。因此,
(A (B C)+)
不是有效的模式。Alterations —— 像
PATTERN((A B | C D) E)
这样的模式,这意味着在寻找 E 行之前必须先找到子序列 A B 或者 C D。PERMUTE operator —— 这等同于它应用于所示的所有变量的排列
PATTERN (PERMUTE (A, B, C)) = PATTERN (A B C | A C B | B A C | B C A | C A B | C B A)
。Anchors ——
^
和$
,表示分区的开始/结束,在流上下文中没有意义,将不被支持。Exclusion ——
PATTERN ({- A -} B)
表示将查找 A,但是不会参与输出。这只适用于 ALL ROWS PER MATCH 方式。Reluctant optional quantifier ——
PATTERN A??
只支持贪婪的可选量词。ALL ROWS PER MATCH 输出方式 —— 为参与创建匹配项的每一行产生一个输出行。这也意味着:
MEASURES 子句唯一支持的语义是 FINAL
CLASSIFIER 函数,尚不支持返回行映射到的模式变量。
SUBSET —— 允许创建模式变量的逻辑组,并在 DEFINE 和 MEASURES 子句中使用这些组。
Physical offsets —— PREV/NEXT,为所有可见事件建立索引,而不是仅将那些映射到模式变量的事件编入索引。
提取时间属性 —— 目前无法为后续基于时间的操作提取时间属性。(这里文档可能有误,
MATCH_ROWTIME()
和MATCH_PROCTIME()
可以提取时间属性)MATCH_RECOGNIZE 仅 SQL 支持。Table API 中没有等效项。
Aggregations:
不支持 distinct aggregations。
版权声明: 本文为 InfoQ 作者【Alex🐒】的原创文章。
原文链接:【http://xie.infoq.cn/article/72a6446d9ed4b776d5eded0bf】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论