写点什么

袋鼠云数栈基于 CBO 在 Spark SQL 优化上的探索

作者:数栈DTinsight
  • 2022 年 8 月 11 日
    浙江
  • 本文字数:4180 字

    阅读完需:约 14 分钟

一、Spark SQL CBO 选型背景

Spark SQL 的优化器有两种优化方式:一种是基于规则的优化方式(Rule-Based Optimizer,简称为 RBO);另一种是基于代价的优化方式(Cost-Based Optimizer,简称为 CBO)。

1、RBO 是传统的 SQL 优化技术

RBO 是发展比较早且比较成熟的一项 SQL 优化技术,它按照制定好的一系列优化规则对 SQL 语法表达式进行转换,最终生成一个最优的执行计划。RBO 属于一种经验式的优化方法,严格按照既定的规则顺序进行匹配,所以不同的 SQL 写法直接决定执行效率不同。且 RBO 对数据不敏感,在表大小固定的情况下,无论中间结果数据怎么变化,只要 SQL 保持不变,生成的执行计划就都是固定的。

2、CBO 是 RBO 改进演化的优化方式

CBO 是对 RBO 改进演化的优化方式,它能根据优化规则对关系表达式进行转换,生成多个执行计划,在根据统计信息(Statistics)和代价模型(Cost Model)计算得出代价最小的物理执行计划。

3、 CBO 与 RBO 优势对比

● RBO 优化例子


下面我们来看一个例子:计算 t1 表(大小为:2G)和 t2 表(大小为:1.8G)join 后的行数



上图是:


SELECT COUNT(t1.id) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t1.age > 24


基于 RBO 优化后生成的物理执行计划图。在图中我们可以看出,执行计划最后是选用 SortMergeJoin ⑴ 进行两个表 join 的。


在 Spark 中,join 的实现有三种:


1.Broadcast Join


2.ShuffleHash Join


3.SortMerge Join


ShuffleHash Join 和 SortMerge Join 都需要 shuffle,相对 Broadcast Join 来说代价要大很多,如果选用 Broadcast Join 则需要满足有一张表的大小是小于等于


spark.sql.autoBroadcastJoinThreshold 的大小(默认为 10M)。


而我们再看,上图的执行计划 t1 表,原表大小 2G 过滤后 10M,t2 表原表大小 1.8G 过滤后 1.5G。这说明 RBO 优化器不关心中间数据的变化,仅根据原表大小进行 join 的选择了 SortMergeJoin 作为最终的 join,显然这得到的执行计划不是最优的。


● CBO 优化例子


而使用 CBO 优化器得到的执行计划图如下:



我们不难看出,CBO 优化器充分考虑到中间结果,感知到中间结果的变化满足能 Broadcast Join 的条件,所以生成的最终执行计划会选择 Broadcast Join 来进行两个表 join。


● 其他优势


其实除了刻板的执行导致不能得到最优解的问题,RBO 还有学习成本高的问题:开发人员需要熟悉大部分优化规则,否则写出来的 SQL 性能可能会很差。


● CBO 是数栈 Spark SQL 优化的更佳选择


相对于 RBO,CBO 无疑是更好的选择,它使 Spark SQL 的性能提升上了一个新台阶,Spark 作为数栈平台底层非常重要的组件之一,承载着离线开发平台上大部分任务,做好 Spark 的优化也将推动着数栈在使用上更加高效易用。所以数栈选择 CBO 做研究探索,由此进一步提高数栈产品性能。

二、Spark SQL CBO 实现原理

Spark SQL 中实现 CBO 的步骤分为两大部分,第一部分是统计信息收集,第二部分是成本估算:

1、统计信息收集

统计信息收集分为两个部分:第一部分是原始表信息统计、第二部分是中间算子的信息统计。

1)原始表信息统计

Spark 中,通过增加新的 SQL 语法 ANALYZE TABLE 来用于统计原始表信息。原始表统计信息分为表级别和列级别两大类,具体的执行如下所示:


● 表级别统计信息


通过执行 ANALYZE TABLE table_name COMPUTE STATISTICS 语句来收集,统计指标包括 estimatedSize 解压后数据的大小、rowCount 数据总条数等。


