写点什么

Spark 开源新特性:Catalyst 优化流程裁剪

发布于: 2021 年 07 月 23 日

​​摘要:为了解决过多依赖 Hive 的问题, SparkSQL 使用了一个新的 SQL 优化器替代 Hive 中的优化器, 这个优化器就是 Catalyst。


本文分享自华为云社区《Spark 开源新特性:Catalyst 优化流程裁剪》,作者:hzjturbo 。

1. 问题背景



上图是典型的 Spark Catalyst 优化器的布局,一条由用户输入的 SQL,到真实可调度执行的 RDD DAG 任务,需要经历以下五个阶段:


  • Parser: 将 SQL 解析成相应的抽象语法树(AST),spark 也称为 Unresolved Logical Plan;

  • Analyzer: 通过查找 Metadata 的 Catalog 信息,将 Unresolved Logical Plan 变为 Resolved Logical Plan,这个过程会做表、列、数据类型等做校验;

  • Optimizer: 逻辑优化流程,通过一些优化规则对匹配上的 Plan 做转换,得到优化后的逻辑 Plan

  • Planner:根据 Optimized Logical Plan 的统计信息等转换成相应的 Physical Plan

  • Query Execution: 主要是执行前的一些 preparations 优化,比如 AQE, Exchange Reuse, CodeGen stages 合并等


上述的五个阶段中,除了 Parser (由 Antlr 实现),其他的每个阶段都是由一个个规则(Rule)构成,总共大约有 200+个,对于不同的规则,还可能需要跑多次,所以对于相对比较复杂的查询,可能得到一个 executed Plan 都需要耗费数秒。


Databricks 内部基准测试表明,对于 TPC-DS 查询,每个查询平均调用树转换函数约 280k 次,这远远超出了必要的范围。因此,我们探索在每个树节点中嵌入 BitSet,以传递自身及其子树的信息,并利用计划不变性来修剪不必要的遍历。通过原型实现验证:在 TPC-DS 基准测试中,我们看到优化的速度约为 50%,分析的速度约为 30%,整个查询编译的速度约为 34%(包括 Hive 元存储 RPC 和文件列表)[1]。

2. 设计实现

2.1 Tree Pattern Bits and Rule Id Bits


  • Tree pattern bits

在 TreeNode 增加 nodePatterns 属性,所有继承该类的节点可以通过复写该属性值来标识自己的属性。


/** * @return a sequence of tree pattern enums in a TreeNode T. It does not include propagated *         patterns in the subtree of T. */protected val nodePatterns: Seq[TreePattern] = Seq()
复制代码


TreePattern 是一个枚举类型, 对于每个节点/表达式都可以为其设置一个 TreePattern 方便标识,具体可见 TreePatterns.scala 。

例如对于 Join 节点的 nodePatterns:


override val nodePatterns : Seq[TreePattern] = {  var patterns = Seq(JOIN)  joinType match {    case _: InnerLike => patterns = patterns :+ INNER_LIKE_JOIN    case LeftOuter | FullOuter | RightOuter => patterns = patterns :+ OUTER_JOIN    case LeftSemiOrAnti(_) => patterns = patterns :+ LEFT_SEMI_OR_ANTI_JOIN    case NaturalJoin(_) | UsingJoin(_, _) => patterns = patterns :+ NATURAL_LIKE_JOIN    case _ =>  }  patterns}
复制代码


  • Rule ID bits

将规则 ID 的缓存 BitSet 嵌入到每个树/表达式节点 T 中,这样我们就可以跟踪规则 R 对于根植于 T 的子树是有效还是无效。这样,如果 R 在 T 上被调用,并且已知 R 无效,如果 R 再次应用于 T(例如,R 位于定点规则批处理中),我们可以跳过它。这个想法最初被用于 Cascades optimizer,以加快探索性规划。

Rule:


abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging {
// The integer id of a rule, for pruning unnecessary tree traversals. protected lazy val ruleId = RuleIdCollection.getRuleId(this.ruleName)
复制代码


TreeNode:


/** * A BitSet of rule ids to record ineffective rules for this TreeNode and its subtree. * If a rule R (which does not read a varying, external state for each invocation) is * ineffective in one apply call for this TreeNode and its subtree, R will still be * ineffective for subsequent apply calls on this tree because query plan structures are * immutable. */private val ineffectiveRules: BitSet = new BitSet(RuleIdCollection.NumRules)
复制代码

2.2 Changes to The Transform Function Family


改造后的 transform 方法相比之前的多了两个判断,如下所示


def transformDownWithPruning(  cond: TreePatternBits => Boolean, // 判断是否存在可优化的节点,由规则设计者所提供  ruleId: RuleId = UnknownRuleId // 不会生效的规则ID,自动更新	)(rule: PartialFunction[BaseType, BaseType]): BaseType = {  // 如果上述两个条件存在一个不满足,直接跳过本次规则  if (!cond.apply(this) || isRuleIneffective(ruleId)) {    return this  }  // 执行rule的逻辑  val afterRule = CurrentOrigin.withOrigin(origin) {    rule.applyOrElse(this, identity[BaseType])  }
// Check if unchanged and then possibly return old copy to avoid gc churn. if (this fastEquals afterRule) { val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule)) // 如果没生效,把规则ID加入到不生效的BitSet里 if (this eq rewritten_plan) { markRuleAsIneffective(ruleId) this } else { rewritten_plan } } else { // If the transform function replaces this node with a new one, carry over the tags. afterRule.copyTagsFrom(this) afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule)) }}
复制代码

2.3 Changes to An Individual Rule


规则的例子:


object OptimizeIn extends Rule[LogicalPlan] with SQLConfHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform ({   case q: LogicalPlan => q transformExpressionsDown ({     case In(v, list) if list.isEmpty => ...     case expr @ In(v, list) if expr.inSetConvertible => ...   }, _.containsPattern(IN), ruleId) // 必须包含IN }, _.containsPattern(IN), ruleId) // 必须包含IN}
复制代码

3. 测试结果


在 Delta 中使用 TPC-DS SF10 对 TPC-DS 查询编译时间进行了基准测试。结果如下:

  • 图 1 显示了查询编译速度;

  • 表 1 显示了几个关键树遍历函数的调用计数和 CPU 减少的细分。




我简单运行了开版本的 TPCDSQuerySuite,该测试会把 TPCDS 的语句解析优化,并且检查下生成的代码(CodeGen),平均耗时的时间为三次运行得到的最优值, 得到的结果如下:


  • 合入 PR 前[2], 包含 156 个 Tpcds 查询,平均总耗时~56s

  • 最新 Spark 开源代码,包含 150 个 Tpcds 查询,平均总耗时~19s


之所以最新的 Tpcds 查询比合入 PR 前的条数少 6 条,是因为后续有个减少重复 TPCDS 的 PR。总时长优化前是优化后的两倍多。

参考引用


[1]. [SPARK-34916] Tree Traversal Pruning for Catalyst Transform/Resolve Function Families. SISP

[2]. [SPARK-35544][SQL] Add tree pattern pruning to Analyzer rules.

[3]. Building a SIMD Supported Vectorized Native Engine for Spark SQL. link


点击关注,第一时间了解华为云新鲜技术~

发布于: 2021 年 07 月 23 日阅读数: 1170
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
Spark 开源新特性:Catalyst 优化流程裁剪