写点什么

【FlinkSQL】Flink SQL Query 语法(二)

用户头像
Alex🐒
关注
发布于: 2021 年 06 月 11 日

主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/

操作符

Window Aggregation

只有流处理支持。

Window TVF Aggregation

窗口聚合定义在 GROUP BY 字句中(使用 Window TVF 的 window_startwindow_end 列)。与使用常规 GROUP BY 子句的查询一样,使用 GROUP BY Window 的查询将为每个 GROUP 计算一个结果行。

SELECT ...FROM <windowed_table> -- 关联查询 windowing TVFGROUP BY window_start, window_end, ...
复制代码


存在一个 Bid

Flink SQL> desc Bid;+-------------+------------------------+------+-----+--------+---------------------------------+|        name |                   type | null | key | extras |                       watermark |+-------------+------------------------+------+-----+--------+---------------------------------+|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND ||       price |         DECIMAL(10, 2) | true |     |        |                                 ||        item |                 STRING | true |     |        |                                 || supplier_id |                 STRING | true |     |        |                                 |+-------------+------------------------+------+-----+--------+---------------------------------+
Flink SQL> SELECT * FROM Bid;+------------------+-------+------+-------------+| bidtime | price | item | supplier_id |+------------------+-------+------+-------------+| 2020-04-15 08:05 | 4.00 | C | supplier1 || 2020-04-15 08:07 | 2.00 | A | supplier1 || 2020-04-15 08:09 | 5.00 | D | supplier2 || 2020-04-15 08:11 | 3.00 | B | supplier2 || 2020-04-15 08:13 | 1.00 | E | supplier1 || 2020-04-15 08:17 | 6.00 | F | supplier2 |+------------------+-------+------+-------------+
复制代码

Windowing TVFs

三种 TVF 聚合的例子

SELECT window_start, window_end, SUM(price)  FROM TABLE(    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))  GROUP BY window_start, window_end;+------------------+------------------+-------+|     window_start |       window_end | price |+------------------+------------------+-------+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 || 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |+------------------+------------------+-------+
SELECT window_start, window_end, SUM(price) FROM TABLE( HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;+------------------+------------------+-------+| window_start | window_end | price |+------------------+------------------+-------+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 || 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 || 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 || 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |+------------------+------------------+-------+
SELECT window_start, window_end, SUM(price) FROM TABLE( CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;+------------------+------------------+-------+| window_start | window_end | price |+------------------+------------------+-------+| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 || 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 || 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 || 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 || 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 || 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 || 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 || 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |+------------------+------------------+-------+
复制代码

GROUPING SETS

窗口聚合支持 GROUPING SETS 语法(比 GROUP BY 更复杂的分组操作),行数据按照 GROUP SET 中设置分别分组,并进行计算。要求:window_startwindow_end 必须在 GROUP BY 子句中,但不在 GROUPING SETS 子句。


例子:

SELECT window_start, window_end, supplier_id, SUM(price) as price  FROM TABLE(    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))  GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());-- 在 Window 内,按照 supplier_id 分组,和部分组两个规则  +------------------+------------------+-------------+-------+|     window_start |       window_end | supplier_id | price |+------------------+------------------+-------------+-------+| 2020-04-15 08:00 | 2020-04-15 08:10 |      (NULL) | 11.00 || 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |  5.00 || 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |  6.00 || 2020-04-15 08:10 | 2020-04-15 08:20 |      (NULL) | 10.00 || 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |  9.00 || 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier1 |  1.00 |+------------------+------------------+-------------+-------+
复制代码


GROUP SETS 中的表达式可以包含 0 个或多个字段,0 个表示所有行聚合到 1 组。

ROLLUP

ROLLUP 是一种简易表达方式,表示给定列表的所有前缀和空列表。

解释:ROLLUP(a, b) 等同于 GROUPING SETS ((a), (a, b), ())

SELECT window_start, window_end, supplier_id, SUM(price) as priceFROM TABLE(    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end, ROLLUP (supplier_id);
复制代码
CUBE

CUBE 是一种简易表达方式,表示给定列表的所有子集。

解释:CUBE(a, b) 等同于 GROUPING SETS ((a), (b), (a, b), ())

SELECT window_start, window_end, item, supplier_id, SUM(price) as price  FROM TABLE(    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))  GROUP BY window_start, window_end, CUBE (supplier_id, item);
复制代码

