写点什么

大数据开发之 Flink SQL 建设实时数仓实践

  • 2022 年 1 月 19 日
  • 本文字数:6122 字

    阅读完需:约 20 分钟

​个推 Flink SQL 使用现状


在 SQL 模式下,个推通过 jar+SQL 文件+配置参数的方式使用 Flink。其中 jar 是基于 Flink 封装的执行 SQL 文件的执行 jar,提交命令示例如下:


/opt/flink/bin/flink run -m yarn-cluster -ynm KafkaSourceHbaseSinkCaseTestSql


-c ${mainClassName}


${jarPath}


--flink.parallelism 40


--mode stream


--sql.file.path ${sqlFile}


SQL 文件内容示例如下:


create table kafka_table(ts bigint,username string,num bigint) with ('connector' = 'kafka','topic' = 'test','properties.bootstrap.servers' = '','properties.group.id' = 'test-consumer001','scan.startup.mode' = 'latest-offset','format' = 'csv')


create table sink_table(ts bigint,username string,num bigint) with ('connector' = 'kafka','topic' = 'test2','properties.bootstrap.servers' = '','format' = 'json')


insert into sink_table select * from kafka_table


在将原有的 Spark Streaming 实时计算任务改造成 SQL 的过程中,我们发现了许多原生 Flink SQL 无法支持的需求,比如:


写 hbase 指定时间戳:原生 Flink SQL 写 hbase 的时间戳无法由数据时间指定。写 hbase 支持数据字段指定 qualifier:原生 Flink SQL 注册 hbase 表时就需要指定 qualifier,无法使用数据字段的值作为 qualifier。中间表注册:目前注册表只能调用 table api 实现。kafka source 数据预截断:由于业务原因,部分数据源写入 kafka 的数据默认增加指定前缀,解析前需要预截断。kafka schema 不匹配:由于业务原因,上游写入 csv 格式数据前会追加字段,导致和 schema 不匹配,数据无法解析。针对以上大部分场景,我们均结大数据培训合业务特色需求,对 Flink SQL 进行了拓展适配。本文从中间表注册入手,分享 Flink SQL 正确使用姿势。


Flink 中 SQL 的处理流程


为了帮助大家更好地理解中间表注册问题,我们先整体梳理下 Flink 中 SQL 的执行逻辑,如下图:


整个流程可以大致拆解为以下几个步骤:


1、SqlParser 解析阶段(SQL -> SqlNode)


Flink 的 Calcite 使用 JavaCC,根据 Parser.jj 生成 SqlParser(实际类名为 SqlParserImpl)。SqlParser 负责将 SQL 解析为 AST 语法树,数据类型为 SqlNode。


2、Validator 验证阶段


第一阶段后生成的 AST 树中,对字段、函数等并没有进行验证。第二阶段会进行校验,校验内容包括表名、字段名、函数名、数据类型等。


3、逻辑计划(SqlNode -> RelNode/RexNode)


经过语法校验的 AST 树经过 SqlToRelConverter.convertQuery 调用,将 SQL 转换为 RelNode,即生成逻辑计划 LogicalPlan。需要注意的是,Flink 为了统一“table api”和“sql 执行”两种方式,会在这个阶段将 RelNode 封装成 Operation。


4、优化器(RelNode -> LogicalNode -> ExecNode)


优化器的作用是将关系代数表达式(RelNode)转换为执行计划,用于执行引擎执行。优化器会使用过滤条件的下压、列裁剪等常见的优化规则进行优化,以生成更高效的执行计划。


Flink 主要使用 Calcite 的优化器,采用 HepPlanner 和 VolcanoPlanner 这两种优化方式进行优化。


HepPlanner: 是基于规则优化(RBO)的实现,它是一个启发式的优化器,按照规则进行匹配,直到达到次数限制(match 次数限制)或者遍历一遍后不再出现 rule match 的情况才算完成。VolcanoPlanner: 是基于成本优化(CBO)的实现,它会一直迭代 rules,直到找到 cost 最小的 plan。需要注意的是,在调用规则优化前,Flink 会有一个内部的 CommonSubGraphBasedOptimizer 优化器用于提取多个执行计划北京大数据培训的共用逻辑。CommonSubGraphBasedOptimizer 是 Flink 对于多流场景(常见为多个 insert)的优化器,主要作用是提取共用的逻辑,生成有向无环图,避免对共用逻辑进行重复计算。


