SparkSQL 内核剖析
概述:
之前有尝试在 SparkSQL 内核添加自定义 SQL 操作不同的底层数据源,实现计算分析任务,这里就对 SparkSQL 的 Catalyst 模块进行简要的分析。在早期大数据时代的大规模处理数据的技术是 Hadoop 提供的 MapReduce 任务,但这种框架执行效率太慢,进行一些关系型处理(如 join)需要编写大量代码。后来 hive 这种框架可以让用户输入 sql 语句,自动进行优化并执行,降低了写 MR 代码的成本,同样对于早期的 Spark 来说,用户需要通过写 RDD 来完成执行逻辑,这样就使代码可读性不高,并且具体的执行逻辑不是最优的,会影响 Spark 任务的运行效率。
1. SparkSQL 的基本介绍:
下图为论文里提供的一张 SparkSQL 的架构图:
它提供了一个叫做 DataFrames 的可编程抽象数据模型,并且可被视为一个分布式的 SQL 查询引擎。对外提供 SQL 的操作方式主要为 JDBC 数据源,CLI shell 和 Programs 三种;而 SQL 解析,优化以及运行都是由 SparkSQL Catalyst 模块完成,最终转化为相应的 Spark Rdd 执行计算任务。
2. SparkSQL 逻辑计划概述:
SQL 主要由 Projection(filedA,fieldB,fieldC),DataSource(tableA)和 Filter(fieldA>10)三个部分组成,分别对应 SQL 查询过程中的 Result,DataSource 和 Operation:
实际的 SQL 执行顺序过程是按照 Opertaion->DataSouece->Result 的顺序,刚好与 SQL 的语法刚好相反,具体包括:
首先进行词法和语法 Parse,对输入的 SQL 语句进行解析,确定语句中哪些是关键词(SELECT、FROM 和 WHERE),哪些是 Peojection,哪些是 DataSource 等,判断 SQL 是否规范,并生成逻辑计划 Logical plan;
将 SQL 预计和数据库字典进行 Bind,如果 Projection 和 DataSource 都成功绑定,即这条 SQL 可执行;
进行 SQL 执行的 Optimize,选择一条最优的执行计划执行;
按照最优的 Exectue Plan 执行 SQL,从数据库中查询结果集返回。
在 SparkSQL 中同样会先将 SQL 语句进行 Parse 形成一个 Tree,然后使用 Rule 对 Tree 进行绑定,优化等处理过程(这里通过匹配模式对不同的节点采用不同的操作)。这个最核心的过程就是由 Spark 的 Catalyst 负责完成 SQL 的解析,绑定,优化以及生成物理计划。SparkSQL 模块主要由 core,catalyst,hive 和 hive-thriftserver 组成:
core:负责处理数据的输入/输出,从数据源获取数据,输出 DataFrame;
catalyst:SQL 的解析,绑定,优化以及生成物理计划
hive:负责对 hive 数据的处理
hive-thriftserver:提供CLI 和 JDBC 接口等。
论论文 SparkSQL Catalyst 的解析流程图:
SQL 语句经过 Antlr4 解析,生成 Unresolved Logical Plan
analyzer 与 catalog 进行绑定,生成 Logical Plan
optimizer 对 Logical Plan 优化,生成 Optimized LogicalPlan
SparkPlan 将 Optimized LogicalPlan 转换成 Physical Plan
prepareForExecution 方法将 Physical Plan 转换成 executed Physical Plan
execute()执行可执行物理计划,得到 RDD
2.1 SQL Parse
SparkSQL 的采用的是 Anrl4 进行 SQL 的词法和语法解析(Spark SQL 和 Presto 采用的是 Antlr4,FLink 采用的是 Calcite),Anrl4 主要提供了 Parser 编译器和 Translator 解释器框架。
在 Spark 源码中提供了一个.g4 文件,编译的时候会使用 Antlr 根据这个.g4 生成对应的词法分析类和语法分析类,采用访问者模式,即会去遍历生成的语法树(针对语法树中每个节点生成一个 visit 方法),以及返回相应的值,用以构建 Logical Plan 语法树。
SparkSQL 使用 Antlr4 的访问者模式,生成 Unresolved Logical Plan。这里,可以用 IDEA ANTLR Preview 插件可以看到到SQL 解析后生成的语法树,譬如:
转换成一棵语法树的可视图,SparkBase.g4 文件还有很多其他类型的语句,比如 INSERT,ALERT 等等。
其中,LogicalPlan 其实是继承自 TreeNode,所以本质上 LogicalPlan 就是一棵树。Tree 提供 UnaryNode,BinaryNode 和 LeafNode 三种 trait:
LeafNode,叶子节点,一般用来表示用户命令
UnaryNode,一元节点,表示 FILTER 等操作
BinaryNode,二元节点,表示 JOIN,GROUP BY 等操作
譬如上述 SQL 在 parse 阶段使用 antlr4,将一条 SQL 语句解析成语法树,然后使用 antlr4 的访问者模式遍历生成语法树,也就是 Logical Plan,但此时还是 Unresolved LogicalPlan,即无法确定 src 是否存在,以及具体的的元数据是什么样。没有通过 Analysis 阶段,无法确定 A 的具体类型以及 TABLE 这个数据源是否存在等信息,只有通过 Analysis 阶段后,才会把 Unresolved 变成 Resolved LogicalPlan。
2.2 Analyzed
在 Analysis 阶段,使用 Analysis Rules 结合 SeesionCatalog 元数据,对会将 Unresolved LogicalPlan 进行解析,生成 Resolved LogicalPlan 的。Spark SQL 通过使用 Catalyst rule 和 Catalog 来跟踪数据源的 table 信息。这个阶段核心处理类是 Analyzer 类,自身实现大量的 rule,然后注册到 batch 变量中:
具体调用 RuleExecutor 的 execute 方法串行执行这些 Rule,匹配 UnresolvedRelation,然后递归去 Catlog 中获取对应的元数据信息,递归将它及子节点变成 Resoulved。
可以看到经过 Analyzed 分析后,Parse 阶段的 Unresolved LogicalPlan 进行展开,每一步的操作都与 Session Catalog 中的元数据进行了绑定,这个就是一条可执行的逻辑计划,即 Resolved LogicalPlan。
2.3 Optimized
上面经过 Analyzer 阶段后,即生成了可执行的 Logical Plan,但 SparkSQL 不会立即转化为 Spark Plan 执行,而是会对生成 Resolved Logical Plan 执行逻辑进行优化,提高运代码行效率,这个阶段叫 Optimized。Optimizer 优化器的实现和处理方式与上面 Analyzer 类似定义了一系列的 Rule,然后利用这些 Rule 对 Logical Plan 和 Expression 进行迭代处理,其中主要的优化策略是合并,列裁剪和谓词下推等。
Optimizer 的优化策略不仅对已绑定的 Logical PLan 进行优化,而且对 Logical Plan 中的 Expression 也进行优化,其原理就是遍历树,然后应用优化 Rule。
可以看到,经过 Optimizer 阶段,生成的 optimizedPlan 对比 Analyzer 阶段生成的 Resolved LogicalPlan 简化了很多,Project 部分只剩下 name 字段,其他没有可优化的地方保持不变。
2.4 Physical Plan
经过 Optimzer 的优化,SparkPlanner 这个类会使用 Strategies 将优化后的 Logical Plan 进行转换,生成可执行的 Physical Plan,相比较于 Logical Plan,Physical Plan 算是 Spark 能够执行的东西,这里 spkarPlan 就是相当于 Physical Plan。
这里传入一个逻辑计划,生成一个物理计划,即 SparkPlan:
这里 SparkSQL 在真正执行时,会调用 prepareForExecution 将 sparkPlan 转换成 executedPlan,并在 sparkPlan 中执行过程中,如果出现 stage 分区规则不同时插入 Shuffle 操作以及进行一些数据格式转换操作等等:
最后,基于 executedPlan.execute 方法返回一个 RDD,之后 spark 任务就会对这个 RDD 进行操作,返回结果集。
这里打印出生成的 sparkPlan:
3. 小结
本文把 SparkSQL 的解析流程大致的介绍了一遍,大致可以划分为解析->绑定->优化->逻辑计划转物理计划->预准备->生成 rdd,与直接写 rdd 的最本质的区别 SparkSQL 的 Catalyst 会进行逻辑计划的优化 RBO(Rule-Based Optimizer),当然 Spark 在具体的执行的时候,还会执行代价优化--CBO(Cost-Based Optimizer,CBO);而另外一个实时计算框架 Flink SQL 的引擎采用的是 Apache Calcite(https://calcite.apache.org/)实现支持 SQL 语句的解析和验证;HBase 可以通过 Apache Phoenix(http://phoenix.apache.org/)实现 SQL 驱动,后续有时间也研究总结一下。
参考链接:
spark 源码地址:https://github.com/apache/spark
https://amplab.cs.berkeley.edu/wp-content/uploads/2015/03/SparkSQLSigmod2015.pdf
《图解 Spark 核心技术与案例实践》
《SparkSQL 内核剖析》
版权声明: 本文为 InfoQ 作者【永健_何】的原创文章。
原文链接:【http://xie.infoq.cn/article/2a70e9fb993bed9bc9ed02c46】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论