Cascading Window Aggregation

window_startwindow_end 是普通时间戳字段, 不是时间属性字段。如果需要基于 window 结果再次进行时间维度的聚合计算,需要使用 window_time 字段(Window TVF 提供的时间属性字段)。

-- tumbling 5 minutes for each supplier_idCREATE VIEW window1 ASSELECT window_start, window_end, window_time as rowtime, SUM(price) as partial_price  FROM TABLE(    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))  GROUP BY supplier_id, window_start, window_end, window_time;
-- tumbling 10 minutes on the first windowSELECT window_start, window_end, SUM(partial_price) as total_price FROM TABLE( TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;
复制代码

Group Window Aggregation

官方更推荐使用 Window TVF:

  • 性能优化

  • 支持 GROUPING SET 语法

  • 可以对 Window 聚合结果应用 Window TopN

Group Window Functions

Group Window Aggregation 在 GROUP BY 子句中定义。下面的 Group Window Function 支持批处理和流处理:


Time Attributes

在流处理中 SQL 查询的表中,分组窗口函数的 time_attr 参数必须引用一个合法的时间属性,且该属性需要指定 Row 的处理时间或事件时间。对于批处理的 SQL 查询,分组窗口函数的 time_attr 参数必须是一个 TIMESTAMP 类型的属性。


处理时间在 DDL 中定义

CREATE TABLE user_actions (  user_name STRING,  data STRING,  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性) WITH (  ...);
复制代码


事件时间在 DDL 中定义

CREATE TABLE user_actions (  user_name STRING,  data STRING,  user_action_time TIMESTAMP(3),  -- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND) WITH (  ...);
复制代码

更多参考:Flink Table & SQL 时间属性

获取 Group Window 开始和结束时间


示例:

CREATE TABLE Orders (  user       BIGINT,  product    STIRNG,  amount     INT,  order_time TIMESTAMP(3),  WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE) WITH (...);
SELECT user, TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart, SUM(amount) FROM OrdersGROUP BY TUMBLE(order_time, INTERVAL '1' DAY), user
复制代码

Group Aggregation

GROUP BY 聚合函数,聚合多行数据计算单个结果。


在流式查询中,要理解连续查询的概念,当有新的数据时,会计算出新的结果。因此计算查询的状态可能会无限增长,状态大小取决于组的数量和聚合函数的类型(Count 相比 Max/Min 需要更少的存储)。可以为状态设置 TTL 以防止状态过大,但这可能影响查询结果的正确性。

SELECT COUNT(*)FROM OrdersGROUP BY order_id
复制代码

DISTINCT Aggregation

分组后去重复,只相同数据只保留一组

SELECT COUNT(DISTINCT order_id) FROM Orders
复制代码

GROUPING SETS

比标准 GROUP BY 更复杂的分组操作,在 Windows TVF 中有介绍过,行数据按照 GROUP SET 中设置分别分组,并进行计算。

SELECT supplier_id, rating, COUNT(*) AS totalFROM ProductsGROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())
+-------------+--------+-------+| supplier_id | rating | total |+-------------+--------+-------+| supplier1 | 4 | 1 || supplier1 | (NULL) | 2 || (NULL) | (NULL) | 4 || supplier1 | 3 | 1 || supplier2 | 3 | 1 || supplier2 | (NULL) | 2 || supplier2 | 4 | 1 |+-------------+--------+-------+
复制代码


ROLLUP 和 CUBE 是简易写法,参考 Windows TVF aggregation。

HAVING

与 SQL 规范中的 Having 用法相同,用于排除不满足条件的组

SELECT SUM(amount)FROM OrdersGROUP BY usersHAVING SUM(amount) > 50
复制代码

Over Aggregation

OVER 聚合计算一系列有序行上每个输入行的聚合值。与 GROUP BY 聚合相比,OVER 聚合不会将每个组的结果行数减少到一行,相反会为每个输入行生成一个聚合值。


下面的查询为每个订单计算当前订单前一小时内收到的同一产品的所有订单的金额总和。