● 列级别统计信息


通过执行 ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column-name1, column-name2, …. 语句来收集。


列级别的信息又分为基本列信息和直方图,基本列信息包括列类型、Max、Min、number of nulls, number of distinct values, max column length, average column length 等,直方图描述了数据的分布。Spark 默认没有开启直方图统计,需要额外设置参数:spark.sql.statistics.histogram.enabled = true。


原始表的信息统计相对简单,推算中间节点的统计信息相对就复杂一些,并且不同的算子会有不同的推算规则,在 Spark 中算子有很多,有兴趣的同学可以看 Spark SQL CBO 设计文档:


https://issues.apache.org/jira/secure/attachment/12823839/Spark_CBO_Design_Spec.pdf

2)中间算子的信息统计

我们这里以常见的 filter 算子为例,看看推算算子统计信息的过程。基于上一节的 SQL SELECT COUNT(t1.id) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t1.age > 24 生成的语法树来看下 t1 表中包含大于运算符 filter 节点的统计信息。图片


在这里需要分三种情况考虑:


第一种


过滤条件常数值大于 max(t1.age),返回结果为 0;


第二种


过滤条件常数值小于 min(t1.age),则全部返回;


第三种


过滤条件常数介于 min(t1.age)和 max(t1.age)之间,当没有开启直方图时过滤后统计信息的公式为 after_filter = (max(t1.age) - 过滤条件常数 24)/(max(t1.age) – min(t1.age)) * before_filter,没有开启直方图则默认任务数据分布是均匀的;当开启直方图时过滤后统计信息公式为 after_filter = height(>24) / height(All) * before_filter。然后将该节点 min(t1.age)等于过滤条件常数 24。

2、成本估算

介绍完如何统计原始表的统计信息和如何计算中间算子的统计信息,有了这些信息后就可以计算每个节点的代价成本了。


在介绍如何计算节点成本之前我们先介绍一些成本参数的含义,如下:


Hr: 从 HDFS 读取 1 个字节的成本    Hw: 从 HDFS 写1 个字节的成本
NEt: 在 Spark 集群中通过网络从任何节点传输 1 个字节到 目标节点的平均成本
Tr: 数据总条数
Tsz: 数据平均大小
CPUc: CPU 成本
复制代码


计算节点成本会从 IO 和 CPU 两个维度考虑,每个算子成本的计算规则不一样,我们通过 join 算子来举例说明如何计算算子的成本:


假设 join 是 Broadcast Join,大表分布在 n 个节点上,那么 CPU 代价和 IO 代价计算公式分别如下:


CPU Cost=小表构建 Hash Table 的成本 + 大表探测的成本 = Tr(Rsmall) * CPUc + (Tr(R1) + Tr(R2) + … + Tr(Rn)) * n * CPUc


IO Cost =读取小表的成本 + 小表广播的成本 + 读取大表的成本 = Tr(Rsmall) * Tsz(Rsmall) * Hr + n * Tr(Rsmall) * Tsz(Rsmall) * NEt + (Tr(R1)* Tsz(R1) + … + Tr(Rn) * Tsz(Rn)) * Hr


但是无论哪种算子,成本计算都和参与的数据总条数、数据平均大小等因素直接相关,这也是为什么在这之前要先介绍如何统计原表信息和推算中间算子的统计信息。


每个算子根据定义的规则计算出成本,每个算子成本相加便是整个执行计划的总成本,在这里我们可以考虑一个问题,最优执行计划是列举每个执行计划一个个算出每个的总成本得出来的吗?显然不是的,如果每个执行计划都计算一次总代价,那估计黄花菜都要凉了,Spark 巧妙的使用了动态规划的思想,快速得出了最优的执行计划。

三、数栈在 Spark SQL CBO 上的探索

了解完 Spark SQL CBO 的实现原理之后,我们来思考一下第一个问题:大数据平台想要实现支持 Spark SQL CBO 优化的话,需要做些什么?


在前文实现原理中我们提到,Spark SQL CBO 的实现分为两步,第一步是统计信息收集,第二步是成本估算。而统计信息收集又分为两步:第一步的原始表信息统计、第二步中间算子的信息统计。到这里我们找到了第一个问题的答案:平台中需要先有原始表信息统计的功能。


