写点什么

Spark 3.0 关键新特性回顾

用户头像
hanke
关注
发布于: 2021 年 01 月 06 日
Spark 3.0 关键新特性回顾

从 Spark 3.0 官方的Release Notes可以看到,这次大版本的升级主要是集中在性能优化和文档丰富上(如下图),其中 46%的优化都集中在 Spark SQL 上。



今天 Spark SQL 的优化不仅仅服务于 SQL 语言,还服务于机器学习、流计算和 DataFrame 等计算任务, 因此社区对于 Spark SQL 的投入非常大。对外公布的 TPC-DS 性能测试结果相较于 Spark 2.4 会有 2 倍的提升。SQL 优化里最引人注意的非Adaptive Query Execution莫属了, 还有一些其他的优化比如Dynamic Pruning Partition,通过 Aggregator 注册UDAF等等都极大的提升了 SQL 引擎的性能。

本文会着重回顾 AQE 新特性及相关关注的特性和文档监控方面的变化。其他更多的信息比如复用子查询优化,SQL Hints,ANSI SQL 兼容,SparkR 向量化读写,加速器感知 GPU 调度等等,感兴趣的同学可以参考官网notes

Adaptive Query Execution (AQE)

AQE 对于整体的 Spark SQL 的执行过程做了相应的调整和优化(如下图),它最大的亮点是可以根据已经完成的计划结点真实且精确的执行统计结果来不停的反馈并重新优化剩下的执行计划。

Spark 2.x 的 SQL 执行过程:

  • 当用户提交了 Spark SQL/Dataset/DataFrame 时,在逻辑执行计划阶段,Spark SQL 的 Parser 会用 ANLTER 将其转化为对应的语法树(Unresolved Logic Plan),接着 Analyzer 会利用 catalog 里的信息找到表和数据类型及对应的列将其转化为解析后有 schema 的 Logical Plan,然后 Optimizer 会通过一系列的优化 rule 进行算子下推(比如 filter,列剪裁),提前计算常量(比如当前时间),replace 一些操作符等等来去优化 Logical Plan。

  • 而在物理计划阶段,Spark Planner 会将各种物理计划策略作用于对应的 Logical Plan 节点上,生成多个物理计划,然后通过 CBO 选择一个最佳的作为最终的物理算子树(比如选择用 Broadcast 的算子,而不是 SortMerge 的 Join 算子), 最终将算子树的节点转化为 Spark 底层的 RDD,Transformation 和 Action 等,以支持其提交执行。

在 Spark 3.0 之前, Spark 的 Catalyst 的优化主要是通过基于逻辑计划的 rule 和物理计划里的CBO,这些优化要么基于数据库里的静态信息,要么通过预先得到统计信息,比如数值分布的直方图等来预估并判断应该使用哪种优化策略。这样的优化存在很多问题,比如数据的 meta 信息不准确或者不全,或者复杂的 filter,黑盒的 UDFs 等导致无法预估准确的数值,因此很难得到较优的优化策略。


主要功能点

此时,提出 AQE 通过真实且精确的执行统计结果进行优化就很有意义了。基于这个设计和背景,AQE 就能够比较方便解决用户在使用 Spark 中一些头疼的地方。主要体现在以下三个方面:

  • 自动调整 reducer 的数量,减小 partition 数量

Spark 任务的并行度一直是让用户比较困扰的地方。如果并行度太大的话,会导致 task 过多,overhead 比较大,整体拉慢任务的运行。而如果并行度太小的,数据分区会比较大,容易出现 OOM 的问题,并且资源也得不到合理的利用,并行运行任务优势得不到最大的发挥。而且由于 Spark Context 整个任务的并行度,需要一开始设定好且没法动态修改,这就很容易出现任务刚开始的时候数据量大需要大的并行度,而运行的过程中通过转化过滤可能最终的数据集已经变得很小,最初设定的分区数就显得过大了。

AQE 能够很好的解决这个问题,在 reducer 去读取数据时,会根据用户设定的分区数据的大小(spark.sql.adaptive.advisoryPartitionSizeInBytes)来自动调整和合并(Coalesce)小的 partition,自适应地减小 partition 的数量,以减少资源浪费和 overhead,提升任务的性能。参考示例图中可以看到从最开始的 shuffle 产生 50 个 partitions,最终合并为只有 5 个 partitions:


  • 自动解决 Join 时的数据倾斜问题

Join 里如果出现某个 key 的数据倾斜问题,那么基本上就是这个任务的性能杀手了。在 AQE 之前,用户没法自动处理 Join 中遇到的这个棘手问题,需要借助外部手动收集数据统计信息,并做额外的加盐,分批处理数据等相对繁琐的方法来应对数据倾斜问题。

而 AQE 由于可以实时拿到运行时的数据,通过 Skew Shuffle Reader 自动调整不同 key 的数据大小

(spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes) 来避免数据倾斜,从而提高性能。参考示例图可以看到 AQE 自动将 A 表里倾斜的 partition 进一步划分为 3 个小的 partitions 跟 B 表对应的 partition 进行 join,消除短板倾斜任务:


  • 优化 Join 策略

AQE 可以在 Join 的初始阶段获悉数据的输入特性,并基于此选择适合的 Join 算法从而最大化地优化性能。比如从 Cost 比较高的 SortMerge 在不超过阈值的情况下调整为代价较小的 Broadcast Join。参考示例图:


Dynamic Partition Pruning(DPP)

DPP 主要解决的是对于星型模型的查询场景中过滤条件无法下推的情况。通过 DPP 可以将小表过滤后的数据作为新的过滤条件下推到另一个大表里,从而可以做到对大表 scan 运行阶段的提前过滤掉不必要 partition 读取。这样也可以避免引入不必要的额外 ETL 过程(例如预先 ETL 生成新的过滤后的大表),在查询的过程中极大的提升查询性能,感兴趣的同学可以更进一步阅读DPP的详细信息。



通过 Aggregator 注册 UDAF

Spark 3.0 新特性通过用户定制实现的 Aggregator 来注册实现 UDAF,可以避免对每一行的数据反复进行序列化和反序列化来进行聚合,而只需在整个分区里序列化一次 ,缓解了对 CPU 的压力,提升性能。假如一个 DataFrame 有 100 万行数据共 10 个 paritions,那么旧的 UDAF 方式的序列化反序列化需要至少 100 万+10 次(合并分区里的结果)。而新的函数只需要 10 次即可,大大减少整体的序列化操作。

其中实现部分最主要的区别体现在 UDAF 的 update 函数部分:

//Old Waydef update(buf: MutableAggregationBuffer, input: Row): Unit = {  val agg = buf.getAs[AggregatorType](0)  // UDT deserializes the aggregator from 'buf'  agg.update(input)    // update the state of your aggregation  buf(0) = agg    // UDT re-serializes the aggregator back into buf}//New waydef update(agg: AggregatorType, input: Row): AggregatorType = {  agg.update(input) // update the state of your aggregator from the input  agg // return the aggregator}
复制代码

更多技术细节部分可以阅读 Aggregator 注册UDAF

文档与监控

Spark 3.0 完善和丰富了很多文档及监控信息,来辅助大家更好的进行调优和监控任务的性能动态。


Spark SQL 和 Web UI 文档

增加了Spark SQL语法SQL配置的文档页面 和相关WebUI的文档


更多的 Shuffle 指标

Spark 3.0 引入了更多可观察的指标来去观测数据的运行质量。Shuffle 是 Spark 任务里非常重要的一部分,如果能拿到更详细的阶段数据,那么对于程序的调优是很有帮助的。


新的 Structured Streaming UI

作为社区主推的 Spark 实时的模块 Structured Streaming 是在 Spark 2.0 中发布的,这次在 Spark 3.0 中正式加入了 UI 的配置。新的 UI 主要包括了两种统计信息,已完成的 Streaming 查询聚合的信息和正在进行的 Streaming 查询的当前信息, 具体包括 Input Rate、 Process Rate、Input Rows、 Batch Duration 和 Operate Duration,可以辅助用户更进一步观察任务的负载和运行能力。


支持 event logs 的滚动

Spark 3.0 提供了类似 Log4j 那样对于长时间运行的日志按照时间或者文件的大小进行切割,这样对于 streaming 长期运行的任务来说比较友好。不然 Spark 历史服务器打开一个动辄几十 GB 大小的 event log, 打开的速度可想而知。当然,对于 Spark 的 event log 不能像其他普通的应用程序日志那样,简单粗暴的进行切割,而是需要保证 Spark 的历史服务器依赖能够解析已经滚动或者压缩后的日志,并能在 Spark UI 中展示出来,方便用户进行后续的调优和排查问题操作。具体的细节可进一步阅读相关ticket


生态圈建设

扩展相关生态圏版本的升级和建设

  • 支持 Java 11

  • 支持 Hadoop 3

  • 支持 Hive 3

  • ....


Reference


更多大数据相关分享,可关注数据元素及本人微信公众号,可搜索“数据元素”或扫描下方二维码。



发布于: 2021 年 01 月 06 日阅读数: 291
用户头像

hanke

关注

凡是过往,皆为序章 2019.09.11 加入

热爱大数据技术沉淀和分享,致力于构建让数据业务产品更易用的大数据生态圈,为业务增值。

评论 (2 条评论)

发布
用户头像
应该是"Dynamic Partition Pruning",而不是"Dynamic Pruning Partition"
2021 年 02 月 26 日 15:39
回复
感谢提醒,已更正:)
2021 年 02 月 28 日 13:03
回复
没有更多了
Spark 3.0 关键新特性回顾