SELECT order_id, order_time, amount,  SUM(amount) OVER (    PARTITION BY product    ORDER BY order_time    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW  ) AS one_hour_prod_amount_sumFROM Orders
复制代码


语法如下:

SELECT  agg_func(agg_col) OVER (    [PARTITION BY col1[, col2, ...]]    ORDER BY time_col    range_definition),  ...FROM ...
复制代码

可以在 SELECT 子句中定义多个 OVER 聚合。但是,对于流式查询,所有 OVER 聚合窗口必须相同。

ORDER BY

OVER 窗口定义在有序的行上。由于表没有固有的顺序,因此 ORDER BY 子句是必需的。


对于流查询,Flink 当前仅支持使用升序时间属性,不支持其他排序方式。

PARTITION BY

OVER 窗口可以定义在分区表上,可以在分区表上定义 OVER windows。在存在 PARTITION BY 子句的情况下,只对每个输入行的分区行计算聚合。

Range Definitions

Range Definition 指定了聚合要包含的列的范围,使用 BETWEEN 子句定义边界,上界只支持 CURRENT ROW。另外有两个参数可以设置:时间范围(RANGE intervals)和行数范围(ROW intervals)。

RANGE intervals

RANGE intervals 定义在 ORDER BY 字段上的值,在 Flink 中总是时间属性字段。下面定义了包含时间属性比当前行少最多 30 分钟的所有行范围:

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
复制代码

ROW intervals

ROWS interval 基于行数设置范围,定义了一次聚合包含多少行。下面定义了包含当前行以及当前行之前的 10 行数据:

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
复制代码

WINDOW

可以使用 WINDOW 在 SELECT 外定义 OVER window,可以使查询可读性更好。

SELECT order_id, order_time, amount,  SUM(amount) OVER w AS sum_amount,  AVG(amount) OVER w AS avg_amountFROM OrdersWINDOW w AS (  PARTITION BY product  ORDER BY order_time  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
复制代码

Set(集合操作)

Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
复制代码

UNION

UNION 和 UNION ALL 返回表的并集。UNION 会对相同行去重,UNION ALL 不会。

Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);+---+|  s|+---+|  c||  a||  b||  d||  e|+---+
Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);+---+| c|+---+| c|| a|| b|| b|| c|| d|| e|| a|| b|| b|+---+
复制代码

INTERSECT

INTERSECT 和 INTERSECT ALL 返回两个表的交集。INTERSECT 会对相同行去重,INTERSECT ALL 不会。

Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);+---+|  s|+---+|  a||  b|+---+
Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);+---+| s|+---+| a|| b|| b|+---+
复制代码

EXCEPT

EXCEPT 和 EXCEPT ALL 返回两个表的差集。EXCEPT 会对相同行去重,EXCEPT ALL 不会。

Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);+---+| s |+---+| c |+---+
Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);+---+| s |+---+| c || c |+---+
复制代码

IN

确定给定的值是否与子查询或列表中的值相匹配

SELECT user, amountFROM OrdersWHERE product IN (    SELECT product FROM NewProducts)
复制代码

SQL 优化器会把 IN 重写为 JOIN 和 GROUP 子句。

EXISTS

指定一个子查询,检测行的存在。只有在 JOIN 和 GROUP 可以重写操作时支持。效果等同于 IN。

SELECT user, amountFROM OrdersWHERE product EXISTS (    SELECT product FROM NewProducts)
复制代码


Notes. RDBMS 中 IN 和 EXISTS 的区别:如果子查询得出的结果集记录较少,主查询中的表较大且又有索引时应该用 IN,反之如果外层的主查询记录较少,子查询中的表大,又有索引时使用 EXISTS。区分 IN 和 EXIST 主要是造成了驱动顺序的改变(这是性能变化的关键),如果是 EXISTS,那么以外层表为驱动表,先被访问,如果是 IN,那么先执行子查询,所以会以驱动表的快速返回为目标,那么就会考虑到索引及结果集的关系了,另外 IN 不对 NULL 进行处理。IN 是把外表和内表作 HASH 连接,而 EXISTS 是对外表作 LOOP 循环,每次 LOOP 循环再对内表进行查询。一直以来认为 EXISTS 比 IN 效率高的说法是不准确的。

ORDER BY

根据指定字段的值进行排序,如果有多个字段,根据最左边的字段排序,如果相同,根据下一个字段排序。


