写点什么

【FlinkSQL】Flink SQL CREATE 语法

用户头像
Alex🐒
关注
发布于: 2021 年 06 月 10 日

主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create


CREATE 语句用于注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。目前 Flink SQL 支持下列 CREATE 语句:

  • CREATE TABLE

  • CREATE DATABASE

  • CREATE VIEW

  • CREATE FUNCTION

执行 CREATE

可以使用 TableEnvironment 的 executeSql() 方法执行 CREATE 语句。若 CREATE 操作执行成功,executeSql() 方法返回OK,否则会抛出异常。

EnvironmentSettings settings = EnvironmentSettings.newInstance()...TableEnvironment tableEnv = TableEnvironment.create(settings);
// 对已注册的表进行 SQL 查询// 注册名为 “Orders” 的表tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");// 在表上执行 SQL 查询,并把得到的结果作为一个新的表Table result = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
// 对已注册的表进行 INSERT 操作// 注册 TableSinktableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");// 在表上执行 INSERT 语句并向 TableSink 发出结果tableEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
复制代码


SQL CLI

Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);[INFO] Table has been created.
Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);[INFO] Table has been created.
Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';[INFO] Submitting SQL update statement to the cluster...
复制代码

CREATE TABLE

以下语法概述了 CREATE TABLE 语法:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name  (    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]    [ <watermark_definition> ]    [ <table_constraint> ][ , ...n]  )  [COMMENT table_comment]  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]  WITH (key1=val1, key2=val2, ...)  [ LIKE source_table [( <like_options> )] ]   <physical_column_definition>:  column_name column_type [ <column_constraint> ] [COMMENT column_comment]  <column_constraint>:  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>: column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>: column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>: [catalog_name.][db_name.]table_name
<like_options>:{ { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS } | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } }[, ...]
复制代码

Columns

Physical / Regular Columns

Pyhsical Columns 数据库中已知的常规字段,定义了字段的名称、类型和顺序。Connectors 和 Formats 使用这些列(按定义的顺序)来进行配置。

CREATE TABLE MyTable (  `user_id` BIGINT,  `name` STRING) WITH (  ...);
复制代码

Metadata Columns

Metadata Column 是 SQL 标准的扩展(可选项),允许访问连接器、格式化表中每一行数据,由 METADATA 关键字表示。


例如,可以使用 Metadata column 从 Kafka 记录中读取和写入时间戳,以进行基于时间的操作。


根据 Connector 和 Format 选择可使用的 Metadata column 。


下面定义了一个表,声明字段 record_time 使用 Metadata column timestamp

CREATE TABLE MyTable (  `user_id` BIGINT,  `name` STRING,  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp) WITH (  'connector' = 'kafka'  ...);
复制代码


record_time 成为表结构的一部分,可以像常规列一样进行转换和存储:

INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;
复制代码


如果列名使用 Metadata column 的标识键(kafka 中的 timesamp),则可以简略写法:

CREATE TABLE MyTable (  `user_id` BIGINT,  `name` STRING,  `timestamp` TIMESTAMP_LTZ(3) METADATA    -- use column name as metadata key) WITH (  'connector' = 'kafka'  ...);
复制代码


如果列的数据类型与 Metadata column 的数据类型不同,则运行时将执行显式强制转换(这要求这两种数据类型是兼容的)。

CREATE TABLE MyTable (  `user_id` BIGINT,  `name` STRING,  `timestamp` BIGINT METADATA    -- cast the timestamp as BIGINT) WITH (  'connector' = 'kafka'  ...);
复制代码


默认情况下,Planner 假定 Metadata column 可以用于读写。但是,在许多情况下,外部系统提供的只读多于可写。因此,可以使用 VIRTUAL 关键字从持久化中排除元数据列(下面例子中的 offset)。

CREATE TABLE MyTable (  `timestamp` BIGINT METADATA,       -- part of the query-to-sink schema  `offset` BIGINT METADATA VIRTUAL,  -- not part of the query-to-sink schema  `user_id` BIGINT,  `name` STRING,) WITH (  'connector' = 'kafka'  ...);
复制代码

Computed Columns

计算列是一个使用 column_name AS computed_column_expression 语法生成的虚拟列。由使用同一表中其他列的表达式生成,并且不会在表中进行物理存储。这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。


下面的例子是使用 cost AS price * quantity 定义的一个计算列

