写点什么

大数据开发 Spark 模块之 SparkSQL

  • 2021 年 12 月 23 日
  • 本文字数:7360 字

    阅读完需:约 24 分钟

在实际的开发过程中,SQL 化已经是数据领域的共识,大家疯狂的将大数据框架的易用性做到了最高,即使一个刚刚毕业的同学,只要有 SQL 基础就可以看懂甚至上手开发了。那么我们有必要对 SparkSQL 这个模块进行一个全面的解析。


SparkSQL 的前世今生


Spark SQL 的前身是 Shark,它发布时 Hive 可以说是 SQL on Hadoop 的唯一选择(Hive 负责将 SQL 编译成可扩展的 MapReduce 作业),鉴大数据培训于 Hive 的性能以及与 Spark 的兼容,Shark 由此而生。


Shark 即 Hive on Spark,本质上是通过 Hive 的 HQL 进行解析,把 HQL 翻译成 Spark 上对应的 RDD 操作,然后通过 Hive 的 Metadata 获取数据库里表的信息,实际为 HDFS 上的数据和文件,最后有 Shark 获取并放到 Spark 上计算。但是 Shark 框架更多是对 Hive 的改造,替换了 Hive 的物理执行引擎,使之有一个较快的处理速度。


然而不容忽视的是 Shark 继承了大量的 Hive 代码,因此给优化和维护带来大量的麻烦。


为了更好的发展,Databricks 在 2014 年 7 月 1 日 Spark Summit 上宣布终止对 Shark 的开发,将重点放到 SparkSQL 模块上。Spark 官网给 SparkSQL 做了定义:


Spark SQL is Apache Spark's module for working with structured data.


由此可见,Spark SQL 是 Spark 用来处理结构化数据的一个模块。结构化数据指的是:一般指数据有固定的 Schema(约束),例如在用户表中,name 字段是 String 型,那么每一条数据的 name 字段值都可以当作 String 来使用。并且将要处理的结构化数据封装在 DataFrame 中,在最开始的版本 1.0 中,其中 DataFrame = RDD + Schema 信息。


SparkSQL 在 1.6 时代,增加了一个新的 API 叫做 Dataset,Dataset 统一和结合了 SQL 的访问和命令式 API 的使用,这是一个划时代的进步。在 Dataset 中可以轻易的做到使用 SQL 查询并且筛选数据,然后使用命令式 API 进行探索式分析。


Spark 2.x 发布时,将 Dataset 和 DataFrame 统一为一套 API,以 Dataset 数据结构为主,其中 DataFrame = Dataset[Row]。Spark 3.x 时代,Spark 的开发者似乎对 SparkSQL 情有独钟,发布了大量的针对 SQL 的优化。我们在下文中会提到。


Spark SQL 运行原理


在 SparkSQL 中有两种数据抽象。


DataFrameDataFrame 是一种以 RDD 为基础的带有 Schema 元信息的分布式数据集,类似于传统数据库的二维表格。除了数据以外,还记录数据的结构信息,即 schema。同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)。从 API 易用性的角度上看,DataFrame API 提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。


上图直观地体现了 DataFrame 和 RDD 的区别。左侧的 RDD[Person]虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 DataFrame 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。


DataFrame 多了数据的结构信息,即 schema。RDD 是分布式的 Java 对象的集合。DataFrame 是分布式的 Row 对象的集合。DataFrame 除了提供了比 RDD 更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如 filter 下推、裁剪等。


DataFrame 为数据提供了 Schema 的视图。可以把它当做数据库中的一张表来对待,DataFrame 也是懒执行的。性能上比 RDD 要高,主要原因:优化的执行计划:查询计划通过 Spark catalyst optimiser 进行优化。


SparkSQL 的解析过程我们直接应用《图解 Spark 核心技术与案例实战》这本书中的内容,大概分为四个步骤:


  • 词法和语法解析 Parse:生成逻辑计划

  • 绑定 Bind:生成可执行计划

  • 优化 Optimize:生成最优执行计划

  • 执行 Execute:返回实际数据