在流处理模式,必须优先使用时间属性字段排序,后续排序字段可自由选择。批处理没有限制。

SELECT *FROM OrdersORDER BY order_time, order_id
复制代码

LIMIT

约束查询的行数,通常要和 ORDER BY 一起使用,保证结果具有一定程度的确定性。

只有批处理支持(1.13)。

SELECT *FROM OrdersORDER BY orderTimeLIMIT 3
复制代码

Top-N

Top-N 查询是根据列排序找到 N 个最大或最小的值。最大值集和最小值集都被视为是一种 Top-N 的查询。得到的结果集可以进行进一步的分析。


Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持分组 Top-N。例如,查询每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于 SQL 的 Top-N 查询。以下是 TOP-N 表达式的语法:

SELECT [column_list]FROM (   SELECT [column_list],     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum   FROM table_name)WHERE rownum <= N [AND conditions]
复制代码

参数说明:

  • ROW_NUMBER():根据当前分区内的各行的顺序从第一行开始,依次为每一行分配一个唯一且连续的号码。目前,只支持 ROW_NUMBER 在 over 窗口函数中使用。未来将会支持 RANK() 和 DENSE_RANK() 函数。

  • PARTITION BY col1[, col2...]:指定分区列,每个分区都将会有一个 Top-N 结果。

  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:指定排序列,不同列的排序方向可以不一样。

  • WHERE rownum <= N:Flink 需要 rownum <= N 才能识别一个查询是否为 Top-N 查询。其中,N 代表最大或最小的 N 条记录会被保留。

  • [AND conditions]:在 where 语句中,可以随意添加其他的查询条件,但其他条件只允许通过 AND 与 rownum <= N 结合使用。


必须严格遵循上述模式,否则优化器将无法转换查询。


流处理模式需要注意:Top-N 查询结果会更新。Flink SQL 会根据排序键对数据进行排序。如果 Top N 记录发生了变化,变化的部分会以 retraction(回溯)或 update(更新)的方式发送给下游。推荐使用一个支持更新的存储作为 Top-N 查询的 Sink。


另外,若 top N 记录需要存储到外部存储,则结果表需要拥有相同与 Top-N 查询相同的唯一键。


Top-N 的唯一键是分区列和 rownum 列的结合,Top-N 查询也可以获得上游的唯一键。在下面的例子中,product_id 是 ShopSales 的唯一键,然后 Top-N 的唯一键是 [category, rownum] 和 [product_id] 。下面的样例描述了如何指定带有 Top-N 的 SQL 查询(查询每个分类实时销量最大的五个产品)。

CREATE TABLE ShopSales (  product_id   STRING,  category     STRING,  product_name STRING,  sales        BIGINT) WITH (...);

SELECT *FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num FROM ShopSales)WHERE row_num <= 5
复制代码

No Ranking Output Optimization(无排名输出优化)

row_num 字段会作为唯一键的其中一个字段写到结果表里面。当原始结果(名为 product-1001)从排序第九变化为排序第一时,排名 1-9 的所有结果都会以更新消息的形式发送到结果表(有 9 条消息)。若结果表收到太多的数据,会成为 SQL 任务的瓶颈。


优化方法是在 Top-N 查询的外部 SELECT 子句中省略 row_num 字段。由于前 N 条记录的数量通常不大,下游消费者可以自己对记录进行快速排序。去掉 row_num 字段后,上述的例子中,只有前 N 个记录中变化的记录(product-1001)需要发送到下游,从而可以节省大量的对结果表的 IO 操作。

-- SELECT 忽略 row_num 字段SELECT product_id, category, product_name, salesFROM (  SELECT *,    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num  FROM ShopSales)WHERE row_num <= 5
复制代码

使用流处理模式时需注意:为了使上述查询输出可以输出到外部存储并且结果正确,外部存储需要拥有与 Top-N 查询一致的唯一键。在上述的查询例子中,若 product_id 是查询的唯一键,那么外部表必须要有 product_id 作为其唯一键。

Window Top-N

只适用流处理模式


Window Top-N 是一个特殊的 Top-N,它返回每个窗口和其他分区键的 N 个最小或最大值。


