
Understanding the Flink SQL Execution Process in One Article

shengjk1

Published on: March 23, 2021

Background

Having studied Apache Calcite (I read through most of its official documentation and wrote a few small examples), it is time to analyze how Flink SQL is executed. The Apache Calcite internals will not be examined in depth here: Calcite is quite complex, and understanding it thoroughly would take a lot of time, so this article stays focused on Flink.


Main Content

Throughout this article we will use the SQL query ``select a.* from a join b on a.id=b.id`` as the running example.
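To make the example concrete, here is a minimal, hypothetical driver program that submits this query through the Table API. The table names, schemas, and the use of the built-in datagen connector are assumptions for illustration; only `sqlQuery()` itself is the entry point we trace below.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JoinQueryExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hypothetical demo tables backed by the built-in datagen connector
        tEnv.executeSql("CREATE TABLE a (id BIGINT, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE b (id BIGINT, score DOUBLE) WITH ('connector' = 'datagen')");

        // sqlQuery() is the entry point whose implementation we trace below
        Table result = tEnv.sqlQuery("select a.* from a join b on a.id = b.id");
        result.execute().print();
    }
}
```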


The entry point for SQL queries:


```scala
// Entry point for SQL queries
override def sqlQuery(query: String): Table = {
  // Ultimately produces a PlannerQueryOperation, i.e. Flink operators
  val operations = parser.parse(query)

  if (operations.size != 1) throw new ValidationException(
    "Unsupported SQL query! sqlQuery() only accepts a single SQL query.")

  operations.get(0) match {
    case op: QueryOperation if !op.isInstanceOf[ModifyOperation] =>
      createTable(op)
    case _ => throw new ValidationException(
      "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
        "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
  }
}
```

Next, let's look in detail at how the PlannerQueryOperation is generated.

First, a SqlNode is generated:


```java
@Override
public List<Operation> parse(String statement) {
    CalciteParser parser = calciteParserSupplier.get();
    FlinkPlannerImpl planner = validatorSupplier.get();
    // SQL parsing phase: generate the AST (abstract syntax tree), SQL -> SqlNode
    SqlNode parsed = parser.parse(statement);

    Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
            .orElseThrow(() -> new TableException("Unsupported query: " + statement));
    return Collections.singletonList(operation);
}
```
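For intuition, this is roughly what the parsing step looks like with plain Calcite, outside of Flink's wrappers (a sketch, assuming Calcite's default parser configuration; Flink's CalciteParser wraps the same machinery with Flink-specific settings):

```java
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class CalciteParseDemo {
    public static void main(String[] args) throws SqlParseException {
        SqlParser parser = SqlParser.create("select a.* from a join b on a.id = b.id");
        SqlNode ast = parser.parseQuery();
        System.out.println(ast.getKind()); // SELECT
        System.out.println(ast);           // the AST printed back as SQL
    }
}
```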

The SqlNode is then converted into a RelNode:


```java
private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
    // Semantic analysis: transform to a relational tree (the logical plan), SqlNode -> RelNode
    RelRoot relational = planner.rel(validated);
    return new PlannerQueryOperation(relational.project());
}
```
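If you want to inspect the plans produced along the way for the running example, the TableEnvironment can print them (`tEnv` here is the variable from the hypothetical driver program above):

```java
// Prints the abstract syntax tree, the optimized logical plan,
// and the physical execution plan for the query.
System.out.println(tEnv.explainSql("select a.* from a join b on a.id = b.id"));
```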

During the conversion to RelNode, both Flink's custom optimization rules and some of Calcite's built-in rules are applied. For example, one of Flink's custom rules:


```scala
/**
 * Support all joins. One of Flink's custom optimization rules.
 */
private class FlinkLogicalJoinConverter
  extends ConverterRule(
    classOf[LogicalJoin],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalJoinConverter") {

  override def convert(rel: RelNode): RelNode = {
    val join = rel.asInstanceOf[LogicalJoin]
    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
    FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getJoinType)
  }
}
```
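Rules like this are collected into rule sets and handed to a Calcite planner, which repeatedly matches and applies them to the plan. A minimal sketch of that mechanism using plain Calcite follows; the `logicalPlan` parameter stands in for the unoptimized RelNode from the previous step, and `CoreRules.FILTER_INTO_JOIN` is just one illustrative built-in rule. Flink's actual optimizer wires rules like FlinkLogicalJoinConverter into its own rule sets and planner programs.

```java
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

public class RuleApplicationSketch {

    // A minimal sketch of rule-driven plan rewriting with Calcite's HepPlanner.
    static RelNode optimize(RelNode logicalPlan) {
        HepProgramBuilder builder = new HepProgramBuilder();
        builder.addRuleInstance(CoreRules.FILTER_INTO_JOIN); // one of Calcite's built-in rules
        HepPlanner planner = new HepPlanner(builder.build());
        planner.setRoot(logicalPlan);  // the unoptimized RelNode tree
        return planner.findBestExp();  // the rewritten (optimized) tree
    }
}
```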

Next, the physical execution plan is generated; its nodes are still RelNodes.

```scala
class StreamExecJoinRule
  extends RelOptRule(
    ......

  // Converts the optimized logical plan into Flink's physical execution plan, based on Flink rules
  override def onMatch(call: RelOptRuleCall): Unit = {
    val join: FlinkLogicalJoin = call.rel(0)
    val left = join.getLeft
    val right = join.getRight

    def toHashTraitByColumns(
        columns: util.Collection[_ <: Number],
        inputTraitSets: RelTraitSet): RelTraitSet = {
      val distribution = if (columns.isEmpty) {
        FlinkRelDistribution.SINGLETON
      } else {
        FlinkRelDistribution.hash(columns)
      }
      inputTraitSets
        .replace(FlinkConventions.STREAM_PHYSICAL)
        .replace(distribution)
    }

    val joinInfo = join.analyzeCondition()
    val (leftRequiredTrait, rightRequiredTrait) = (
      toHashTraitByColumns(joinInfo.leftKeys, left.getTraitSet),
      toHashTraitByColumns(joinInfo.rightKeys, right.getTraitSet))

    val providedTraitSet = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)

    val newLeft: RelNode = RelOptRule.convert(left, leftRequiredTrait)
    val newRight: RelNode = RelOptRule.convert(right, rightRequiredTrait)

    // Stream physical RelNode, i.e. the physical execution plan
    val newJoin = new StreamExecJoin(
      join.getCluster,
      providedTraitSet,
      newLeft,
      newRight,
      join.getCondition,
      join.getJoinType)
    call.transformTo(newJoin)
  }
}
```
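Note the hash distribution traits imposed on the join keys: they guarantee that rows with the same key from both inputs end up on the same parallel subtask. At the DataStream level this corresponds to keying both inputs before connecting them, roughly like the following toy program (the tuple schemas and the trivial co-map function are made up for illustration; this is an analogy, not what the planner emits):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class HashDistributionSketch {

    // An explicit KeySelector keeps generic types reifiable for Flink's type extraction
    private static KeySelector<Tuple2<Long, String>, Long> byId() {
        return new KeySelector<Tuple2<Long, String>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, String> value) {
                return value.f0;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Long, String>> left = env.fromElements(Tuple2.of(1L, "left-row"));
        DataStream<Tuple2<Long, String>> right = env.fromElements(Tuple2.of(1L, "right-row"));

        // keyBy on the join key plays the role of FlinkRelDistribution.hash:
        // equal keys from both inputs are routed to the same parallel subtask
        left.keyBy(byId())
            .connect(right.keyBy(byId()))
            .map(new CoMapFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {
                @Override
                public String map1(Tuple2<Long, String> value) {
                    return "left:  " + value;
                }

                @Override
                public String map2(Tuple2<Long, String> value) {
                    return "right: " + value;
                }
            })
            .print();

        env.execute("hash distribution sketch");
    }
}
```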

Then translateToPlanInternal generates the Flink operators:


```scala
class StreamExecJoin(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    leftRel: RelNode,
    rightRel: RelNode,
    condition: RexNode,
    joinType: JoinRelType)
  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, joinType)
  with StreamPhysicalRel
  with StreamExecNode[RowData] {

  ......

  // Generates the StreamOperator, i.e. the Flink operator
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[RowData] = {

    val tableConfig = planner.getTableConfig
    val returnType = InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))

    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
      .asInstanceOf[Transformation[RowData]]
    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
      .asInstanceOf[Transformation[RowData]]

    val leftType = leftTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
    val rightType = rightTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]

    val (leftJoinKey, rightJoinKey) =
      JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey = true)

    val leftSelect = KeySelectorUtil.getRowDataSelector(leftJoinKey, leftType)
    val rightSelect = KeySelectorUtil.getRowDataSelector(rightJoinKey, rightType)

    val leftInputSpec = analyzeJoinInput(left)
    val rightInputSpec = analyzeJoinInput(right)

    val generatedCondition = JoinUtil.generateConditionFunction(
      tableConfig,
      cluster.getRexBuilder,
      getJoinInfo,
      leftType.toRowType,
      rightType.toRowType)

    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime

    val operator = if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
      new StreamingSemiAntiJoinOperator(
        joinType == JoinRelType.ANTI,
        leftType,
        rightType,
        generatedCondition,
        leftInputSpec,
        rightInputSpec,
        filterNulls,
        minRetentionTime)
    } else {
      val leftIsOuter = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL
      val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL
      new StreamingJoinOperator(
        leftType,
        rightType,
        generatedCondition,
        leftInputSpec,
        rightInputSpec,
        leftIsOuter,
        rightIsOuter,
        filterNulls,
        minRetentionTime)
    }

    val ret = new TwoInputTransformation[RowData, RowData, RowData](
      leftTransform,
      rightTransform,
      getRelDetailedDescription,
      operator,
      returnType,
      leftTransform.getParallelism)

    if (inputsContainSingleton()) {
      ret.setParallelism(1)
      ret.setMaxParallelism(1)
    }

    // set key type and key selectors for state
    ret.setStateKeySelectors(leftSelect, rightSelect)
    ret.setStateKeyType(leftSelect.getProducedType)
    ret
  }
```
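The `minRetentionTime` read above comes from the idle-state retention setting on TableConfig. A sketch of how a user would configure it (`tEnv` is again the environment from the first example, and the 12/24-hour values are arbitrary):

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.TableConfig;

// Controls how long join state for an idle key is kept before it may be cleared;
// the operator's min/max retention times are derived from this setting.
TableConfig config = tEnv.getConfig();
config.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
```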

How the operator executes


```java
public class StreamingJoinOperator extends AbstractStreamingJoinOperator {

    private static final long serialVersionUID = -376944622236540545L;

    // whether left side is outer side, e.g. left is outer but right is not when LEFT OUTER JOIN
    private final boolean leftIsOuter;
    // whether right side is outer side, e.g. right is outer but left is not when RIGHT OUTER JOIN
    private final boolean rightIsOuter;

    private transient JoinedRowData outRow;
    private transient RowData leftNullRow;
    private transient RowData rightNullRow;

    // left join state
    private transient JoinRecordStateView leftRecordStateView;
    // right join state
    private transient JoinRecordStateView rightRecordStateView;

    public StreamingJoinOperator(
            InternalTypeInfo<RowData> leftType,
            InternalTypeInfo<RowData> rightType,
            GeneratedJoinCondition generatedJoinCondition,
            JoinInputSideSpec leftInputSideSpec,
            JoinInputSideSpec rightInputSideSpec,
            boolean leftIsOuter,
            boolean rightIsOuter,
            boolean[] filterNullKeys,
            long stateRetentionTime) {
        ......

    @Override
    public void processElement1(StreamRecord<RowData> element) throws Exception {
        processElement(element.getValue(), leftRecordStateView, rightRecordStateView, true);
    }

    @Override
    public void processElement2(StreamRecord<RowData> element) throws Exception {
        processElement(element.getValue(), rightRecordStateView, leftRecordStateView, false);
    }
```

This is where the join is ultimately executed: records from the left input arrive through processElement1, records from the right input through processElement2, and both delegate to a shared processElement that receives the record's own state view and the other side's state view.
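The following toy program (not Flink code; the long[] rows and stdout sink are made up for illustration) mimics that core loop for the inner-join case, ignoring retractions and outer-join null padding: each incoming row is stored in its own side's state and probed against the other side's state.

```java
import java.util.ArrayList;
import java.util.List;

public class ToyStreamingJoin {

    private final List<long[]> leftState = new ArrayList<>();   // stands in for leftRecordStateView
    private final List<long[]> rightState = new ArrayList<>();  // stands in for rightRecordStateView

    // Mirrors the shape of the operator's processElement: add the record
    // to its own side's state, then probe the other side's state for matches.
    public void processElement(long[] row, boolean inputIsLeft) {
        List<long[]> inputSide = inputIsLeft ? leftState : rightState;
        List<long[]> otherSide = inputIsLeft ? rightState : leftState;
        inputSide.add(row);
        for (long[] other : otherSide) {
            if (row[0] == other[0]) { // join condition: a.id = b.id
                long[] left = inputIsLeft ? row : other;
                long[] right = inputIsLeft ? other : row;
                System.out.println("joined: " + left[1] + " <-> " + right[1]);
            }
        }
    }

    public static void main(String[] args) {
        ToyStreamingJoin join = new ToyStreamingJoin();
        join.processElement(new long[]{1L, 100L}, true);  // left row arrives first, no match yet
        join.processElement(new long[]{1L, 200L}, false); // right row arrives, the join fires
    }
}
```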


Summary

SQL parsing generates the abstract syntax tree: SQL ---> SqlNode. Semantic analysis then produces the logical plan: SqlNode ---> RelNode. From there: unoptimized RelNode ---> Flink's custom optimization rules produce the optimized logical plan ---> conversion into the physical execution plan (stream physical RelNodes) ---> generation of the StreamOperators (Flink operators) ---> operator execution.



The main goal of this article is to understand the Flink SQL execution process at a high level; readers interested in the specifics can dig into the details on their own.


Blog: https://blog.csdn.net/jsjsjs1789