SparkSQL 对 SQL 语句的处理和关系型数据库采用了类似的方法, SparkSQL 会先将 SQL 语句进行解析 Parse 形成一个 Tree,然后使用 Rule 对 Tree 进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。


而 SparkSQL 的查询优化器是 Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst 是 SparkSQL 最核心的部分,其性能优劣将决定整体的性能。SparkSQL 由 4 个部分构成:


  • Core:负责处理数据的输入/输出,从不同的数据源获取数据(如 RDD、Parquet 文件),然后将查询结果输出成 DataFrame

  • Catalyst:负责处理查询语句的整个过程,包括解析、绑定、优化、物理计划等

  • Hive:负责对 Hive 数据的处理

  • Hive-thriftserver:提供 CLI 和 JDBC/ODBC 接口等


TreeTree 是 Catalyst 执行计划表示的数据结构。LogicalPlans,Expressions 和 Pysical Operators 都可以使用 Tree 来表示。Tree 具备一些 Scala Collection 的操作能力和树遍历能力。Tree 提供三种特质:


  • UnaryNode:一元节点,即只有一个子节点

  • BinaryNode:二元节点,即有左右子节点的二叉节点

  • LeafNode:叶子节点,没有子节点的节点


针对不同的节点,Tree 提供了不同的操作方法。对 Tree 的遍历,主要是通过迭代将 Rule 应用到该节点以及子节点。Tree 有两个子类继承体系,即 QueryPlan 和 Expression。


  • QueryPlan 下面的两个子类分别是 LogicalPlan(逻辑执行计划)和 SparkPlan(物理执行计划)。QueryPlan 内部带有 output:Seq[Attribute]、transformExpressionDown 和 transformExpressionUp 等方法,它的主要子体系是 LogicalPlan,即逻辑执行计划表示,它在 Catalyst 优化器里有详细实现。LogicalPlan 内部带一个 reference:Set[Attribute],主要方法为 resolve(name:String): Option[NamedExpression],用于分析生成对应的 NamedExpression。对于 SparkPlan,即物理执行计划表示,需要用户在系统中自己实现。LogicalPlan 本身也有很多具体子类,也分为 UnaryNode,BinaryNode 和 LeafNode 三类。

  • Expression 是表达式体系,是指不需要执行引擎计算,而可以直接计算或处理的节点,包括 Cast 操作、Porjection 操作、四则运算和逻辑操作符运算等等。


RuleRule[TreeType <: TreeNode[__]]是一个抽象类,子类需要复写 apply(plan: TreeType)方法来指定处理逻辑。对 于 Rule 的具体实现是通过 RuleExecutor 完成的,凡是需要处理执行计划树进行实施规则匹配和节点处理的,都需要继承 RuleExecutor[TreeType]抽象类。


在 RuleExecutor 的实现子类(如 Analyzer 和 Optimizer)中会定义 Batch,Once 和 FixedPoint。其中每个 Batch 代表着一套规则,这样可以简便地、模块化地对 Tree 进行 Transform 操作。Once 和 FixedPoint 是配备策略。RuleExecutor 内部有一个 Seq[Batch]属性,定义的是该 RuleExecutor 的处理逻辑,具体的处理逻辑由具体的 Rule 子类实现。


RuleExecutor 中的 apply 方法会按照 Batch 顺序和 Batch 内的 Rules 顺序,对传入的节点进行迭代操作。在 Analyzer 过程中处理由解析器(SqlParser)生成的未绑定逻辑计划 Tree 时,就定义了多种 Rules 应用到该 Unresolved 逻辑计划 Tree 上。