对于流式查询,与常规 Top-N 不同,Window Top-N 不生成中间结果,只生成最终结果,即窗口结束时的 Top-N 记录。此外 Window Top-N 在不需要时会清除所有中间状态。即如果程序不需要每个记录更新结果,Window Top-N 查询的性能会更好。通常,与窗口聚合一起使用。


Window Top-N 可以用与常规 Top-N 相同的语法定义。除此之外,Window Top-N 要求 PARTITION BY 子句包含 window_startwindow_end 列。否则,优化器将无法转换查询。

SELECT [column_list]FROM (   SELECT [column_list],     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum   FROM table_name) -- relation applied windowing TVFWHERE rownum <= N [AND conditions]
复制代码


假设有一个 Bid

Flink SQL> desc Bid;+-------------+------------------------+------+-----+--------+---------------------------------+|        name |                   type | null | key | extras |                       watermark |+-------------+------------------------+------+-----+--------+---------------------------------+|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND ||       price |         DECIMAL(10, 2) | true |     |        |                                 ||        item |                 STRING | true |     |        |                                 || supplier_id |                 STRING | true |     |        |                                 |+-------------+------------------------+------+-----+--------+---------------------------------+

Flink SQL> SELECT * FROM Bid;+------------------+-------+------+-------------+| bidtime | price | item | supplier_id |+------------------+-------+------+-------------+| 2020-04-15 08:05 | 4.00 | A | supplier1 || 2020-04-15 08:06 | 4.00 | C | supplier2 || 2020-04-15 08:07 | 2.00 | G | supplier1 || 2020-04-15 08:08 | 2.00 | B | supplier3 || 2020-04-15 08:09 | 5.00 | D | supplier4 || 2020-04-15 08:11 | 2.00 | B | supplier3 || 2020-04-15 08:13 | 1.00 | E | supplier1 || 2020-04-15 08:15 | 3.00 | H | supplier2 || 2020-04-15 08:17 | 6.00 | F | supplier5 |+------------------+-------+------+-------------+
复制代码


计算每 10 分钟内销售额最高的前 3 名供应商。

SELECT *  FROM (    SELECT *,         ROW_NUMBER() OVER (PARTITION BY window_start, window_end             ORDER BY price DESC) as rownum    FROM (      SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt      FROM TABLE(        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))      GROUP BY window_start, window_end, supplier_id    )  ) WHERE rownum <= 3;    +------------------+------------------+-------------+-------+-----+--------+|     window_start |       window_end | supplier_id | price | cnt | rownum |+------------------+------------------+-------------+-------+-----+--------+| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |  6.00 |   2 |      1 || 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier4 |  5.00 |   1 |      2 || 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |  4.00 |   1 |      3 || 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier5 |  6.00 |   1 |      1 || 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |  3.00 |   1 |      2 || 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier3 |  2.00 |   1 |      3 |+------------------+------------------+-------------+-------+-----+--------+
复制代码

使用限制:目前 Window Top-N 还不支持使用在 Windowing TVF 上。

Deduplication(去重)

在一组列上值重复的行,只保留第一行或最后一行数据。在某些情况下,上游的作业不能实现端到端的 Exactly-once,这将可能导致在故障恢复时,Sink 中有重复的记录,将影响下游分析作业的正确性(例如,SUM、COUNT),所以在进一步分析之前需要进行数据去重。


与 Top-N 查询相似,Flink 使用 ROW_NUMBER() 去除重复的记录。去重是一个特殊的 Top-N 查询,其中 N 是 1 ,记录是以处理时间或事件事件进行排序的。

SELECT [column_list]FROM (   SELECT [column_list],     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]       ORDER BY time_attr [asc|desc]) AS rownum   FROM table_name)WHERE rownum = 1
复制代码

参数说明:

  • ROW_NUMBER():从第一行开始,依次为每一行分配一个唯一且连续的号码。

  • PARTITION BY col1[, col2...]:指定分区的列,例如去重的键。

  • ORDER BY time_attr [asc|desc]:指定排序的列。所指定的列必须为时间属性字段。升序(ASC)排列指只保留第一行,而降序排列(DESC)则指保留最后一行。

  • WHERE rownum = 1:Flink 需要 rownum = 1 以确定该查询是否为去重查询。

注意:必须严格遵循上述模式,否则优化器将无法转换查询。