根据运行环境不同(批和流式),CommonSubGraphBasedOptimizer 优化器有 BatchCommonSubGraphBasedOptimizer 和 StreamCommonSubGraphBasedOptimizer 两种实现方式。从执行结果来看,CommonSubGraphBasedOptimizer 优化类似于 Spark 表的物化,最终目的都是避免数据重复计算。


源码中 RelNodeBlock 类注释很形象地描述了优化效果:


  • {{{-

  • val sourceTable = tEnv.scan("test_table").select('a, 'b, 'c)

  • val leftTable = sourceTable.filter('a > 0).select('a as 'a1, 'b as 'b1)

  • val rightTable = sourceTable.filter('c.isNotNull).select('b as 'b2, 'c as 'c2)

  • val joinTable = leftTable.join(rightTable, 'a1 === 'b2)

  • joinTable.where('a1 >= 70).select('a1, 'b1).writeToSink(sink1)

  • joinTable.where('a1 < 70 ).select('a1, 'c2).writeToSink(sink2)

  • }}}

  • the RelNode DAG is:

  • {{{-

  • Sink(sink1) Sink(sink2)

  • | |

  • Project(a1,b1) Project(a1,c2)

  • | |

  • Filter(a1>=70) Filter(a1<70)

  • Project(a1,b1) Project(b2,c2)

  • Filter(a>0) Filter(c is not null)

  • }}}

  • This [[RelNode]] DAG will be decomposed into three [[RelNodeBlock]]s, the break-point

  • is the [RelNode] which data outputs to multiple [[LegacySink]]s.

  • <p>Notes: Although Project(a,b,c) has two parents (outputs),

  • they eventually merged at Join(a1=b2). So Project(a,b,c) is not a break-point.

  • <p>the first [[RelNodeBlock]] includes TableScan, Project(a,b,c), Filter(a>0),

  • Filter(c is not null), Project(a1,b1), Project(b2,c2) and Join(a1=b2)

  • <p>the second one includes Filter(a1>=70), Project(a1,b1) and Sink(sink1)

  • <p>the third one includes Filter(a1<70), Project(a1,c2) and Sink(sink2)

  • <p>And the first [[RelNodeBlock]] is the child of another two.

  • The [[RelNodeBlock]] plan is:

  • {{{-

  • RelNodeBlock2 RelNodeBlock3

  • }}}


可以看到,在这段注释中,sink1 和 sink2 这两个 sink 流有一段逻辑是共用的,即 Join(a1=b2)。那么,优化器在优化阶段会将这段逻辑切分成 3 个 block,其中共用的逻辑为单独的 RelNodeBlock1,优化器将把这部分共用逻辑提取出来,避免重复计算。


中间表注册语法扩展


问题描述


值得注意的是,原生的 Flink SQL 只能通过调用 table api 来提取共用逻辑。在非 table api 的场景下,比如,数据经过计算后将根据字段条件被写入不同的 kafka topic,SQL 示例如下:


create table source_table(data string,topic string,) with ('connector' = 'kafka','topic' = 'topic','properties.bootstrap.servers' = '','format' = 'csv')


create table sink_table1(data string,) with ('connector' = 'kafka','topic' = 'topic2','properties.bootstrap.servers' = '','format' = 'csv')


create table sink_table2(data string,) with ('connector' = 'kafka','topic' = 'topic2','properties.bootstrap.servers' = '','format' = 'csv')


insert into sink_table1 select * from (select SBSTR(data, 0, 6) data,topic from source_table) where topic='topic1'


insert into sink_table2 select * from (select SBSTR(data, 0, 6) data,topic from source_table) where topic='topic2'


如果使用 StreamTableEnviroment.executeSql()去分别执行这两条 insert sql,最终会异步生成两个任务,因此需要使用 Flink 提供的 statementset 先缓存多条 insert sql,最后调用执行,在一个任务中完成多条数据流的处理。


可以发现,在这两条 insert sql 中存在复用逻辑,即 select SBSTR(data, 0, 6) data、topic from source_table。预期的结果是 Flink 能够识别到这段共用逻辑并复用,但是实际情况并非预期中的,如下图:


问题分析


分析出现该问题的原因是:Flink 在解析阶段将 select SBSTR(data, 0, 6) data、topic from source_table 解析成 SqlNode(SqlSelect)并生成相应的 RelNode。由于即便是相同逻辑的 SQL,其解析为 RelNode 的摘要也是不同的。而 Flink 正是通过摘要来寻找复用的 RelNode。因此,Flink 也就不能识别到这段逻辑是可以共用的。


判断逻辑共用的源码如下:


/*** Reuse common sub-plan in different RelNode tree, generate a RelNode dag** @param relNodes RelNode trees* @return RelNode dag which reuse common subPlan in each tree*/private def reuseRelNodes(relNodes: Seq[RelNode], tableConfig: TableConfig): Seq[RelNode] = {val findOpBlockWithDigest = tableConfig.getConfiguration.getBoolean(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED)if (!findOpBlockWithDigest) {return relNodes}


// reuse sub-plan with same digest in input RelNode trees.val context = new SubplanReuseContext(true, relNodes: _*)val reuseShuttle = new SubplanReuseShuttle(context)relNodes.map(_.accept(reuseShuttle))
复制代码


}


解决思路


那么如何解决呢?首先想到的思路就是将这段共用逻辑注册成表,这样 Flink 就能知道这段逻辑是共用的。


目前有“注册视图”(create view as query)和“注册表”(registerTable)两种方式能够将共用逻辑注册成表。在 Flink 中,当执行‘create view as query' 创建视图或者调用 registerTable 注册表时,底层都会在 catalog 中创建临时表,区别在于 create view 创建表的实现类为 CatalogViewImpl,而 registerTable 创建表的实现类为 QueryOperationCatalogView。


前者 CatalogViewImpl 的查询逻辑使用字符串表示,而后者 QueryOperationCatalogView 的查询逻辑已经被解析为 QueryOperation。也就是说,执行创建视图的语句时,最终创建的临时表仅仅是缓存了查询部分的 SQL 语句,当其他命令使用这个临时表时还需要重新解析临时表中的查询语句,而重新解析带来的问题就是创建新的 RelNode,产生不同的摘要,这样 Flink 仍然不能够识别到这段共用逻辑并复用。


相反,regiterTable 这样的方式就不需要对临时表中的查询语句进行重新解析。因此可以采用 regiterTable 将共用逻辑注册成表。示例代码如下:


// 创建视图 StreamTableEnvironment.executeSql("create view as select SBSTR(data, 0, 6) data,topic from source_table);


// 注册表 Table table = StreamTableEnvironment.executeSql("select SBSTR(data, 0, 6) data,topic from source_table");


StreamTableEnvironment.registerTable("tmp", table);


但是,为了 SQL 化,就需要有语法去支持中间表注册,以屏蔽底层的 api 调用,实现用户无感知。新语法预期如下:


REGISTER TABLE TABLE_NAME AS SELECT_QUERY


// 示例 register table tmp as select SBSTR(data, 0, 6) data,topic from source_table


实现语法支持


如何实现新的语法,来支持中间表注册呢?目前有 2 种解决方案:


方案 1:框架先使用正则匹配判断 SQL 类型,之后提取出临时表名和查询逻辑,比如上面的 SQL 经过正则匹配提取组之后可以得到表名为 tmp,查询逻辑为'select SBSTR(data, 0, 6) data,topic from source_table',之后框架去调用 table api 进行注册。


方案 2:修改 flink-table 模块源码扩展语法,实现对 register table 语法的支持。


从实现难度上来说,方案 1 的改动少,难度也较小,而方案 2 虽然改动较大,但是通用性更好。下面主要围绕方案 2 的实现展开。


register table 语法扩展大致分为以下 3 个步骤:


Step1 SQL 解析与校验


即修改 Java CC 相关文件,使得 SqlParser 可以识别新的语法并解析为 AST。


1.1 增加关键字“REGISTER”


首先需要让解析器识别新的关键字“REGISTER”,因此修改 Parser.tdd,在 keywords 和 nonReservedKeywords 中分别增加“REGISTER”关键字。


1.2 创建 SqlNode (SqlRegisterTable)


由于 SqlParser 解析 SQL 生成的 AST 数据类型为 SqlNode,因此需要增加相应的 SqlNode。在 org.apache.flink.sql.parser.ddl 下创建 SqlRegisterTable:


package org.apache.flink.sql.parser.ddl;


import org.apache.calcite.sql.*;import org.apache.calcite.sql.parser.SqlParserPos;import javax.annotation.Nonnull;


import java.util.Collections;import java.util.List;


public class SqlRegisterTable extends SqlCall {public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("REGISTER TABLE", SqlKind.OTHER_DDL);


private SqlIdentifier tableName;private SqlNode query;// 部分代码省略}


修改 Parser.tdd,在 imports 增加 org.apache.flink.sql.parser.ddl.SqlRegisterTable,即刚才新建的 SqlRegisterTable 的全类名路径。


1.3 增加语法解析模版


除了关键字和 SqlNode,还需要相应的语法模版,让解析器能够把 SQL 解析为 SqlRegisterTable。


修改 parserImpls.ftl,增加语法解析模版:


SqlRegisterTable SqlRegisterTable() :{SqlIdentifier tableName = null;SqlNode query = null;SqlParserPos pos;}{<REGISTER> <TABLE> { pos = getPos();}tableName = CompoundIdentifier()<AS>query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY){return new SqlRegisterTable(pos, tableName, query);}}


Step2:SqlNode 转为 Operation


根据 calcite 在 Flink 中的执行流程,Flink 会将 SqlNode 封装为 Operation,因此需要创建相应的 RegisterTableOperation,并修改相关的转换逻辑。


2.1 新建 RegisterTableOperation


package org.apache.flink.table.operations.ddl;


import org.apache.flink.table.catalog.ObjectIdentifier;import org.apache.flink.table.operations.Operation;import org.apache.flink.table.operations.QueryOperation;


public class RegisterTableOperation implements Operation{private final ObjectIdentifier tableIdentifier;private final QueryOperation query;


// 省略部分代码
复制代码


}


2.2 增加解析逻辑


生成 RegisterTableOperation 之后,还需要让 Flink 能将 SqlNode 转换成对应的 Operation,因此我们要修改 SqlToOperationConverter.convert 内部代码,增加解析逻辑,代码如下:


private Operation convertRegisterTable(SqlRegisterTable sqlRegisterTable) {UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRegisterTable.fullTableName());ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);PlannerQueryOperation operation = toQueryOperation(flinkPlanner, validateQuery);


return new RegisterTableOperation(identifier, operation);
复制代码


}


Step3 Operation 执行


即底层调用 RegisterTable 实现注册。由于流处理底层使用 TableEnvironmentImpl 进行相关 SQL 操作,比如常见的 executeSql(String statement) 操作 :


@Overridepublic TableResult executeSql(String statement) {List<Operation> operations = parser.parse(statement);


if (operations.size() != 1) {  throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);}

return executeOperation(operations.get(0));
复制代码


}


因此在 executeOperation()方法中,需要识别 RegisterTableOperation 进行额外操作,因此增加 operation 执行逻辑如下:


通过以上三步,完成源码修改,然后将 flink-parser 打包替换当前依赖,即可实现对 register table 语法的扩展。


Flink 的 SQL 执行基于 calcite,语法拓展的实现简要概括分为语法解析、转换、优化和执行 4 个阶段,其中会涉及到 Java CC、Planner 等知识,有兴趣的同学可以查阅相关内容做深入了解。


来源个推技术实践

用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
大数据开发之Flink SQL建设实时数仓实践