
SparkSQL Internals

永健_何
Published on: July 8, 2021

Overview:

I previously experimented with adding custom SQL operations to the SparkSQL internals in order to run computation and analysis tasks against different underlying data sources, so here is a brief analysis of SparkSQL's Catalyst module. In the early big-data era, large-scale data processing relied on Hadoop MapReduce jobs, but that framework executes slowly, and relational operations (such as joins) require a lot of hand-written code. Later, frameworks such as Hive let users submit SQL statements that are automatically optimized and executed, lowering the cost of writing MapReduce code. Similarly, early Spark required users to express their logic directly as RDD operations, which hurt code readability, and the resulting execution logic was not necessarily optimal, affecting the efficiency of Spark jobs.

1. Introduction to SparkSQL:

The figure below is the SparkSQL architecture diagram provided in the paper:

SparkSQL provides a programmable abstract data model called DataFrames and can be regarded as a distributed SQL query engine. SQL can be issued in three main ways: through JDBC data sources, the CLI shell, and programs. The parsing, optimization, and execution of the SQL are all handled by the SparkSQL Catalyst module, which finally translates the query into the corresponding Spark RDD computation tasks.

2. Overview of the SparkSQL Logical Plan:


select fieldA, fieldB, fieldC from tableA where fieldA > 10;

A SQL statement like this consists of three parts: the Projection (fieldA, fieldB, fieldC), the DataSource (tableA), and the Filter (fieldA > 10), which correspond to the Result, DataSource, and Operation stages of a SQL query:

The actual execution order of a SQL query is Operation -> DataSource -> Result, which is exactly the reverse of the order in which the SQL is written. Concretely, the steps are:

  1. First, lexical and syntactic parsing (Parse) of the input SQL statement: determine which tokens are keywords (SELECT, FROM, WHERE), which are Projections, which is the DataSource, and so on; check that the SQL is well formed; and generate the Logical Plan;

  2. Then Bind the SQL statement against the database data dictionary (catalog); if both the Projection and the DataSource bind successfully, the SQL statement is executable;

  3. Optimize the SQL execution, choosing the best execution plan;

  4. Execute the SQL according to the optimal Execute Plan and return the result set queried from the database.


SparkSQL likewise first parses the SQL statement into a Tree and then applies Rules to that tree for binding, optimization, and other processing (pattern matching is used to apply different operations to different kinds of nodes); a toy sketch of this tree-plus-rule idea follows the module list below. This core pipeline — SQL parsing, binding, optimization, and physical plan generation — is carried out by Spark's Catalyst. The SparkSQL module consists of core, catalyst, hive, and hive-thriftserver:

  • core: handles data input/output, fetches data from the data source, and produces DataFrames;

  • catalyst: SQL parsing, binding, optimization, and physical plan generation;

  • hive: handles Hive data;

  • hive-thriftserver: provides the CLI, JDBC interfaces, and so on.
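As mentioned above, the core mechanism is a tree of nodes rewritten by pattern-matching rules. Below is a minimal, self-contained toy sketch of that idea — the names (Expr, Literal, Attribute, Add, transformUp, constantFolding) are invented for illustration and are not Spark's actual TreeNode/Rule classes:

// Toy sketch only: not Spark's real classes.
sealed trait Expr {
  // Apply a pattern-matching rule bottom-up: transform children first, then this node.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = this match {
      case Add(l, r) => Add(l.transformUp(rule), r.transformUp(rule))
      case other     => other
    }
    if (rule.isDefinedAt(withNewChildren)) rule(withNewChildren) else withNewChildren
  }
}
case class Literal(value: Int) extends Expr
case class Attribute(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// A constant-folding-style rule expressed as pattern matching on tree nodes.
val constantFolding: PartialFunction[Expr, Expr] = {
  case Add(Literal(a), Literal(b)) => Literal(a + b)
}

// Add(x, 1 + 2) is rewritten to Add(x, 3); nodes the rule does not match are left unchanged.
val folded = Add(Attribute("x"), Add(Literal(1), Literal(2))).transformUp(constantFolding)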

The SparkSQL Catalyst parsing flow, as illustrated in the paper:

  1. The SQL statement is parsed by Antlr4 into an Unresolved Logical Plan;

  2. The Analyzer binds the plan against the Catalog, producing the (resolved) Logical Plan;

  3. The Optimizer optimizes the Logical Plan, producing the Optimized Logical Plan;

  4. The SparkPlanner converts the Optimized Logical Plan into a Physical Plan;

  5. The prepareForExecution method converts the Physical Plan into an executed Physical Plan;

  6. execute() runs the executable physical plan and produces an RDD.

import org.apache.spark.sql.{DataFrame, SparkSession}

/** Test code */
object TestSpark {

  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local").getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.sparkContext.parallelize(Array(
        Person("zhangsan", 10),
        Person("lisi", 20),
        Person("wangwu", 30)))
      .toDF("name", "age")

    df.createTempView("people")

    /**
     * 1. The AstBuilder in SparkSqlParser converts each node of the parse tree into the
     *    corresponding LogicalPlan node, forming an unresolved logical plan tree that carries
     *    no data or column information.
     * 2. The Analyzer applies a series of rules to the unresolved logical plan tree, producing
     *    the resolved (analyzed) logical plan tree.
     * 3. The Optimizer applies a series of optimization rules to the logical plan tree,
     *    rewriting inefficient structures (without changing the result) into the optimized
     *    logical plan tree.
     */
    val sql = spark.sql("select name from people where age >= 20")
    sql.queryExecution.debug
  }
}
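The plan snippets quoted in the subsections below can be reproduced by printing the intermediate fields of the QueryExecution object directly (here `sql` is the Dataset created in the test code above):

// Each lazy val on QueryExecution corresponds to one stage of the Catalyst pipeline.
println(sql.queryExecution.logical)        // unresolved logical plan (after parsing)
println(sql.queryExecution.analyzed)       // resolved logical plan (after analysis)
println(sql.queryExecution.optimizedPlan)  // optimized logical plan
println(sql.queryExecution.sparkPlan)      // physical plan chosen by the SparkPlanner
println(sql.queryExecution.executedPlan)   // physical plan after prepareForExecution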

2.1 SQL Parse

SparkSQL uses Antlr4 for lexical and syntactic analysis of SQL (Spark SQL and Presto use Antlr4, while Flink uses Calcite). Antlr4 provides both a parser generator and a translator/interpreter framework.

The Spark source tree ships a .g4 grammar file; at build time, Antlr uses this .g4 file to generate the corresponding lexer and parser classes. SparkSQL then uses the visitor pattern — a visit method is generated for each node type in the parse tree — to traverse the tree and return the corresponding values, from which the Logical Plan tree is built.

SparkSQL uses Antlr4's visitor pattern to produce the Unresolved Logical Plan. The IDEA ANTLR Preview plugin can be used to visualize the parse tree produced for a SQL statement, for example:

SELECT A FROM TABLE

is rendered as a parse-tree diagram. The SqlBase.g4 file also covers many other statement types, such as INSERT, ALTER, and so on.
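To look at just this parsing step in isolation, the session's SQL parser can be invoked directly (a small sketch; sessionState is an internal, unstable API, and `spark` is the session from the test code above):

// Returns the unresolved logical plan built by the ANTLR-generated visitor (AstBuilder),
// without running analysis or optimization.
val parsed = spark.sessionState.sqlParser.parsePlan("select name from people where age >= 20")
println(parsed)   // roughly the same unresolved plan shown in the snippet below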

LogicalPlan itself extends TreeNode, so a LogicalPlan is essentially a tree. The tree abstraction provides three traits: UnaryNode, BinaryNode, and LeafNode:

  • LeafNode: a leaf node with no children, generally used to represent things such as user commands;

  • UnaryNode: a node with a single child, representing operations such as FILTER;

  • BinaryNode: a node with a left and a right child, representing operations such as JOIN (GROUP BY, by contrast, is expressed as a unary Aggregate node);

/** A logical plan node with no children. */
trait LeafNode extends LogicalPlan with LeafLike[LogicalPlan]

/** A logical plan node with a single child. */
trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan]

/** A logical plan node with a left and right child. */
trait BinaryNode extends LogicalPlan with BinaryLike[LogicalPlan]

For the SQL above, the parse phase uses Antlr4 to turn the statement into a parse tree, and the visitor pattern is then used to walk that tree and build the Logical Plan. At this point it is still an Unresolved LogicalPlan: without the Analysis phase we cannot determine the concrete type of column A, whether the data source TABLE actually exists, or what its metadata looks like. Only after the Analysis phase does the Unresolved plan become a Resolved LogicalPlan.

spark.sql("select name from people where age >= 20")
logicalPlan:
'Project ['name]
+- 'Filter ('age >= 20)
   +- 'UnresolvedRelation `people`

2.2 Analyzed

In the Analysis phase, the Analyzer uses its analysis rules together with the SessionCatalog metadata to resolve the Unresolved LogicalPlan into a Resolved LogicalPlan. Spark SQL tracks data source (table) information through Catalyst rules and the Catalog. The core class of this phase is Analyzer, which defines a large number of rules and registers them in its batches:

override def batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      // This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
      // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
      // very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
      // at the beginning of analysis.
      OptimizeUpdateFields,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      SubstituteUnresolvedOrdinals),
    Batch("Disable Hints", Once,
      new ResolveHints.DisableHints),
    Batch("Hints", fixedPoint,
      ResolveHints.ResolveJoinStrategyHints,
      ResolveHints.ResolveCoalesceHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions(v1SessionCatalog) ::
      ResolveNamespace(catalogManager) ::
      new ResolveCatalogs(catalogManager) ::
      ResolveUserSpecifiedColumns ::
      ResolveInsertInto ::
      ResolveRelations ::
      ResolveTables ::
      ResolvePartitionSpec ::
      AddMetadataColumns ::
      DeduplicateRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ResolveOutputRelation ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      ResolveHigherOrderFunctions(catalogManager) ::
      ResolveLambdaVariables ::
      ResolveTimeZone ::
      ResolveRandomSeed ::
      ResolveBinaryArithmetic ::
      ResolveUnion ::
      typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
    Batch("Apply Char Padding", Once,
      ApplyCharTypePadding),
    Batch("Post-Hoc Resolution", Once,
      Seq(ResolveCommandsWithIfExists) ++
      postHocResolutionRules: _*),
    Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames),
    Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
    Batch("Remove Unresolved Hints", Once,
      new ResolveHints.RemoveAllHints),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF,
      ResolveEncodersInUDF),
    Batch("UpdateNullability", Once,
      UpdateAttributeNullability),
    Batch("Subquery", Once,
      UpdateOuterReferences),
    Batch("Cleanup", fixedPoint,
      CleanupAliases),
    Batch("HandleAnalysisOnlyCommand", Once,
      HandleAnalysisOnlyCommand)
  )

Concretely, RuleExecutor's execute method runs these Rules one after another; when a rule matches an UnresolvedRelation, for example, it fetches the corresponding metadata from the Catalog and recursively turns that node and its children into resolved ones.
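The execute loop can be pictured with a simplified sketch (not Spark's actual RuleExecutor source): the rules of a batch are applied in order, repeatedly, until the plan stops changing or the batch's iteration limit is hit. Names here are illustrative only.

// Simplified sketch of RuleExecutor-style batch execution.
def executeBatch[Plan](plan: Plan, rules: Seq[Plan => Plan], maxIterations: Int): Plan = {
  var current = plan
  var iteration = 0
  var reachedFixedPoint = false
  while (!reachedFixedPoint && iteration < maxIterations) {
    val next = rules.foldLeft(current)((p, rule) => rule(p))
    reachedFixedPoint = next == current   // nothing changed, stop iterating
    current = next
    iteration += 1
  }
  current
}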

spark.sql("select name from people where age >= 20")
analyzed:
Project [name#6]
+- Filter (age#7L >= cast(20 as bigint))
   +- SubqueryAlias `people`
      +- Project [name#3 AS name#6, age#4L AS age#7L]
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, TestSpark$Person, true])).name, true, false) AS name#3, assertnotnull(assertnotnull(input[0, TestSpark$Person, true])).age AS age#4L]
            +- ExternalRDD [obj#2]

After the Analyzed phase, the Unresolved LogicalPlan from the Parse phase has been expanded, and every step has been bound to metadata in the SessionCatalog; the result is an executable logical plan, i.e. the Resolved LogicalPlan.

2.3 Optimized

After the Analyzer phase we have an executable Logical Plan, but SparkSQL does not immediately turn it into a Spark Plan and execute it; it first optimizes the execution logic of the Resolved Logical Plan to improve runtime efficiency. This phase is called Optimized. The Optimizer is implemented and applied much like the Analyzer: it defines a series of Rules and applies them iteratively to the Logical Plan and its Expressions. The main optimization strategies include operator combining, column pruning, and predicate pushdown.

def defaultBatches: Seq[Batch] = {
    val operatorOptimizationRuleSet =
      Seq(
        // Operator push down
        PushProjectionThroughUnion,
        ReorderJoin,
        EliminateOuterJoin,
        PushPredicateThroughJoin,
        PushDownPredicate,
        LimitPushDown,
        ColumnPruning,
        InferFiltersFromConstraints,
        // Operator combine
        CollapseRepartition,
        CollapseProject,
        CollapseWindow,
        CombineFilters,
        CombineLimits,
        CombineUnions,
        // Constant folding and strength reduction
        NullPropagation,
        ConstantPropagation,
        FoldablePropagation,
        OptimizeIn,
        ConstantFolding,
        ReorderAssociativeOperator,
        LikeSimplification,
        BooleanSimplification,
        SimplifyConditionals,
        RemoveDispensableExpressions,
        SimplifyBinaryComparison,
        PruneFilters,
        EliminateSorts,
        SimplifyCasts,
        SimplifyCaseConversionExpressions,
        RewriteCorrelatedScalarSubquery,
        EliminateSerialization,
        RemoveRedundantAliases,
        RemoveRedundantProject,
        SimplifyExtractValueOps,
        CombineConcats) ++
        extendedOperatorOptimizationRules

    val operatorOptimizationBatch: Seq[Batch] = {
      val rulesWithoutInferFiltersFromConstraints =
        operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
      Batch("Operator Optimization before Inferring Filters", fixedPoint,
        rulesWithoutInferFiltersFromConstraints: _*) ::
      Batch("Infer Filters", Once,
        InferFiltersFromConstraints) ::
      Batch("Operator Optimization after Inferring Filters", fixedPoint,
        rulesWithoutInferFiltersFromConstraints: _*) :: Nil
    }

    (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
    // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
    // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
    // However, because we also use the analyzer to canonicalized queries (for view definition),
    // we do not eliminate subqueries or compute current time in the analyzer.
    Batch("Finish Analysis", Once,
      EliminateSubqueryAliases,
      EliminateView,
      ReplaceExpressions,
      ComputeCurrentTime,
      GetCurrentDatabase(sessionCatalog),
      RewriteDistinctAggregates,
      ReplaceDeduplicateWithAggregate) ::
    //////////////////////////////////////////////////////////////////////////////////////////
    // Optimizer rules start here
    //////////////////////////////////////////////////////////////////////////////////////////
    // - Do the first call of CombineUnions before starting the major Optimizer rules,
    //   since it can reduce the number of iteration and the other rules could add/move
    //   extra operators between two adjacent Union operators.
    // - Call CombineUnions again in Batch("Operator Optimizations"),
    //   since the other rules might make two separate Unions operators adjacent.
    Batch("Union", Once,
      CombineUnions) ::
    // run this once earlier. this might simplify the plan and reduce cost of optimizer.
    // for example, a query such as Filter(LocalRelation) would go through all the heavy
    // optimizer rules that are triggered when there is a filter
    // (e.g. InferFiltersFromConstraints). if we run this batch earlier, the query becomes just
    // LocalRelation and does not trigger many rules
    Batch("LocalRelation early", fixedPoint,
      ConvertToLocalRelation,
      PropagateEmptyRelation) ::
    Batch("Pullup Correlated Expressions", Once,
      PullupCorrelatedPredicates) ::
    Batch("Subquery", Once,
      OptimizeSubqueries) ::
    Batch("Replace Operators", fixedPoint,
      RewriteExceptAll,
      RewriteIntersectAll,
      ReplaceIntersectWithSemiJoin,
      ReplaceExceptWithFilter,
      ReplaceExceptWithAntiJoin,
      ReplaceDistinctWithAggregate) ::
    Batch("Aggregate", fixedPoint,
      RemoveLiteralFromGroupExpressions,
      RemoveRepetitionFromGroupExpressions) :: Nil ++
    operatorOptimizationBatch) :+
    Batch("Join Reorder", Once,
      CostBasedJoinReorder) :+
    Batch("Remove Redundant Sorts", Once,
      RemoveRedundantSorts) :+
    Batch("Decimal Optimizations", fixedPoint,
      DecimalAggregates) :+
    Batch("Object Expressions Optimization", fixedPoint,
      EliminateMapObjects,
      CombineTypedFilters) :+
    Batch("LocalRelation", fixedPoint,
      ConvertToLocalRelation,
      PropagateEmptyRelation) :+
    Batch("Extract PythonUDF From JoinCondition", Once,
      PullOutPythonUDFInJoinCondition) :+
    // The following batch should be executed after batch "Join Reorder" "LocalRelation" and
    // "Extract PythonUDF From JoinCondition".
    Batch("Check Cartesian Products", Once,
      CheckCartesianProducts) :+
    Batch("RewriteSubquery", Once,
      RewritePredicateSubquery,
      ColumnPruning,
      CollapseProject,
      RemoveRedundantProject) :+
    Batch("UpdateAttributeReferences", Once,
      UpdateNullabilityInAttributeReferences)
  }

The Optimizer's rules optimize not only the resolved Logical Plan itself but also the Expressions it contains; the principle is the same — traverse the tree and apply the optimization Rules.
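As a hedged illustration of how such a rule looks in user code, the following sketch defines a hypothetical expression-level rule (rewriting NOT(NOT(x)) to x) using the same transform/pattern-matching mechanism, and registers it through the experimental optimizer hook:

import org.apache.spark.sql.catalyst.expressions.Not
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical example rule: eliminate double negation on every expression in the plan.
object SimplifyDoubleNegation extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case Not(Not(child)) => child
  }
}

// Assuming the `spark` session from the test code above; user rules registered here are
// appended to the Optimizer's rule batches.
spark.experimental.extraOptimizations = Seq(SimplifyDoubleNegation)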

spark.sql("select name from people where age >= 20")
optimizedPlan:
Project [name#3]
+- Filter (age#4L >= 20)
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, TestSpark$Person, true]).name, true, false) AS name#3, assertnotnull(input[0, TestSpark$Person, true]).age AS age#4L]
      +- ExternalRDD [obj#2]

After the Optimizer phase, the resulting optimizedPlan is noticeably simpler than the Resolved LogicalPlan produced in the Analyzer phase: the Project now keeps only the name column, while the parts with nothing to optimize remain unchanged.
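All of these stages can also be viewed side by side from user code with the Dataset explain API; with extended = true it prints the parsed, analyzed, and optimized logical plans together with the physical plan:

// Prints the parsed, analyzed and optimized logical plans plus the physical plan.
spark.sql("select name from people where age >= 20").explain(true)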

2.4 Physical Plan

After the Optimizer has done its work, the SparkPlanner class applies a set of Strategies to convert the optimized Logical Plan into an executable Physical Plan. Compared with the Logical Plan, the Physical Plan is something Spark can actually execute; here sparkPlan plays the role of the Physical Plan.

override def strategies: Seq[Strategy] =
    experimentalMethods.extraStrategies ++
      extraPlanningStrategies ++ (
      PythonEvals ::
      DataSourceV2Strategy ::
      FileSourceStrategy ::
      DataSourceStrategy(conf) ::
      SpecialLimits ::
      Aggregation ::
      Window ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)

/**
 * Override to add extra planning strategies to the planner. These strategies are tried after
 * the strategies defined in [[ExperimentalMethods]], and before the regular strategies.
 */
def extraPlanningStrategies: Seq[Strategy] = Nil
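Note that extraStrategies appears first in the list above; user code can plug into it through the experimental hook. A minimal hedged sketch — the NoopStrategy object is an invented toy that never matches, so the planner simply falls through to the next strategy:

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A Strategy pattern-matches logical operators and returns candidate physical plans;
// returning Nil means "this strategy does not apply here".
object NoopStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// Assuming the `spark` session from the test code above.
spark.experimental.extraStrategies = Seq(NoopStrategy)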

Given a logical plan, the planner produces a physical plan, i.e. a SparkPlan:

lazy val sparkPlan: SparkPlan = {
  // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
  // the planning phase
  assertOptimized()
  executePhase(QueryPlanningTracker.PLANNING) {
    // Clone the logical plan here, in case the planner rules change the states of the logical
    // plan.
    QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
  }
}

/**
 * Transform a [[LogicalPlan]] into a [[SparkPlan]].
 *
 * Note that the returned physical plan still needs to be prepared for execution.
 */
def createSparkPlan(
    sparkSession: SparkSession,
    planner: SparkPlanner,
    plan: LogicalPlan): SparkPlan = {
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  // but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(plan)).next()
}

Before actually executing the query, SparkSQL calls prepareForExecution to turn the sparkPlan into an executedPlan; among other things, the preparation rules insert Shuffle (Exchange) operators where adjacent stages have incompatible partitioning, and add data format conversions where needed:

// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = {
  // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
  // that the optimization time is not counted as part of the planning phase.
  assertOptimized()
  executePhase(QueryPlanningTracker.PLANNING) {
    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
    // optimizing and planning.
    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
  }
}
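To see prepareForExecution at work, one option (a small sketch, reusing the `spark` session and people view from the test code above) is to compare sparkPlan and executedPlan for a query that needs a shuffle:

// Grouping requires repartitioning by the grouping key, so the executedPlan will typically
// contain an Exchange (shuffle) node that the raw sparkPlan does not yet have.
val q = spark.sql("select age, count(*) as cnt from people group by age")
println(q.queryExecution.sparkPlan)     // physical plan straight from the planner
println(q.queryExecution.executedPlan)  // after preparation rules such as EnsureRequirements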

Finally, executedPlan.execute() returns an RDD, and the Spark job then operates on that RDD to produce the result set.

lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
  executedPlan.execute(), sparkSession.sessionState.conf)

/**
 * Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
 * preparations.
 *
 * Concrete implementations of SparkPlan should override `doExecute`.
 */
final def execute(): RDD[InternalRow] = executeQuery {
  if (isCanonicalizedPlan) {
    throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
  }
  doExecute()
}
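That RDD is also reachable from user code through the same QueryExecution (a small sketch; InternalRow is Spark's internal binary row format, so normal applications use Dataset actions instead):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Assuming the `sql` Dataset from the test code above; evaluating the RDD runs the plan.
val rows: RDD[InternalRow] = sql.queryExecution.toRdd
println(rows.count())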

Here is the generated sparkPlan:

spark.sql("select name from people where age >= 20")
sparkPlan:
Project [name#3]
+- Filter (age#4L >= 20)
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, TestSpark$Person, true]).name, true, false) AS name#3, assertnotnull(input[0, TestSpark$Person, true]).age AS age#4L]
      +- Scan[obj#2]

3. Summary

This article has walked through the SparkSQL parsing pipeline, which can roughly be divided into parsing -> binding -> optimization -> converting the logical plan into a physical plan -> preparation -> generating the RDD. The most fundamental difference from writing RDD code directly is that SparkSQL's Catalyst performs rule-based optimization (RBO, Rule-Based Optimizer) of the logical plan; at execution time, Spark additionally applies cost-based optimization (CBO, Cost-Based Optimizer). By comparison, the Flink SQL engine uses Apache Calcite (https://calcite.apache.org/) for SQL parsing and validation, and HBase can be given a SQL front end via Apache Phoenix (http://phoenix.apache.org/); I hope to study and summarize those later when time permits.