下面的示例演示如何使用流表上的重复数据消除来指定 SQL 查询。

CREATE TABLE Orders (  order_time  STRING,  user        STRING,  product     STRING,  num         BIGINT,  proctime AS PROCTIME()) WITH (...);

-- remove duplicate rows on order_id and keep the first occurrence row,-- because there shouldn't be two orders with the same order_id.SELECT order_id, user, product, numFROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num FROM Orders)WHERE row_num = 1
复制代码

Pattern Recognition(模式匹配)

只适用流处理模式


搜索一组事件模式(event pattern)是一种常见的用例,尤其是在数据流情景中。Flink 提供复杂事件处理(CEP)库,允许在事件流中进行模式检测。


Flink 的 SQL API 提供了一种关系式的查询表达方式,其中包含大量内置函数和基于规则的优化,可以开箱即用。


2016 年 12 月,国际标准化组织(ISO)发布了新版本的 SQL 标准,其中包括在 SQL 中的行模式识别(Row Pattern Recognition in SQL)(ISO/IEC TR 19075-5:2016)。允许 Flink 使用 MATCH_RECOGNIZE 子句融合 CEP 和 SQL API,以便在 SQL 中进行复杂事件处理。


MATCH_RECOGNIZE 子句启用以下任务:

  • 使用 PARTITION BY 和 ORDER BY 子句对数据进行逻辑分区和排序。

  • 使用 PATTERN 子句定义要查找的行模式。这些模式使用类似于正则表达式的语法。

  • 在 DEFINE 子句中指定行模式变量的逻辑组合。

  • measures 是指在 MEASURES 子句中定义的表达式,这些表达式可用于 SQL 查询中的其他部分。

SELECT T.aid, T.bid, T.cidFROM MyTable    MATCH_RECOGNIZE (      PARTITION BY userid      ORDER BY proctime      MEASURES        A.id AS aid,        B.id AS bid,        C.id AS cid      PATTERN (A B C)      DEFINE        A AS name = 'a',        B AS name = 'b',        C AS name = 'c'    ) AS T
复制代码


更多参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/match_recognize/

Hints

SQL hints 是和 SQL 语句一起使用来改变执行计划的。

  • 增强 planner:没有完美的 planner,所以实现 SQL hints 让用户更好地控制执行是非常有意义的;

  • 增加元数据(或者统计信息):如“已扫描的表索引”和“一些键的倾斜信息”的一些统计数据对于查询来说是动态的,用 hints 来配置它们会非常方便,因为我们从 planner 获得的计划元数据通常不那么准确;

  • 算子(Operator)资源约束:在许多情况下,会为执行算子提供默认的资源配置,即最小并行度或托管内存(UDF 资源消耗)或特殊资源需求(GPU 或 SSD 磁盘)等,可以使用 SQL hints 非常灵活地为每个查询(非作业)配置资源。

表的动态选项

动态表选项允许动态地指定或覆盖表选项,不同于用 SQL DDL 或 Connect API 定义表的静态选项,这些选项可以在每个查询的每个表范围内灵活地指定。


因此,它非常适合用于交互式终端中的特定查询,例如,在 SQL-CLI 中,你可以通过添加动态选项 /*+ OPTIONS('csv.ignore-parse-errors'='true') */ 来指定忽略 CSV 源的解析错误。


动态选项默认值禁止使用,因为可能会更改查询的语义。需要将配置项 table.dynamic-table-options.enabled 显式设置为 true(默认值为 false, set table.dynamic-table-options.enabled=true

语法

为了不破坏 SQL 兼容性,使用 Oracle 风格的 SQL hints 语法:

table_path /*+ OPTIONS(key=val [, key=val]*) */
-- key: stringLiteral-- val: stringLiteral
复制代码

示例

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- 覆盖查询语句中源表的选项select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
-- 覆盖 join 中源表的选项select * from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1 join kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2 on t1.id = t2.id;
-- 覆盖插入语句中结果表的选项insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
复制代码


发布于: 2021 年 06 月 11 日阅读数: 13
用户头像

Alex🐒

关注

还未添加个人签名 2020.04.30 加入

还未添加个人简介

评论

发布
暂无评论
【FlinkSQL】Flink SQL Query 语法(二)