Analyzer 过程中使用了自身定义的多个 Batch,如 MultiInstanceRelations,Resolution,CheckAnalysis 和 AnalysisOperators:每个 Batch 又由不同的 Rules 构成,每个 Rule 又有自己相对应的处理函数。注意,不同 Rule 的使用次数不同(Once FixedPoint)。整个 Spark SQL 运行流程如下:


  1. 将 SQL 语句通过词法和语法解析生成未绑定的逻辑执行计划(Unresolved LogicalPlan),包含 Unresolved Relation、Unresolved Function 和 Unresolved Attribute,然后在后续步骤中使用不同的 Rule 应用到该逻辑计划上。

  2. Analyzer 使用 Analysis Rules,配合元数据(如 SessionCatalog 或是 Hive Metastore 等)完善未绑定的逻辑计划的属性而转换成绑定的逻辑计划。具体流程是县实例化一个 Simple Analyzer,然后遍历预定义好的 Batch,通过父类 Rule Executor 的执行方法运行 Batch 里的 Rules,每个 Rule 会对未绑定的逻辑计划进行处理,有些可以通过一次解析处理,有些需要多次迭代,迭代直到达到 FixedPoint 次数或前后两次的树结构没变化才停止操作。

  3. Optimizer 使用 Optimization Rules,将绑定的逻辑计划进行合并、列裁剪和过滤器下推等优化工作后生成优化的逻辑计划。

  4. Planner 使用 Planning Strategies,对优化的逻辑计划进行转换(Transform)生成可以执行的物理计划。根据过去的性能统计数据,选择最佳的物理执行计划 CostModel,最后生成可以执行的物理执行计划树,得到 SparkPlan。

  5. 在最终真正执行物理执行计划之前,还要进行 preparations 规则处理,最后调用 SparkPlan 的 execute 执行计算 RDD。


Spark SQL 优化


在 Spark3.0 之前,我们经常做的优化包括:代码层面的优化


使用 reduceByKey/aggregateByKey 替代 groupByKey。


使用 mapPartitions 替代普通 map。


mapPartitions 类的算子,一次函数调用会处理一个 partition 所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用 mapPartitions 会出现 OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个 partition 所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现 OOM 异常。所以使用这类操作时要慎重!


使用 foreachPartitions 替代 foreach。


原理类似于“使用 mapPartitions 替代 map”,也是一次函数调用处理一个 partition 的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions 类的算子,对性能的提升还是很有帮助的。比如在 foreach 函数中,将 RDD 中所有数据写 MySQL,那么如果是普通的 foreach 算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用 foreachPartitions 算子一次性处理一个 partition 的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于 1 万条左右的数据量写 MySQL,性能可以提升 30%以上。


使用 filter 之后进行 coalesce 操作。


通常对一个 RDD 执行 filter 算子过滤掉 RDD 中较多数据后(比如 30%以上的数据),建议使用 coalesce 算子,手动减少 RDD 的 partition 数量,将 RDD 中的数据压缩到更少的 partition 中去。因为 filter 之后,RDD 的每个 partition 中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个 task 处理的 partition 中的数据量并不是很多,有一点资源浪费,而且此时处理的 task 越多,可能速度反而越慢。因此用 coalesce 减少 partition 数量,将 RDD 中的数据压缩到更少的 partition 之后,只要使用更少的 task 即可处理完所有的 partition。在某些场景下,对于性能的提升会有一定的帮助。


使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作。


repartitionAndSortWithinPartitions 是 Spark 官网推荐的一个算子。官方建议,如果是需要在 repartition 重分区之后还要进行排序,就可以直接使用 repartitionAndSortWithinPartitions 算子。因为该算子可以一边进行重分区的 shuffle 操作,一边进行排序。shuffle 与 sort 两个操作同时进行,比先 shuffle 再 sort 来说,性能可能是要高的。


尽量减少 shuffle 相关操作,减少 join 操作。


写入数据库时,设置批量插入,关闭事务


result.write.mode(SaveMode.Append).format("jdbc")

