Spark SQL 分析层优化
导读:本期是《深入浅出 Apache Spark》系列分享的第四期分享,第一期分享了 Spark core 的概念、原理和架构,第二期分享了 Spark SQL 的概念和原理,第三期则为 Spark SQL 解析层的原理和优化案例。本次分享内容主要是 Spark SQL 分析层的原理和优化的案例,且此优化案例是对于理解分析层原理很重要的。
本期介绍会围绕下面五点展开:
前情提要
Spark SQL 分析层原理
优化案例
总结
Q&A
►►►
前情提要
首先介绍数新智能与 Spark 有深度关系的两个产品。
赛博数智引擎,即 CyberEngine,它可以帮助用户管理、部署、运维各种组件。该引擎旨在打造一款云原生的数据服务底座,同时支持各种任务的调度,包括存储等各方面的管理和运维工作。
赛博数据平台,包括数据治理、数据开发、数据探索、数据查询等各种能力,其中 Spark 是它支持的重要数据引擎之一。
上一讲主要介绍了 Spark SQL 解析层的优化。Spark SQL 解析层实际上是一条 SQL 执行生命周期中的第一个阶段,首先它需要被解析成一棵抽象语法树。解析后的抽象语法树由于缺少与元数据的绑定,所以我们无法知道 UnresolvedAttribute 是一个数据库表的列,还是 Parquet 文件中的一个元数据字段;我们也无法知道或者是一 UnresolvedRelation 是一张数据库表,还是 HDFS 上的文件目录。这些分析依赖于 Spark SQL 的分析层,也就是本次要分享的内容。
►►►
Spark SQL 分析层原理
1. Spark SQL 分析层原理
刚才回顾了解析层,或者说是 Spark 里面的 Parser,它能够把输入的 SQL 文本转换成一棵解析后的抽象语法树,那么下一步就需要分析层 Analyzer,即分析器对抽象语法树进行分析,把抽象语法树跟元数据信息进行绑定,知道数据的位置信息、元数据信息等等,之后才能有机会真正地查询、执行或者修改它。
分析器分析的过程依赖于一些元数据的组件,如 SessionCatalog ,在 SQL 组件中属于元老级的组件。在 Spark 2.5 版本中引入的 Data Source V2 中一个很重要的组件叫 CatalogManager,其具有用户注册机制,可以让用户注册自己的 Catalog。可以理解为用户通过使用 Spark 内置的 SessionCatalog,就可以拥有自己的 Catalog。在多个 Catalog 注册的情况下,可以实现多个数据源之间的联邦,拥有更丰富的业务场景,更加灵活。
有些人觉得 Spark 内置的 SessionCatalog 对于 Hive 的支持会有一些局限,那么就可以用 CatalogManager 的方式去实现。 不管是老的 SessionCatalog,还是新的 CatalogManager,实际上最重要的是对 Catalog、数据库、表、字段等元数据信息的管理。当然这里提到的表只是从 Catalog 的角度抽象的一个概念,实际上它未必是一个真的表,例如一个以 Kafka 作为数据源的 Catalog,你可以把 Kafka 里面的一个 Topic 抽象成一个表,或者也可以根据这个逻辑把 Topic 看作是一个数据库。总之,这是一个很灵活、很便利的新扩展,是一个为了帮助分析器获取元数据信息的组件。
在分析层中除了解决元数据信息的绑定之外,还需要去解决一些其他的问题,如内置函数的绑定。比如在 SQL 里面包含一些函数,这些函数有些是无参数的,有些是带有多个参数的。那么这些函数是不是都是 Spark 本身就支持的呢?如果输入的是一个 ClickHouse 的函数呢?因为有些函数肯定是 Spark 不支持或不认识的,这是 Spark 在分析过程中首先要去分析的内容,它不能跳过分析直接去执行这个函数。
对函数的分析过程,Spark 依赖于 FunctionRegistry 这个函数注册表组件,函数注册表里面绑定了一些 Spark 支持的内置函数。如果用户想要实现一些 Spark 没有但想要扩展的功能的时候,Spark 也提供了用户自定义函数的支持,有新旧两套接口去实现 UDF。当用户自己去实现 UDF 的时候,实际上是注册到整个 Spark 的元数据体系里面,或者说是函数的体系里面,分析器首先会从函数注册表里去查找相关函数,来确保 SQL 执行的合法性和安全性。这里介绍了分析层在 SQL 整个执行流程中的作用,接下里会从更深入的角度去介绍分析层的一些内容。
刚刚提到解析后的查询树或者说逻辑执行树、抽象语法树需要跟元数据绑定,分析器就是负责这些处理的角色。
之前在第一、二讲中,整体介绍 Spark SQL 的执行流程时,介绍过两个很核心的抽象概念——规则(即 Rule)和规则执行器(即 RuleExecutor)。本次分享分析器,以及下一讲将要分享的优化器,实际上就是规则执行器的具体应用。分析器里面的每一个分析规则、优化规则都是规则的实现。规则执行器能够帮助逻辑计划去应用这些规则,这就是规则执行器的简单原理。
当然规则还需要一定的匹配过程,比如有一些规则要解析表或者数据源的位置,有一些可能是为了识别某些属性是一个别名还是表的名字、表的字段,每个规则都有具体的针对性工作。
最后需要提一提——规则的执行是有细节的。在第二讲中分享规则的时候,提到除了规则信息的抽象之外,就是规则的执行。例如处理抽象语法树,对整棵树应用一个规则的时候,它需要一定的封装逻辑。
在 Spark SQL 的执行流程中,分析器本身就是规则执行器的具体实现,它里面会应用很多规则,这些规则可能是分批的,每一批里面又有很多规则,这些规则有些可能只执行一次,有些可能执行多次。而后分析器会对解析后的语法树进行处理,也可以理解是应用了一系列分析规则之后,它就会变成分析后的逻辑语法树。
2. 分析规则介绍
本节介绍一个简单的例子,这个例子可以帮助大家快速了解如何去写一个规则,为将来工作中使用 Spark SQL 对一些业务场景进行增强分析提供参考。
例如 ResolveRelations 分析规则,如 select xxx from Tab1,解析层会把这个 SQL 解析成抽象语法树,这里 Tab1 是一张真的表,但是解析器是不知道的,它只会把它简单地封装成一个 UnresolvedRelation,即未分析的 Relation,那么这个 Relation 究竟是一个表还是其他什么东西,是需要分析器来处理。分析器主要就是依靠 ResolveRelations 这个分析规则进行处理,上图展示了分析规则的代码片段,实际这个规则的代码远比展示的要多,但因为其他代码对本次介绍的内容来说是一些干扰,所以就忽略了。从代码片段可以看到它调用了 plan.resolveOperatorsUpWithPruning 的遍历算法,resolveOperatorsUp 是一个自底向上的过程,如果跟树的遍历算法类比的话,可以理解为它是深度遍历的一种方式。
首先它会帮你找到这棵树的第一个叶子节点,但它不会立刻去执行计算,它还会往树的更深处去找,直到找到最后的叶子节点,等这个叶子节点处理完了之后才会往上卷,这个就是对应的深度遍历算法。但是既然 Up 了,为什么后面还有一个 WithPruning,Pruning 的中文意思是裁剪,这里为什么要这么用,后面再详细介绍。
抛开这个遍历的过程,深度遍历的方式会找到每一个 UnresolvedRelation,每找到一个还不知道具体是干什么的 relation 之后,就会首先去调用 ResolveRelation 的方法,去分析 Tab1 是一个文件目录还是一个表,或者是其他什么,如果调用完之后发现都不是,可能会返回一个 none,代表它不是一个表,也不是 Hadoop 里面的一个目录。之后就会交给一个叫 ResolveViews 的方法,因为它可能是一个视图,所以调用 ResolveViews 的方法去获取 views 的元信息。ResolveViews 方法的代码在这里没有展示,因为对本次的主题来说不是很重要,本次主要是想展示 ResolveRelations 这个分析规则模板实现的套路。
►►►
优化案例
1. 优化前 - 3.2.0 以前的 Rule 的全面遍历
针对树的遍历,Spark 3.2.0 版本引入了一个比较大的优化,这个优化对于分析层来说也是相当重要。
刚刚在介绍 ResolveRelations 时,采用了 Spark 3.2.0 之后的代码,其中使用的是带有 Withpruning 后缀的 operatorsUpWithpruning 方法,表示有裁剪的意思,那么它究竟是怎么裁剪的呢?
回到 Spark 3.2.0 以前这块代码的实现。 Spark 在分析层最主要依赖于以下几个方法,在 3.2.0 版本之前 resolveOperators 方法默认调用 resolveOperatorsDown 方法,resolveOperatorsDown 是一个广度优先的遍历方法,此外,还有采用深度遍历的 resolveOperatorsUp 方法,这是 Spark 3.2.0 以前分析层遍历树的基本原理。在应用规则实现遍历树的时候,不管是遍历整棵树,还是随着遍历的层次不同,遍历整个树的一部分或者是一个叶子节点,都会完完整整地去遍历。可以理解为这棵树不管有多深,每一层有多少个节点,在遍历这棵树的时候,它会把规则应用到整棵树的每一个节点。
Spark SQL 中有很多分析规则,有些分析规则可能只针对特殊的模式或者特殊的场景,如果用户写的 SQL 跟这个规则完全没有关系,并且 SQL 本身很复杂,对应的树的深度很高、体量庞大,如果不断地应用这个规则去执行,就会浪费大量的 CPU 时钟。
resolveOperators 等三个方法,主要是针对整个逻辑计划层面,而逻辑计划是一个树的结构,除了逻辑计划之外,Spark 里面的表达式本身也是树的结构。举个最简单的例子,例如在 where 条件里输入 a>10 and b<0,这就是由两个表达式用 and 组成的一棵树,除了 and 组成树之外,a > 10 本身也是颗树,大于号是一个父节点,大于号下面的 a 字段和 10 属性值是两个叶子。
在 Spark 3.2.0 之前,表达式的转换通过上图左边展示的 transformExpressions、transformExpressionsUp 和 transformExpressionsDown。 而对逻辑计划的处理则依赖上图右边展示的 resolveOperators、resolveOperatorsUp 和 resolveOperatorsDown。
2.优化后 - Support travelsal pruning in transform/resolve
Spark 3.2.0 之后的这个优化,我个人认为是分析层自 Spark SQL 诞生以来做的最重要的优化之一。大家有兴趣可以去看 Spark 社区任务里面一些具体的讨论,包括推进的过程和 commit 的一些代码。
这个优化主要包括以下三部分:首先是 TreePattern,它是一个枚举值,不管是表达式、还是逻辑计划,它都会匹配对应的模式去实现;第二是 TreePatternBits,是一个可以便于 TreePattern 快速查找的数据结构;第三是带有裁剪功能的 Transform 函数。
3.TreePattern
TreePattern 是一个用 Scala 实现的枚举类模板,实际的枚举类会非常多,但本次为了介绍方便,只列举了几个枚举值。例如 AGGREGATE_EXPRESSION,它跟在 Spark 表达式里面很重要的聚合表达式相关。如果表达式里面包含别名,那就会用到 ALIAS。再比如上文列举的 and 逻辑运算表达式,这里也会有 AND 和 and 表达式相对应。
本次分享的最后会具体介绍 EXPRESSION_WITH_RANDOM_SEED 枚举值。
4.TreePatternBits
这个接口的出现是为了能够快速地帮助一个树形结构(不管是逻辑计划的树,还是一个表达式的树)与具体的 Treepattern 进行匹配校验。从这个 trait 的代码可以看到它里面用了 BitSet 这个通过位运算来进行快速判别的数据结构。实际上每个 TreePattern 本身都有一个 ID 属性,可以通过 ID 来进行判定。比如这里有三个方法,第一个方法叫 containsPattern,这个方法的作用就判定当前这棵树有没有某个 pattern,如果有这个 pattern 才会处理。比如刚介绍的 ResolveRelations,它肯定会有与 relation 相关的 pattern,只有找到逻辑树里面与 relation 对应的 pattern 之后,才会进一步去执行规则。
当然,有些条件可能比较苛刻,比如像优化器层做优化,有 Limit 算子(字面上可以理解为 SQL 里的 limit)、Sort 算子(字面上可以理解为 SQL 里的 order by)。 当 order by 加 limit 同时出现时,就能应用很多优化规则。如果只有 limit 出现,或者只有 order by 出现,那它就不需要应用这个规则。所以第二个方法叫 containsAllPatterns,当同时满足多个 pattern 时,或者可以理解为通过它快速判断语法树同时有多个节点时,才会去应用某个规则。
第三种方法是 containsAnyPattern,当出现任何一种情况时,就可以应用某个规则。
据我理解,上面提到的 Limit 算子、Sort 算子、Aggregation 算子以及各种各样的表达式在 Spark 3.2.0 之后,绝大部分或者说全部都已经实现了这个特质。
5.带有裁剪功能的 Transform 函数
上文讲 resolveOperators 方法的时候,它里面默认是调用 resolveOperatorsDown,这是 Spark 3.2.0 之前的方法,而现在已经换成了 resolveOperatorsWithPruning,虽然还没有体现遍历树的方式,也没有说是深度还是广度,但已经开始去调用这个带有裁剪功能的解析操作了。可以看到 resolveOperatorsWithPruning 里面默认调用了 resolveOperatorsDownWithPruning,从深度遍历和广度遍历的角度来说,这个逻辑还是不变的,只不过它替换成了带有裁剪功能的一个新的实现或者调用而已。同样 resolveOperatorsUp 也替换成了 resolveOperatorsUpWithPruning。 这样也是为了兼容 Spark 之前的老版本,以确保用户升级到 SPARK 3.2.0 后原来生产的代码也可以正常执行,保证用户的可迁移性。
6.resolveOperatorsDownWithPruning
以 resolveOperatorsDownWithPruning 为例来深入的剖析裁剪具体是怎么做到的,为什么要裁剪?之前有提到在 Spark 3.2.0 之前,不管语法树跟某个规则有没有关系,规则都会应用一遍,因此浪费了大量 CPU 时钟。
而从 resolveOperatorsDownWithPruning 的代码来看,这个方法的签名带有一个 condition,这个 condition 就是以 TreePatternBits 特质为入参,也可以认为这里传递进来的是一个逻辑节点或一个逻辑计划树,然后返回一个布尔值。 就是说 condition 会判定一棵树符不符合这个规则,是否需要把这个规则应用到这棵树上,代码中 cond.apply(self) 意思是应用到它自己,把规则本身作为参数传递给一个匿名函数,传递完之后,TreePatternBits 来判断到底有没有符合这个树的 TreePattern。只有在符合了这个条件之后,才会进到最外层 if 的框里,接下来后面还有一个 isRuleIneffective,进入这个方法体之后,它会对里面的叶子节点进行处理,即调用 resolveOperatorsDownWithPruning,处理完之后,再一层一层地往里调用。可以理解为把整个树的子树一层一层的传递给 resolveOperatorsDownWithPruning。
所以基于现在这个逻辑,所有的树和子树都会被传递,但不一定会应用某个规则,而之前就是什么都不管就直接开始执行分析规则,这就是它们优化前后的区别。
7.resolveOperatorsUpWithPruning
resolveOperatorsUpWithPruning 方法和 resolveOperatorsDownWithPruning 其实大同小异,都是通过 condition 或者说实现了 treepatternbits 特质的逻辑计划节点来作为入参,然后来判断每个 pattern 是否满足规则。
以上两个方法的介绍中,没有对代码中的 isRuleIneffective(ruleId) 和 markRuleAsIneffective(ruleId) 进行介绍,其实它们跟模式匹配这种裁剪的方式在思想上是类似的,它们能够进一步帮助处理裁剪的规则,只不过方法实现上有所区别。这两个就留给感兴趣的读者自己去看看它们的实现。
8.优化后表达式树的转换
上面着重讲了 resolve 裁剪的实现,其对应的表达式也有相应的变化,比如左侧这三个新的函数或者接口都带有 WithPruning。表达式的优化也是类似的,根据之前对逻辑计划的裁剪优化的分析,相信看懂它不会有什么问题,感兴趣的读者可以去看看它的实现过程。
9.RevolveRandomSeed
上面给大家分享了本次优化带来的三个新的组件,一是 TreePattern 枚举类,二是 TreePatternBits 特质,可以快速判别一个逻辑计划节点跟某一个规则是否匹配的一个接口,三是增加了一批便利的带有裁剪功能的转换实现。那么这三者最后是怎么应用到一起呢?我从 Spark 的源码中挑了一个逻辑较为简单的 RevolveRandomSeed 规则来讲解。
从这个规则的名字可以看到它是一个解决随机种子表达式的特殊分析处理规则。从这个分析规则的代码中可以看到,它把之前的 resolveOperatorsUp 方法替换成为了带有裁剪功能的 resolveOperatorsUpWithPruning,同时传递参数的时候,调用了 containsPattern,这个 contentsPattern 就是上面提的那个数据结构特质里面用于快速判别一个逻辑计划跟一个模式是否匹配的数据结构,通过它来快速判断这个逻辑计划里面包不包含 EXPRESSION_WITH_RANDOM_SEED 模式。如果包含了,才会走里边的逻辑,如果不包含,那么就直接返回,不会有过多的消耗。
再往下看就是针对逻辑计划的介绍,可以看到进入这个分析规则的内部,以前的 transformExpressionsUp 现在也替换成了带有裁剪功能的 transformExpressionsUpWithPruning,并调用了 containsPattern 的方法来进行判断。
以上这个例子我相信已经足够简单的让大家去理解 Spark 3.2.0 版本带来的分析层、优化层的重大优化。
►►►
总结
以上就是本次分析层的介绍。最后这个标题叫 Stop earlier without traversing the entire tree,感兴趣的读者可以去 Spark 官网查看其详细内容,在此不做展开介绍。
►►►
Q&A
Q1:在实际应用中应该如何监控和调试 Spark SQL 查询计划,以确保 ResolveRelations 规则的正确执行?
A:ResultRelations 是 Spark 分析器里很重要的一个分析规则,它能分析 relation 对应的这个文件包括哪些信息,这些都是 Spark 社区经过了大量增强的一个很健壮的东西。 怎么监控它能正确执行?假如它不能够正确执行,那可能就是 Spark 的 bug 了,可以反馈给 Spark 社区,让 Spark 社区去解决。
Q2:逻辑计划算子树规则应用的优化效果,在批处理的背景下,用户应该是无感知的吗?
A:针对用户的 SQL 场景,大多数情况下用户肯定是无感知的,因为这块优化毕竟只是针对逻辑计划进行的优化,逻辑计划优化带来的性能提升可能没有针对物理算子进行的优化带来的提升那么明显,但是也不全然如此。比如说用到一些服务化的 Spark 的时候,如 HiveThriftServer2,Kyuubi 等长运行周期的服务,如果它要处理的 SQL 并发很高,且用户的 SQL 本身又很复杂,那么这种优化用户应该还是有感知的。
Q3:同样的数据量和运算量,用 Spark 做离线批量计算,用 Flink 做实时流计算,能节省多少资源,资源主要是节省在哪些地方?
A:我觉得节约最主要还是离线批量计算本身带来的优势。例如网络传输消息,从时效性来说,如果只传输一条信息,它会立即去传输,时效比较快,而从整个批量的角度来说,每传一个消息,就有一个数据包,这些消息加起来的数据量实际上比成批去传输的数据量要更大,如果网络的连接不是长连接而是短连接,还要大量的在连接建立方面的开销。所以 Spark 或者 Hive 批处理场景的诞生就是为了提高数据执行的处理效率,包括节省计算资源、提高数据的吞吐量等。从这个角度来说,按 Filnk 现在的发展也可以按批去处理一些数据,并不是来一条消息就立马下发的模式,实际上它现在也有消息的缓存,不管它是发送端还是接收端,都有缓存队列这种类似批的概念。当然具体的资源节省可能还是需要用实际的数据进行测试,以上只是偏理论的分析。
Q4:逻辑计划优化,除了常见的谓词下推、列剪裁、常量累加,还有哪些优化?
A:其实优化是非常多的,比如你刚刚说的谓词下推,还有 Project 下推、聚合下推。而常量,除了你说的那种常量之外,还有对 random 表达式进行比较运算的优化,Limit 下推、Limit 和 Sort 这种 top n 场景的优化,对规则的重写优化等等。
▌公司简介
浙江数新网络有限公司是一家专注于多云数据智能平台和数据价值流通的服务商。公司总部位于杭州,在上海、北京、深圳等各地设有分支机构,服务网络覆盖全国各区域,客户遍布全球 50+ 城市。数新智能自成立以来就在人工智能领域进行了深入的探索,已有成熟的产品、基于场景的解决方案及不同行业的成功案例。帮助金融、能源电力等行业相关企业实现数字化、智能化转型,提升企业新质生产力。
数新智能自主研发的一站式多云数据智能平台,主要包括赛博数智引擎 CyberEngine、赛博数据平台 CyberData、赛博智能平台 CyberAI,可提供基于大数据的大模型调优、深度学习、价值流通等多种服务。数新智能自主研发的赛博数智引擎 CyberEngine 基于开源开放的设计理念,兼容开源引擎并进行深度优化,开放式架构支持主流引擎生态,支持多元异构引擎灵活插拔,支持流批一体、湖仓一体、数智一体等场景化能力。在此基础上,CyberEngine 以 Spark、Flink 作为主计算引擎,以 Spark 为例,基于 Spark 实现数新智能的流批引擎、统一查询引擎,在性能、稳定性、云原生化等方面全面优于社区开源版本。
▌数新网络高级架构专家 Spark Committer
耿嘉安
2014 阿里巴巴御膳房主力开发
2016 软件开发 &大数据开发,出版畅销书籍《深入理解 Spark 》
2016 艺龙网大数据架构师,主导开发大数据平台
2017 360 大数据专家,出版畅销书籍《 Spark 内核设计的艺术》
2018 360 高级大数据专家,主导开发 XSQL 查询平台
2020 麒麟高级性能专家,主导 Kylin 执行引擎加速
2024 数新网络高级架构专家
评论