主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/overview/
SELECT 语句和 VALUES 语句需要使用 TableEnvironment 的 sqlQuery()
方法加以指定,会以 Table 的形式返回 SELECT (或 VALUE)的查询结果。Table 可被用于 SQL 或 Table API 查询、转换为 DataSet 或 DataStream、输出到 TableSink。SQL 与 Table API 的查询可以进行无缝融合、整体优化。
为了可以在 SQL 查询中访问到表,需要先在 TableEnvironment 中注册表(可以通过 TableSource、Table、CREATE TABLE 语句、DataStream 或 DataSet 注册)。为方便起见 Table.toString() 将会在其 TableEnvironment 中以唯一的名称自动注册表,并返回名称。
注意: 查询若包括了不支持的 SQL 特性,将会抛出 TableException
。
指定查询
以下示例显示如何在已注册和内联表上指定 SQL 查询。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 从外部数据源获取一个 DataStream
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// 查询一个未注册的 Table
Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
Table result = tableEnv.sqlQuery(
"SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
// 查询一个注册的 Table
tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));
// 执行 sqlQuery() 返回 Table 对象
Table result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
final Schema schema = new Schema()
.field("product", DataTypes.STRING())
.field("amount", DataTypes.INT());
// 创建并注册 TableSink
tableEnv.connect(new FileSystem().path("/path/to/file"))
.withFormat(...)
.withSchema(schema)
.createTemporaryTable("RubberOrders");
// 调用 executeSql() 执行 INSERT SQL,查询结果写入 TableSink
tableEnv.executeSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
复制代码
执行查询
SELECT 语句或者 VALUES 语句可以通过 TableEnvironment.executeSql()
方法来执行,该方法返回 TableResult 对象用于包装查询的结果,一个 Table 对象可以通过 Table.execute()
方法执行获取查询结果。TableResult.collect()
方法返回一个可以关闭的行迭代器(除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以通过 CloseableIterator#close()
方法主动地关闭作业以防止资源泄露)。 还可以通过 TableResult.print()
方法将查询结果打印到控制台。TableResult 中的结果数据只能被访问一次,因此一个 TableResult 实例中,collect()
方法和 print()
方法不能被同时使用。
TableResult.collect()
与 TableResult.print()
的行为在不同的 checkpointing 模式下略有不同。
对于批作业或没有配置任何 checkpointing 的流作业,TableResult.collect()
与 TableResult.print()
既不保证 Exactly-once
、也不保证 At-least-once
。查询结果在产生后可被客户端即刻访问,但作业失败和重启时将会报错。
对于配置了 Exactly-once checkpointing 的流作业,TableResult.collect()
与 TableResult.print()
保证 Exactly-once
。一条结果数据只有在其关联的 checkpointing 完成后才能在客户端被访问。
对于配置了 At-least-once checkpointing 的流作业,TableResult.collect()
与 TableResult.print()
保证 At-least-once
。查询结果在产生后可被客户端即刻访问,同一条结果可能被多次传递给客户端。
语法
Flink 通过支持标准 ANSI SQL 的 Apache Calcite 解析 SQL。以下“BNF-语法”描述了批处理和流处理查询中所支持的 SQL 特性的超集。
query:
values
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] schemaName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
复制代码
Flink SQL 对于标识符(表、属性、函数名)的命名策略类似于 Java 的词法约定:
字符串文本常量需要被单引号包起来(如 SELECT 'Hello World'
)。两个单引号表示转义(如 SELECT 'It''s me.'
)。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法:
操作符
WITH
WITH 提供了编写辅助语句的方法,以便在更大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression,CTE),可以认为它定义了只存在于一个查询中的临时视图。
WITH 语法:
WITH <with_item_definition> [ , ... ]
SELECT ... FROM ...;
<with_item_defintion>:
with_item_name (column_name[, ...n]) AS ( <select_query> )
复制代码
下面的示例定义了一个 CTE:orders_with_total
,并在 GROUP BY 查询中使用它。
WITH orders_with_total AS (
SELECT order_id, price + tax AS total
FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;
复制代码
SELECT & WHERE
SELECT 语句的一般语法为:
SELECT select_list FROM table_expression [ WHERE boolean_expression ]
复制代码
table_expression
可以是任何数据源(表、视图、VALUES 子句、多个表的 Join 结果、子查询)。下面的事例读取 Orders 表的所有列:
select_list
指定 *
表示解析所有的列,但是不建议在生产环境中使用,会降低性能,建议只查询需要的列:
SELECT order_id, price + tax FROM Orders
复制代码
查询可以使用 VALUES 子句,每个元组(Tuple)对应一个 Row,并且可以设置别名:
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
复制代码
WHERE 语句可以过滤 Row:
SELECT price + tax FROM Orders WHERE id = 10
复制代码
可以对每行数据的指定列调用函数(内置、自定义函数,自定义函数必须提前注册):
SELECT PRETTY_PRINT(order_id) FROM Orders
复制代码
SELECT DISTINCT
如果指定 SELECT DISTINCT,则将从结果集中删除重复行(每组重复中保留一行)。
SELECT DISTINCT id FROM Orders
复制代码
对于流式查询,计算查询结果所需的状态(State)可能会无限增长。状态大小取决于不同行的数量。可以为查询配置适当的状态生存时间(TTL),以防止状态大小过大。这可能会影响查询结果的正确性。
Windowing TVF(1.13)
Window 是流处理的核心。Windows 将流拆分为有限大小的片段应用计算。只有流处理支持。
Flink 1.13 提供了几个 Table-valued functions(TVF,区别于 Group Window Function),将表中的元素划分为 windows,包括:
每个元素在逻辑上可以属于多个窗口,具体取决于所使用的窗口函数。TVF 必须和聚合操作一起使用:
假设存在一个 Bid
表
Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+
Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
| bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 | 4.00 | C |
| 2020-04-15 08:07 | 2.00 | A |
| 2020-04-15 08:09 | 5.00 | D |
| 2020-04-15 08:11 | 3.00 | B |
| 2020-04-15 08:13 | 1.00 | E |
| 2020-04-15 08:17 | 6.00 | F |
+------------------+-------+------+
复制代码
滚动窗口(Tumbling windows)
指定一个固定大小的窗口,并且不重叠,语法:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
-- data: 表名,表需要有时间属性字段
-- timecol: 表中的时间属性字段,用于划分窗口
-- size: 窗口大小
复制代码
设定一个 10 分钟大小的滚动窗口,
SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
复制代码
滑动窗口(Hop, Sliding windows)
指定一个固定大小的窗口,设定滑动间隔,元素会被指定给多个窗口,语法:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
-- data: 表名,表需要有时间属性字段
-- timecol: 表中的时间属性字段,用于划分窗口
-- size: 窗口大小
-- slide:窗口滑动的大小
复制代码
设定一个 10 分钟大小,每 5 分钟滑动的窗口,
SELECT window_start, window_end, SUM(price)
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
+------------------+------------------+-------+
复制代码
累加窗口(Cumulate windows)
指定一个窗口的最大规模,按照指定时间间隔增长累加,直到达到窗口的最大规模,每次窗口增长会进行一次计算,可以理解为多次计算的滚动窗口,语法:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
-- data: 表名,表需要有时间属性字段
-- timecol: 表中的时间属性字段,用于划分窗口
-- size: 窗口最大大小
-- step:窗口增长大小
复制代码
设定一个 10 分钟大小,每 2 分钟累计一次的窗口,
SELECT window_start, window_end, SUM(price)
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
复制代码
评论