.option(JDBCOptions.JDBC_URL,"jdbc:mysql://127.0.0.1:3306/db?rewriteBatchedStatement=true") //开启批量处理

.option("user","root")

.option("password","XXX")

.option(JDBCOptions.JDBC_TABLE_NAME,"xxx")

.option(JDBCOptions.JDBC_TXN_ISOLATION_LEVEL,"NONE") //不开启事务

.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE,10000) //设置批量插入数据量

.save()


缓存复用数据如在代码下方反复用到了 Result 数据,可以考虑将此数据缓存下来。


val Result = spark.sql(

"""

|SELECT * from A

|UNION

|SELECT * FROM B

""".stripMargin)

Result.persist(StorageLevel.DISK_ONLY_2)

Result.registerTempTable("Result")


参数优化注意:具体参数设置需要具体问题具体分析,并不是越大越好,需反复测试寻找最优值。另外不同 Spark 版本的参数可能有过期,请注意区分。


   //CBO优化sparkConf.set("spark.sql.cbo.enabled","true")sparkConf.set("spark.sql.cbo.joinReorder.enabled","true")sparkConf.set("spark.sql.statistics.histogram.enabled","true")//自适应查询优化(2.4版本之后)sparkConf.set("spark.sql.adaptive.enabled","true")//开启consolidateFilessparkConf.set("spark.shuffle.consolidateFiles","true")//设置并行度sparkConf.set("spark.default.parallelism","150")//设置数据本地化等待时间sparkConf.set("spark.locality.wait","6s")//设置mapTask写磁盘缓存sparkConf.set("spark.shuffle.file.buffer","64k")//设置byPass机制的触发值sparkConf.set("spark.shuffle.sort.bypassMergeThreshold","1000")//设置resultTask拉取缓存sparkConf.set("spark.reducer.maxSizeInFlight","48m")//设置重试次数sparkConf.set("spark.shuffle.io.maxRetries","10")//设置重试时间间隔sparkConf.set("spark.shuffle.io.retryWait","10s")//设置reduce端聚合内存比例sparkConf.set("spark.shuffle.memoryFraction","0.5")//设置序列化sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//设置自动分区sparkConf.set("spark.sql.auto.repartition","true")//设置shuffle过程中分区数sparkConf.set("spark.sql.shuffle.partitions","500")//设置自动选择压缩码sparkConf.set("spark.sql.inMemoryColumnarStorage.compressed","true")//关闭自动推测分区字段类型sparkConf.set("spark.sql.source.partitionColumnTypeInference.enabled","false")//设置spark自动管理内存sparkConf.set("spark.sql.tungsten.enabled","true")//执行sort溢写到磁盘sparkConf.set("spark.sql.planner.externalSort","true")//增加executor通信超时时间sparkConf.set("spark.executor.heartbeatInterval","60s")//cache限制时间sparkConf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout","120")//设置广播变量sparkConf.set("spark.sql.autoBroadcastJoinThreshold","104857600")//其他设置sparkConf.set("spark.sql.files.maxPartitionBytes","268435456")sparkConf.set("spark.sql.files.openCostInBytes","8388608")sparkConf.set("spark.debug.maxToStringFields","500")//推测执行机制sparkConf.set("spark.speculation","true")sparkConf.set("spark.speculation.interval","500")sparkConf.set("spark.speculation.quantile","0.8")sparkConf.set("spark.speculation.multiplier","1.5")
复制代码


Spark3.0 YYDS


Apache Spark 3.0 增加了很多令人兴奋的新特性,包括动态分区修剪(Dynamic Partition Pruning)、自适应查询执行(Adaptive Query Execution)、加速器感知调度(Accelerator-aware Scheduling)、支持 Catalog 的数据源 API(Data Source API with Catalog Supports)、SparkR 中的向量化(Vectorization in SparkR)、支持 Hadoop 3/JDK 11/Scala 2.12 等等。这个版本一共解决了 3400 多个 ISSUES。