第一个问题解决后,我们需要思考第二个问题:什么时候进行表信息统计比较合适?针对这个问题,我们初步设想了三种解决信息统计的方案:


● 在每次 SQL 查询前,先进行一次表信息统计


这种方式得到的统计信息比较准确,经过 CBO 优化后得出的执行计划也是最优的,但是信息统计的代价最大。


● 定期刷新表统计信息


每次 SQL 查询前不需要进行表信息统计,因为业务数据更新的不确定性,所以这种方式进行 SQL 查询时得到的表统计信息可能不是最新的,那么 CBO 优化后得到的执行计划有可能不是最优的。


● 在变更数据的业务方执行信息统计


这种方式对于信息统计的代价是最小的,也能保证 CBO 优化得到的执行计划是最优的,但是对于业务代码的侵入性是最大的。


不难看出三种方案各有利弊,所以进行表信息统计的具体方案取决于平台本身的架构设计。


基于数栈平台建设数仓的结构图如下图所示:



首先通过ChunJun将业务数据库数据采集到Hive ODS层
然后通过Hive或者Spark进行数据处理
最后通过ChunJun将Hive库的数据写入到业务数据库用于业务处理
复制代码


从结构图可看出数栈有用到 Hive、Spark 和 ChunJun 三个组件,并且这三个组件都会读写 Hive, 数栈多个子产品(如离线平台和实时平台)也都有可能对 Hive 进行读写,所以如果基于方案 3 来做成本是非常高的。


方案 1 本身代价就已经较大,每次查询前都进行一次信息统计,信息统计的时间是要算在本次查询耗时中的,如果表数据量比较大增加的时间可能是十几分钟甚至更久。


综合考虑,我们选用了更灵活合理的方案 2 来进行表信息统计。虽然 Spark SQL 运行时得到的统计信息可能不是最新的,但是总体相比较 RBO 来说还是有很大的性能提升。


接下来就为大家分享,数栈是如何如何统计收集原表信息统计:


我们在离线平台项目管理页面上添加了表信息统计功能,保证了每个项目可以根据项目本身情况配置不同的触发策略。触发策略可配置按天或者按小时触发,按天触发支持配置到从当天的某一时刻触发,从而避开业务高峰期。配置完毕后,到了触发的时刻离线平台就会自动以项目为单位提交一个 Spark 任务来统计项目表信息。


在数栈没有实现 CBO 支持之前,Spark SQL 的优化只能通过调整 Spark 本身的参数实现。这种调优方式很高的准入门槛,需要使用者比较熟悉 Spark 的原理。数栈 CBO 的引入大大降低了使用者的学习门槛,用户只需要在 Spark Conf 中开启


CBO-spark.sql.cbo.enabled=true


然后在对应项目中配置好表信息统计就可以做到 SQL 优化了。

四、未来展望

在 CBO 优化方面持续投入研究后,Spark SQL CBO 整体相比较 RBO 而言已经有了很大的性能提升。但这并不说明整个操作系统就没有优化的空间了,已经拿到的进步只会鼓舞我们继续进行更深层次的探索,努力往前再迈一步。


完成对 CBO 的初步支持探索后,数栈把目光看向了 Spark 3.0 版本引入的新特性——AQE(Adaptive Query Execution)。


AQE 是动态 CBO 的优化方式,是在 CBO 基础上对 SQL 优化技术又一次的性能提升。如前文所说,CBO 目前的计算对前置的原始表信息统计是仍有依赖的,而且信息统计过时的情况会给 CBO 带来不小的影响。


如果在运行时动态的优化 SQL 执行计划,就不再需要像 CBO 那样需要提前做表信息统计。数栈正在针对这一个新特性进行,相信不久的将来我们就能引入 AQE,让数栈在易用性高性能方面更上一层楼。希望小伙伴们保持关注,数栈愿和大家一起成长。

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2021.05.06 加入

还未添加个人简介

评论

发布
暂无评论
袋鼠云数栈基于CBO在Spark SQL优化上的探索_数栈DTinsight_InfoQ写作社区