【FlinkSQL】Flink SQL CREATE 语法
主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create
CREATE 语句用于注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。目前 Flink SQL 支持下列 CREATE 语句:
CREATE TABLE
CREATE DATABASE
CREATE VIEW
CREATE FUNCTION
执行 CREATE
可以使用 TableEnvironment 的 executeSql()
方法执行 CREATE 语句。若 CREATE 操作执行成功,executeSql()
方法返回OK
,否则会抛出异常。
SQL CLI
CREATE TABLE
以下语法概述了 CREATE TABLE 语法:
Columns
Physical / Regular Columns
Pyhsical Columns 数据库中已知的常规字段,定义了字段的名称、类型和顺序。Connectors 和 Formats 使用这些列(按定义的顺序)来进行配置。
Metadata Columns
Metadata Column 是 SQL 标准的扩展(可选项),允许访问连接器、格式化表中每一行数据,由 METADATA
关键字表示。
例如,可以使用 Metadata column 从 Kafka 记录中读取和写入时间戳,以进行基于时间的操作。
根据 Connector 和 Format 选择可使用的 Metadata column 。
下面定义了一个表,声明字段 record_time
使用 Metadata column timestamp
:
record_time
成为表结构的一部分,可以像常规列一样进行转换和存储:
如果列名使用 Metadata column 的标识键(kafka 中的 timesamp
),则可以简略写法:
如果列的数据类型与 Metadata column 的数据类型不同,则运行时将执行显式强制转换(这要求这两种数据类型是兼容的)。
默认情况下,Planner 假定 Metadata column 可以用于读写。但是,在许多情况下,外部系统提供的只读多于可写。因此,可以使用 VIRTUAL
关键字从持久化中排除元数据列(下面例子中的 offset
)。
Computed Columns
计算列是一个使用 column_name AS computed_column_expression
语法生成的虚拟列。由使用同一表中其他列的表达式生成,并且不会在表中进行物理存储。这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。
下面的例子是使用 cost AS price * quantity
定义的一个计算列
定义在 source table 上的计算列会在从数据源读取数据后被计算,可以在 SELECT 查询语句中使用。与使用 VIRTUAL
的 Metadata column 类似,计算列不会持久化。因此计算列不能作为 INSERT INTO 语句的目标(在 INSERT 语句中,SELECT 语句的 schema 需要与目标表不带有计算列的 schema 一致)。
计算列可用于为 CREATE TABLE 语句定义时间属性。Processing time 可以简单地通过使用了系统函数 PROCTIME()
的 proc AS PROCTIME()
语句进行定义。 Event time 可能需要从现有的字段中获得(例如,原始字段的类型不是 TIMESTAMP
或嵌套在 JSON 字符串中)。
WATERMARK
WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
。
rowtime_column_name
把一个现有的列定义为事件时间的属性。该列的类型必须为 TIMESTAMP(3)
,且是 schema 中的顶层列(top-level column),也可以是一个计算列。
watermark_strategy_expression
定义了 watermark 的生成策略。允许使用包括计算列在内的任意非子查询表达式来计算 watermark;表达式的返回类型必须是 TIMESTAMP(3)
。仅当返回的 watermark 不为空且大于之前发出的 watermark 时才会被发出(以保证 watermark 递增)。
Flink 为每条记录的计算 watermark,定期(pipeline.auto-watermark-interval
)发出所生成的最大的 watermark(如果 watermark 为空或不大于之前的 watermark 不发出)。若 watermark 的间隔(pipeline.auto-watermark-interval
)是 0ms,那么每条记录都会产生一个 watermark(根据前述的规则发出)。
使用事件时间(Event time)语义时,表必须包含事件时间属性和 watermark 策略。
Flink 提供了几种常用的 watermark 策略:
严格递增时间戳(Strictly ascending timestamps):
WATERMARK FOR rowtime_column AS rowtime_column
发出到目前为止已观察到的最大时间戳的 watermark,时间戳大于最大时间戳的 Row 被认为没有迟到
递增时间戳(Ascending timestamps):
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
发出到目前为止已观察到的最大时间戳减 1 的 watermark,时间戳大于或等于最大时间戳的 Row 被认为没有迟到
有界乱序时间戳(Bounded out of orderness timestamps):
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark,例如,
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
是一个 5 秒延迟的 watermark 策略
PRIMARY KEY
主键约束是 Flink 优化的一种提示信息,表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。主键声明的列都是非空的,可以被用作表中每行的唯一标识。
主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。
有效性检查
SQL 标准主键限制可以有两种模式:ENFORCED
或者 NOT ENFORCED
。 申明了是否输入/输出数据做检查(是否唯一)。Flink 只支持 NOT ENFORCED
模式,<u>用户需要自己保证唯一性</u>。
Flink 假定声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。
在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。
PARTITIONED BY
根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。
WITH Options
Table properties 用于创建 table source/sink,一般用于寻找和创建底层的连接器(Connector)。
表达式 key1=val1 的键和值必须为字符串字面量。不同 Connector 有各自的 properties。
Note. 表名可以为以下三种格式 1. catalog_name.db_name.table_name
2. db_name.table_name
3. table_name
。使用 catalog_name.db_name.table_name
的表将会与名为 catalog_name 的 catalog 和名为 db_name 的数据库一起注册到 metastore 中;使用 db_name.table_name
的表将会被注册到当前执行的 table environment 中的 catalog 且数据库会被命名为 db_name;对于 table_name
,数据表将会被注册到当前正在运行的 catalog 和数据库中。
Note. 使用 CREATE TABLE 语句注册的表均可用作 table source 和 table sink。 在被 DML 语句引用前,无法决定其实际用于 source 或是 sink。
LIKE
LIKE 子句可以基于现有表的定义去创建新表,并且可以扩展或排除原始表中的某些部分。LIKE 子句必须在 CREATE 语句中定义,并且是基于 CREATE 语句的更上层定义。LIKE 子句可以用于定义表的多个部分,而不仅仅是 schema 部分(可以重用或改写指定的连接器配置属性或者添加 watermark 定义)。
示例如下:
Orders_with_watermark 表等效于使用以下语句创建的表:
Merge Table
表属性的合并逻辑可以用 like options 来控制。可以控制合并的表属性如下:
CONSTRAINTS - 主键和唯一键约束
GENERATED - 计算列
OPTIONS - 连接器信息、格式化方式等配置项
PARTITIONS - 表分区信息
WATERMARKS - watermark 定义
并且有三种不同的表属性合并策略:
INCLUDING - 新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,如新表和源表存在相同 key 的属性。
EXCLUDING - 新表不包含源表指定的任何表属性。
OVERWRITING - 新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,如新表和源表存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值。
可以使用 INCLUDING
/EXCLUDING ALL
这种声明方式来指定使用怎样的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS
,代表只有源表的 WATERMARKS 属性才会被包含进新表。示例如下:
默认将使用 INCLUDING ALL OVERWRITING OPTIONS
的合并策略。
无法选择 physical columns 的合并策略,会按照 INCLUDING
策略合并。
CREATE CATALOG
Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
更多参考官方文档
CREATE DATABASE
根据给定的属性创建数据库。若数据库中已存在同名表会抛出异常。
IF NOT EXISTS
若数据库已经存在,则不会进行任何操作。
WITH OPTIONS
数据库属性一般用于存储关于这个数据库额外的信息。 表达式 key1=val1 中的键和值都需要是字符串字面量。
CREATE VIEW
根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常。
TEMPORARY
创建一个有 catalog 和数据库命名空间的临时视图,并覆盖原有的视图。
IF NOT EXISTS
若该视图已经存在,则不会进行任何操作。
CREATE FUNCTION
创建 function,可以指定 catalog 和 database,若 catalog 中,已经有同名的函数注册了,则无法注册。
LANGUAGE JAVA|SCALA|PYTHON 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA, SCALA 和 PYTHON,且函数的默认语言为 JAVA。
如果是 JAVA 或者 SCALA,则 identifier 是 UDF 实现类的全限定名
如果是 PYTHON,则 identifier 是 UDF 对象的全限定名
如果是 PYTHON,而当前程序是 Java/Scala 程序或者 SQL 程序,则需要配置 Python 相关的依赖
TEMPORARY
创建一个临时 catalog function,有 catalog 和 database,并覆盖原有的 function 。
TEMPORARY SYSTEM
创建一个临时 system function,有 catalog,没有 database,并覆盖系统内置的 function。
IF NOT EXISTS
若该函数已经存在,则不会进行任何操作。
版权声明: 本文为 InfoQ 作者【Alex🐒】的原创文章。
原文链接:【http://xie.infoq.cn/article/0a5e06b8f508ec762d0b0f8da】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论