CREATE TABLE MyTable (  `user_id` BIGINT,  `price` DOUBLE,  `quantity` DOUBLE,  `cost` AS price * quanitity,  -- evaluate expression and supply the result to queries) WITH (  'connector' = 'kafka'  ...);
复制代码


定义在 source table 上的计算列会在从数据源读取数据后被计算,可以在 SELECT 查询语句中使用。与使用 VIRTUAL 的 Metadata column 类似,计算列不会持久化。因此计算列不能作为 INSERT INTO 语句的目标(在 INSERT 语句中,SELECT 语句的 schema 需要与目标表不带有计算列的 schema 一致)。


计算列可用于为 CREATE TABLE 语句定义时间属性。Processing time 可以简单地通过使用了系统函数 PROCTIME()proc AS PROCTIME() 语句进行定义。 Event time 可能需要从现有的字段中获得(例如,原始字段的类型不是 TIMESTAMP 或嵌套在 JSON 字符串中)。

WATERMARK

WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression


rowtime_column_name 把一个现有的列定义为事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列(top-level column),也可以是一个计算列。


watermark_strategy_expression 定义了 watermark 的生成策略。允许使用包括计算列在内的任意非子查询表达式来计算 watermark;表达式的返回类型必须是 TIMESTAMP(3)。仅当返回的 watermark 不为空且大于之前发出的 watermark 时才会被发出(以保证 watermark 递增)。


Flink 为每条记录的计算 watermark,定期(pipeline.auto-watermark-interval)发出所生成的最大的 watermark(如果 watermark 为空或不大于之前的 watermark 不发出)。若 watermark 的间隔(pipeline.auto-watermark-interval)是 0ms,那么每条记录都会产生一个 watermark(根据前述的规则发出)。


使用事件时间(Event time)语义时,表必须包含事件时间属性和 watermark 策略。


Flink 提供了几种常用的 watermark 策略:

  • 严格递增时间戳(Strictly ascending timestamps):WATERMARK FOR rowtime_column AS rowtime_column

  • 发出到目前为止已观察到的最大时间戳的 watermark,时间戳大于最大时间戳的 Row 被认为没有迟到

  • 递增时间戳(Ascending timestamps): WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

  • 发出到目前为止已观察到的最大时间戳减 1 的 watermark,时间戳大于或等于最大时间戳的 Row 被认为没有迟到

  • 有界乱序时间戳(Bounded out of orderness timestamps):WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

  • 发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark,例如,WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一个 5 秒延迟的 watermark 策略


CREATE TABLE Orders (    `user` BIGINT,    product STRING,    order_time TIMESTAMP(3),    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND) WITH ( . . . );
复制代码

PRIMARY KEY

主键约束是 Flink 优化的一种提示信息,表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。主键声明的列都是非空的,可以被用作表中每行的唯一标识。


主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 申明了是否输入/输出数据做检查(是否唯一)。Flink 只支持 NOT ENFORCED 模式,<u>用户需要自己保证唯一性</u>。


Flink 假定声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。


在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

PARTITIONED BY

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。

WITH Options

Table properties 用于创建 table source/sink,一般用于寻找和创建底层的连接器(Connector)。


表达式 key1=val1 的键和值必须为字符串字面量。不同 Connector 有各自的 properties。


Note. 表名可以为以下三种格式 1. catalog_name.db_name.table_name 2. db_name.table_name 3. table_name。使用 catalog_name.db_name.table_name 的表将会与名为 catalog_name 的 catalog 和名为 db_name 的数据库一起注册到 metastore 中;使用 db_name.table_name 的表将会被注册到当前执行的 table environment 中的 catalog 且数据库会被命名为 db_name;对于 table_name,数据表将会被注册到当前正在运行的 catalog 和数据库中。


Note. 使用 CREATE TABLE 语句注册的表均可用作 table source 和 table sink。 在被 DML 语句引用前,无法决定其实际用于 source 或是 sink。

LIKE

LIKE 子句可以基于现有表的定义去创建新表,并且可以扩展或排除原始表中的某些部分。LIKE 子句必须在 CREATE 语句中定义,并且是基于 CREATE 语句的更上层定义。LIKE 子句可以用于定义表的多个部分,而不仅仅是 schema 部分(可以重用或改写指定的连接器配置属性或者添加 watermark 定义)。


示例如下:

CREATE TABLE Orders (    `user` BIGINT,    product STRING,    order_time TIMESTAMP(3)) WITH (     'connector' = 'kafka',    'scan.startup.mode' = 'earliest-offset');
CREATE TABLE Orders_with_watermark ( -- 添加 watermark 定义 WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( -- 改写 startup-mode 属性 'scan.startup.mode' = 'latest-offset')LIKE Orders;
复制代码


Orders_with_watermark 表等效于使用以下语句创建的表:

CREATE TABLE Orders_with_watermark (    `user` BIGINT,    product STRING,    order_time TIMESTAMP(3),    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH (    'connector' = 'kafka',    'scan.startup.mode' = 'latest-offset');
复制代码

Merge Table

表属性的合并逻辑可以用 like options 来控制。可以控制合并的表属性如下:

  • CONSTRAINTS - 主键和唯一键约束

  • GENERATED - 计算列

  • OPTIONS - 连接器信息、格式化方式等配置项

  • PARTITIONS - 表分区信息

  • WATERMARKS - watermark 定义


并且有三种不同的表属性合并策略:

  • INCLUDING - 新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,如新表和源表存在相同 key 的属性。

  • EXCLUDING - 新表不包含源表指定的任何表属性。

  • OVERWRITING - 新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,如新表和源表存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值。


可以使用 INCLUDING/EXCLUDING ALL 这种声明方式来指定使用怎样的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,代表只有源表的 WATERMARKS 属性才会被包含进新表。示例如下:

-- 存储在文件系统的源表CREATE TABLE Orders_in_file (    `user` BIGINT,    product STRING,    order_time_string STRING,    order_time AS to_timestamp(order_time))PARTITIONED BY (`user`) WITH (     'connector' = 'filesystem',    'path' = '...');
-- 对应存储在 kafka 的源表CREATE TABLE Orders_in_kafka ( -- 添加 watermark 定义 WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', ...)LIKE Orders_in_file ( -- 排除需要生成 watermark 的计算列之外的所有内容。 -- 去除不适用于 kafka 的所有分区和文件系统的相关属性。 EXCLUDING ALL INCLUDING GENERATED);
复制代码

默认将使用 INCLUDING ALL OVERWRITING OPTIONS 的合并策略。

无法选择 physical columns 的合并策略,会按照 INCLUDING 策略合并。

CREATE CATALOG

CREATE CATALOG catalog_name  WITH (key1=val1, key2=val2, ...)
复制代码

Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

更多参考官方文档

CREATE DATABASE

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name  [COMMENT database_comment]  WITH (key1=val1, key2=val2, ...)
复制代码

根据给定的属性创建数据库。若数据库中已存在同名表会抛出异常。


IF NOT EXISTS

若数据库已经存在,则不会进行任何操作。


WITH OPTIONS

数据库属性一般用于存储关于这个数据库额外的信息。 表达式 key1=val1 中的键和值都需要是字符串字面量。

CREATE VIEW

CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name  [( columnName [, columnName ]* )] [COMMENT view_comment]  AS query_expression
复制代码


根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常。


TEMPORARY

创建一个有 catalog 和数据库命名空间的临时视图,并覆盖原有的视图。


IF NOT EXISTS

若该视图已经存在,则不会进行任何操作。

CREATE FUNCTION

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION   [IF NOT EXISTS] [catalog_name.][db_name.]function_name   AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
复制代码


创建 function,可以指定 catalog 和 database,若 catalog 中,已经有同名的函数注册了,则无法注册。


LANGUAGE JAVA|SCALA|PYTHON 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA, SCALA 和 PYTHON,且函数的默认语言为 JAVA。

  • 如果是 JAVA 或者 SCALA,则 identifier 是 UDF 实现类的全限定名

  • 如果是 PYTHON,则 identifier 是 UDF 对象的全限定名

  • 如果是 PYTHON,而当前程序是 Java/Scala 程序或者 SQL 程序,则需要配置 Python 相关的依赖


TEMPORARY

创建一个临时 catalog function,有 catalog 和 database,并覆盖原有的 function 。


TEMPORARY SYSTEM

创建一个临时 system function,有 catalog,没有 database,并覆盖系统内置的 function。


IF NOT EXISTS

若该函数已经存在,则不会进行任何操作。

发布于: 2021 年 06 月 10 日阅读数: 28
用户头像

Alex🐒

关注

还未添加个人签名 2020.04.30 加入

还未添加个人简介

评论

发布
暂无评论
【FlinkSQL】Flink SQL CREATE 语法