Spark3.0 中对 SparkSQL 进行了重大更新,可以看出 Spark 社区对待 SparkSQL 的态度。


动态分区修剪(Dynamic Partition Pruning)


在 Spark 2.x 里面加了基于代价的优化,但是这个并不表现的很好。主要有以下几个原因:统计信息的缺失;统计信息过期;很难抽象出一个通用的 cost model。


为了解决这些问题,Apache Spark 3.0 引入了基于 Runtime 的查询优化。



比如上面的 SQL 查询,假设 t2 表 t2.id < 2 过滤出来的数据比较少,但是由于之前版本的 Spark 无法进行动态计算代价,所以可能会导致 t1 表扫描出大量无效的数据。有了动态分区裁减,可以在运行的时候过滤掉 t1 表无用的数据。



经过这个优化,查询扫描的数据大大减少,性能提升了 30+ 倍。



自适应查询执行(Adaptive Query Execution)!


而有了 AQE(自适应查询执行) 之后,Spark 就可以动态统计相关信息,并动态调整执行计划,比如把 SortMergeJoin 变成 BroadcastHashJoin:



spark.sql.optimizer.dynamicPartitionPruning.enabled 参数必须设置为 true。


映射下推(Project PushDown)


说到列式存储的优势,映射下推是最突出的,它意味着在获取表中原始数据时只需要扫描查询中需要的列,由于每一列的所有值都是连续存储的,所以分区取出每一列的所有值就可以实现 TableScan 算子,而避免扫描整个表文件内容。


在 Parquet 中原生就支持映射下推,执行查询的时候可以通过 Configuration 传递需要读取的列的信息,这些列必须是 Schema 的子集,映射每次会扫描一个 Row Group 的数据,然后一次性得将该 Row Group 里所有需要的列的 Cloumn Chunk 都读取到内存中,每次读取一个 Row Group 的数据能够大大降低随机读的次数,除此之外,Parquet 在读取的时候会考虑列是否连续,如果某些需要的列是存储位置是连续的,那么一次读操作就可以把多个列的数据读取到内存。


谓词下推(Predicate PushDown)


在数据库之类的查询系统中最常用的优化手段就是谓词下推了,通过将一些过滤条件尽可能的在最底层执行可以减少每一层交互的数据量,从而提升性能,


例如”select count(1) from A Join B on A.id = B.id where A.a > 10 and B.b < 100”SQL 查询中,在处理 Join 操作之前需要首先对 A 和 B 执行 TableScan 操作,然后再进行 Join,再执行过滤,最后计算聚合函数返回,但是如果把过滤条件 A.a > 10 和 B.b < 100 分别移到 A 表的 TableScan 和 B 表的 TableScan 的时候执行,可以大大降低 Join 操作的输入数据。


无论是行式存储还是列式存储,都可以在将过滤条件在读取一条记录之后执行以判断该记录是否需要返回给调用者,在 Parquet 做了更进一步的优化,优化的方法时对每一个 Row Group 的每一个 Column Chunk 在存储的时候都计算对应的统计信息,包括该 Column Chunk 的最大值、最小值和空值个数。通过这些统计值和该列的过滤条件可以判断该 Row Group 是否需要扫描。另外 Parquet 还增加诸如 Bloom Filter 和 Index 等优化数据,更加有效的完成谓词下推。


在使用 Parquet 的时候可以通过如下两种策略提升查询性能:


  • 类似于关系数据库的主键,对需要频繁过滤的列设置为有序的,这样在导入数据的时候会根据该列的顺序存储数据,这样可以最大化的利用最大值、最小值实现谓词下推。

  • 减小行组大小和页大小,这样增加跳过整个行组的可能性,但是此时需要权衡由于压缩和编码效率下降带来的 I/O 负载。


原创作者:王知无


用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
大数据开发 Spark 模块之SparkSQL