写点什么

Spark 性能调优 -Shuffle 调优及故障排除篇

发布于: 2021 年 03 月 27 日
Spark性能调优-Shuffle调优及故障排除篇

Spark 调优之 Shuffle 调优

本节开始先讲解 Shuffle 核心概念;然后针对 HashShuffleSortShuffle 进行调优;接下来对 map 端reduce 端调优;再针对 Spark 中的数据倾斜问题进行剖析及调优;最后是 Spark 运行过程中的故障排除

本文首发于公众号【五分钟学大数据】,本公号专注于大数据技术,分享高质量大数据原创技术文章。

一、Shuffle 的核心概念

1. ShuffleMapStage 与 ResultStage

ShuffleMapStage 与 ResultStage

在划分 stage 时,最后一个 stage 称为 FinalStage,它本质上是一个 ResultStage 对象,前面的所有 stage 被称为 ShuffleMapStage

ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘。

ResultStage 基本上对应代码中的 action 算子,即将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束。

2. Shuffle 中的任务个数

我们知道,Spark Shuffle 分为 map 阶段和 reduce 阶段,或者称之为 ShuffleRead 阶段和 ShuffleWrite 阶段,那么对于一次 Shuffle,map 过程和 reduce 过程都会由若干个 task 来执行,那么 map task 和 reduce task 的数量是如何确定的呢?

假设 Spark 任务从 HDFS 中读取数据,那么初始 RDD 分区个数由该文件的 split 个数决定,也就是一个 split 对应生成的 RDD 的一个 partition,我们假设初始 partition 个数为 N。

初始 RDD 经过一系列算子计算后(假设没有执行 repartition 和 coalesce 算子进行重分区,则分区个数不变,仍为 N,如果经过重分区算子,那么分区个数变为 M),我们假设分区个数不变,当执行到 Shuffle 操作时,map 端的 task 个数和 partition 个数一致,即 map task 为 N 个

reduce 端的 stage 默认取spark.default.parallelism这个配置项的值作为分区数,如果没有配置,则以 map 端的最后一个 RDD 的分区数作为其分区数(也就是 N),那么分区数就决定了 reduce 端的 task 的个数。

3. reduce 端数据的读取

根据 stage 的划分我们知道,map 端 task 和 reduce 端 task 不在相同的 stage 中,map task 位于 ShuffleMapStagereduce task 位于 ResultStage,map task 会先执行,那么后执行的 reduce task 如何知道从哪里去拉取 map task 落盘后的数据呢?

reduce 端的数据拉取过程如下

  1. map task 执行完毕后会将计算状态以及磁盘小文件位置等信息封装到 MapStatus 对象中,然后由本进程中的 MapOutPutTrackerWorker 对象将 mapStatus 对象发送给 Driver 进程的 MapOutPutTrackerMaster 对象;

  2. 在 reduce task 开始执行之前会先让本进程中的 MapOutputTrackerWorker 向 Driver 进程中的 MapoutPutTrakcerMaster 发动请求,请求磁盘小文件位置信息;

  3. 当所有的 Map task 执行完毕后,Driver 进程中的 MapOutPutTrackerMaster 就掌握了所有的磁盘小文件的位置信息。此时 MapOutPutTrackerMaster 会告诉 MapOutPutTrackerWorker 磁盘小文件的位置信息;

  4. 完成之前的操作之后,由 BlockTransforService 去 Executor0 所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过 48M(reduce task 每次最多拉取 48M 数据,将拉来的数据存储到 Executor 内存的 20%内存中)。

二、HashShuffle 解析

以下的讨论都假设每个 Executor 有 1 个 cpu core。

1. 未经优化的 HashShuffleManager

shuffle write 阶段,主要就是在一个 stage 结束计算之后,为了下一个 stage 可以执行 shuffle 类的算子(比如 reduceByKey),而将每个 task 处理的数据按 key 进行“划分”。所谓“划分”,就是对相同的 key 执行 hash 算法,从而将相同 key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去

下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。比如下一个 stage 总共有 100 个 task,那么当前 stage 的每个 task 都要创建 100 份磁盘文件。如果当前 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,那么每个 Executor 上总共就要创建 500 个磁盘文件,所有 Executor 上会创建 5000 个磁盘文件。由此可见,未经优化的 shuffle write 操作所产生的磁盘文件的数量是极其惊人的

shuffle read 阶段,通常就是一个 stage 刚开始时要做的事情。此时该 stage 的每一个 task 就需要将上一个 stage 的计算结果中的所有相同 key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行 key 的聚合或连接等操作。由于 shuffle write 的过程中,map task 给下游 stage 的每个 reduce task 都创建了一个磁盘文件,因此 shuffle read 的过程中,每个 reduce task 只要从上游 stage 的所有 map task 所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read 的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

未优化的 HashShuffleManager 工作原理如下图所示:

未优化的 HashShuffleManager 工作原理

2. 优化后的 HashShuffleManager

为了优化 HashShuffleManager 我们可以设置一个参数:spark.shuffle.consolidateFiles,该参数默认值为 false,将其设置为 true 即可开启优化机制,通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项

开启 consolidate 机制之后,在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件了,此时会出现 shuffleFileGroup 的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 stage 的 task 数量是相同的。一个 Executor 上有多少个 cpu core,就可以并行执行多少个 task。而第一批并行执行的每个 task 都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件内

当 Executor 的 cpu core 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件,也就是说,此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能

假设第二个 stage 有 100 个 task,第一个 stage 有 50 个 task,总共还是有 10 个 Executor(Executor CPU 个数为 1),每个 Executor 执行 5 个 task。那么原本使用未经优化的 HashShuffleManager 时,每个 Executor 会产生 500 个磁盘文件,所有 Executor 会产生 5000 个磁盘文件的。但是此时经过优化之后,每个 Executor 创建的磁盘文件的数量的计算公式为:cpu core的数量 * 下一个stage的task数量,也就是说,每个 Executor 此时只会创建 100 个磁盘文件,所有 Executor 只会创建 1000 个磁盘文件。

优化后的 HashShuffleManager 工作原理如下图所示:

优化后的 HashShuffleManager 工作原理

三、 SortShuffle 解析

SortShuffleManager 的运行机制主要分成两种,一种是普通运行机制,另一种是 bypass 运行机制。当 shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为 200),就会启用 bypass 机制。

1. 普通运行机制

在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是 merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。

普通运行机制的 SortShuffleManager 工作原理如下图所示:

普通运行机制的 SortShuffleManager 工作原理

2. bypass 运行机制

bypass 运行机制的触发条件如下:

  • shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。

  • 不是聚合类的 shuffle 算子。

此时,每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

bypass 运行机制的 SortShuffleManager 工作原理如下图所示:

bypass 运行机制的 SortShuffleManager 工作原理

四、map 和 reduce 端缓冲区大小

在 Spark 任务运行过程中,如果 shuffle 的 map 端处理的数据量比较大,但是 map 端缓冲的大小是固定的,可能会出现 map 端缓冲数据频繁 spill 溢写到磁盘文件中的情况,使得性能非常低下,通过调节 map 端缓冲的大小,可以避免频繁的磁盘 IO 操作,进而提升 Spark 任务的整体性能

map 端缓冲的默认配置是 32KB,如果每个 task 处理 640KB 的数据,那么会发生 640/32 = 20 次溢写,如果每个 task 处理 64000KB 的数据,即会发生 64000/32=2000 次溢写,这对于性能的影响是非常严重的。

map 端缓冲的配置方法:

val conf = new SparkConf()  .set("spark.shuffle.file.buffer""64")

Spark Shuffle 过程中,shuffle reduce task 的 buffer 缓冲区大小决定了 reduce task 每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能

reduce 端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为 48MB。该参数的设置方法如下:

reduce 端数据拉取缓冲区配置:

val conf = new SparkConf()  .set("spark.reducer.maxSizeInFlight""96")

五、reduce 端重试次数和等待时间间隔

Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如 60 次),以避免由于 JVM 的 full gc 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性

reduce 端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为 3,该参数的设置方法如下:

reduce 端拉取数据重试次数配置:

val conf = new SparkConf()  .set("spark.shuffle.io.maxRetries""6")

Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性

reduce 端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为 5s,该参数的设置方法如下:

reduce 端拉取数据等待间隔配置:

val conf = new SparkConf()  .set("spark.shuffle.io.retryWait""60s")

六、bypass 机制开启阈值

对于 SortShuffleManager,如果 shuffle reduce task 的数量小于某一阈值则 shuffle write 过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager 的方式去写数据,但是最后会将每个 task 产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

当你使用 SortShuffleManager 时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于 shuffle read task 的数量,那么此时 map-side 就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此 shuffle write 性能有待提高

SortShuffleManager 排序操作阈值的设置可以通过spark.shuffle.sort.bypassMergeThreshold这一参数进行设置,默认值为 200,该参数的设置方法如下:

reduce 端拉取数据等待间隔配置:

val conf = new SparkConf()  .set("spark.shuffle.sort.bypassMergeThreshold""400")

数据倾斜

就是数据分到各个区的数量不太均匀,可以自定义分区器,想怎么分就怎么分。

Spark 中的数据倾斜问题主要指 shuffle 过程中出现的数据倾斜问题,是由于不同的 key 对应的数据量不同导致的不同 task 所处理的数据量不同的问题

例如,reduced 端一共要处理 100 万条数据,第一个和第二个 task 分别被分配到了 1 万条数据,计算 5 分钟内完成,第三个 task 分配到了 98 万数据,此时第三个 task 可能需要 10 个小时完成,这使得整个 Spark 作业需要 10 个小时才能运行完成,这就是数据倾斜所带来的后果。

注意,要区分开数据倾斜数据过量这两种情况,数据倾斜是指少数 task 被分配了绝大多数的数据,因此少数 task 运行缓慢;数据过量是指所有 task 被分配的数据量都很大,相差不多,所有 task 都运行缓慢。

数据倾斜的表现:

  1. Spark 作业的大部分 task 都执行迅速,只有有限的几个 task 执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;

  2. Spark 作业的大部分 task 都执行迅速,但是有的 task 在运行过程中会突然报出 OOM,反复执行几次都在某一个 task 报出 OOM 错误,此时可能出现了数据倾斜,作业无法正常运行。定位数据倾斜问题:

  3. 查阅代码中的 shuffle 算子,例如 reduceByKey、countByKey、groupByKey、join 等算子,根据代码逻辑判断此处是否会出现数据倾斜;

  4. 查看 Spark 作业的 log 文件,log 文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个 stage,对应的 shuffle 算子是哪一个;

1. 预聚合原始数据

1. 避免 shuffle 过程

绝大多数情况下,Spark 作业的数据来源都是 Hive 表,这些 Hive 表基本都是经过 ETL 之后的昨天的数据。为了避免数据倾斜,我们可以考虑避免 shuffle 过程,如果避免了 shuffle 过程,那么从根本上就消除了发生数据倾斜问题的可能。

如果 Spark 作业的数据来源于 Hive 表,那么可以先在 Hive 表中对数据进行聚合,例如按照 key 进行分组,将同一 key 对应的所有 value 用一种特殊的格式拼接到一个字符串里去,这样,一个 key 就只有一条数据了;之后,对一个 key 的所有 value 进行处理时,只需要进行 map 操作即可,无需再进行任何的 shuffle 操作。通过上述方式就避免了执行 shuffle 操作,也就不可能会发生任何的数据倾斜问题。

对于 Hive 表中数据的操作,不一定是拼接成一个字符串,也可以是直接对 key 的每一条数据进行累计计算。要区分开,处理的数据量大和数据倾斜的区别

2. 增大 key 粒度(减小数据倾斜可能性,增大每个 task 的数据量)

如果没有办法对每个 key 聚合出来一条数据,在特定场景下,可以考虑扩大 key 的聚合粒度。

例如,目前有 10 万条用户数据,当前 key 的粒度是(省,城市,区,日期),现在我们考虑扩大粒度,将 key 的粒度扩大为(省,城市,日期),这样的话,key 的数量会减少,key 之间的数据量差异也有可能会减少,由此可以减轻数据倾斜的现象和问题。(此方法只针对特定类型的数据有效,当应用场景不适宜时,会加重数据倾斜)

2. 预处理导致倾斜的 key

1. 过滤

如果在 Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的 key 进行过滤,滤除可能导致数据倾斜的 key 对应的数据,这样,在 Spark 作业中就不会发生数据倾斜了。

2. 使用随机 key

当使用了类似于 groupByKey、reduceByKey 这样的算子时,可以考虑使用随机 key 实现双重聚合,如下图所示:

随机 key 实现双重聚合

首先,通过 map 算子给每个数据的 key 添加随机数前缀,对 key 进行打散,将原先一样的 key 变成不一样的 key,然后进行第一次聚合,这样就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合;随后,去除掉每个 key 的前缀,再次进行聚合。

此方法对于由 groupByKey、reduceByKey 这类算子造成的数据倾斜有比较好的效果,仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,还得用其他的解决方案

此方法也是前几种方案没有比较好的效果时要尝试的解决方案。

3. sample 采样对倾斜 key 单独进行 join

在 Spark 中,如果某个 RDD 只有一个 key,那么在 shuffle 过程中会默认将此 key 对应的数据打散,由不同的 reduce 端 task 进行处理

所以当由单个 key 导致数据倾斜时,可有将发生数据倾斜的 key 单独提取出来,组成一个 RDD,然后用这个原本会导致倾斜的 key 组成的 RDD 和其他 RDD 单独 join,此时,根据 Spark 的运行机制,此 RDD 中的数据会在 shuffle 阶段被分散到多个 task 中去进行 join 操作。

倾斜 key 单独 join 的流程如下图所示:

倾斜 key 单独 join 流程

适用场景分析:

对于 RDD 中的数据,可以将其转换为一个中间表,或者是直接使用 countByKey()的方式,看一下这个 RDD 中各个 key 对应的数据量,此时如果你发现整个 RDD 就一个 key 的数据量特别多,那么就可以考虑使用这种方法。

当数据量非常大时,可以考虑使用 sample 采样获取 10%的数据,然后分析这 10%的数据中哪个 key 可能会导致数据倾斜,然后将这个 key 对应的数据单独提取出来。

不适用场景分析:

如果一个 RDD 中导致数据倾斜的 key 很多,那么此方案不适用。

3. 提高 reduce 并行度

当方案一和方案二对于数据倾斜的处理没有很好的效果时,可以考虑提高 shuffle 过程中的 reduce 端并行度,reduce 端并行度的提高就增加了 reduce 端 task 的数量,那么每个 task 分配到的数据量就会相应减少,由此缓解数据倾斜问题。

1. reduce 端并行度的设置

在大部分的 shuffle 算子中,都可以传入一个并行度的设置参数,比如 reduceByKey(500),这个参数会决定 shuffle 过程中 reduce 端的并行度,在进行 shuffle 操作的时候,就会对应着创建指定数量的 reduce task。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。

增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据。

举例来说,如果原本有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了 shuffle read task 以后,每个 task 就分配到一个 key,即每个 task 就处理 10 条数据,那么自然每个 task 的执行时间都会变短了。

2. reduce 端并行度设置存在的缺陷

提高 reduce 端并行度并没有从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽可能地去缓解和减轻 shuffle reduce task 的数据压力,以及数据倾斜的问题,适用于有较多 key 对应的数据量都比较大的情况。

该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

在理想情况下,reduce 端并行度提升后,会在一定程度上减轻数据倾斜的问题,甚至基本消除数据倾斜;但是,在一些情况下,只会让原来由于数据倾斜而运行缓慢的 task 运行速度稍有提升,或者避免了某些 task 的 OOM 问题,但是,仍然运行缓慢,此时,要及时放弃方案三,开始尝试后面的方案。

4. 使用 map join

正常情况下,join 操作都会执行 shuffle 过程,并且执行的是 reduce join,也就是先将所有相同的 key 和对应的 value 汇聚到一个 reduce task 中,然后再进行 join。普通 join 的过程如下图所示:

普通 join 过程

普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据+map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。

注意:RDD 是并不能直接进行广播的,只能将 RDD 内部的数据通过 collect 拉取到 Driver 内存然后再进行广播

1. 核心思路:

不使用 join 算子进行连接操作,而使用 broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,从 broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。

根据上述思路,根本不会发生 shuffle 操作,从根本上杜绝了 join 操作可能导致的数据倾斜问题。

当 join 操作有数据倾斜问题并且其中一个 RDD 的数据量较小时,可以优先考虑这种方式,效果非常好

map join 的过程如下图所示:

map join 过程

2. 不适用场景分析:

由于 Spark 的广播变量是在每个 Executor 中保存一个副本,如果两个 RDD 数据量都比较大,那么如果将一个数据量比较大的 RDD 做成广播变量,那么很有可能会造成内存溢出。

故障排除

1. 避免 OOM-out of memory

在 Shuffle 过程,reduce 端 task 并不是等到 map 端 task 将其数据全部写入磁盘后再去拉取,而是 map 端写一点数据,reduce 端 task 就会拉取一小部分数据,然后立即进行后面的聚合、算子函数的使用等操作

reduce 端 task 能够拉取多少数据,由 reduce 拉取数据的缓冲区 buffer 来决定,因为拉取过来的数据都是先放在 buffer 中,然后再进行后续的处理,buffer 的默认大小为 48MB

reduce 端 task 会一边拉取一边计算,不一定每次都会拉满 48MB 的数据,可能大多数时候拉取一部分数据就处理掉了。

虽然说增大 reduce 端缓冲区大小可以减少拉取次数,提升 Shuffle 性能,但是有时 map 端的数据量非常大,写出的速度非常快,此时 reduce 端的所有 task 在拉取的时候,有可能全部达到自己缓冲的最大极限值,即 48MB,此时,再加上 reduce 端执行的聚合函数的代码,可能会创建大量的对象,这可能会导致内存溢出,即 OOM

如果一旦出现 reduce 端内存溢出的问题,我们可以考虑减小 reduce 端拉取数据缓冲区的大小,例如减少为 12MB

在实际生产环境中是出现过这种问题的,这是典型的以性能换执行的原理。reduce 端拉取数据的缓冲区减小,不容易导致 OOM,但是相应的,reudce 端的拉取次数增加,造成更多的网络传输开销,造成性能的下降。

注意,要保证任务能够运行,再考虑性能的优化。

2. 避免 GC 导致的 shuffle 文件拉取失败

在 Spark 作业中,有时会出现shuffle file not found的错误,这是非常常见的一个报错,有时出现这种错误以后,选择重新执行一遍,就不再报出这种错误

出现上述问题可能的原因是 Shuffle 操作中,后面 stage 的 task 想要去上一个 stage 的 task 所在的 Executor 拉取数据,结果对方正在执行 GC,执行 GC 会导致 Executor 内所有的工作现场全部停止,比如 BlockManager、基于 netty 的网络通信等,这就会导致后面的 task 拉取数据拉取了半天都没有拉取到,就会报出shuffle file not found的错误,而第二次再次执行就不会再出现这种错误。

可以通过调整 reduce 端拉取数据重试次数和 reduce 端拉取数据时间间隔这两个参数来对 Shuffle 性能进行调整,增大参数值,使得 reduce 端拉取数据的重试次数增加,并且每次失败后等待的时间间隔加长。

JVM GC 导致的 shuffle 文件拉取失败调整数据重试次数和 reduce 端拉取数据时间间隔:

val conf = new SparkConf()  .set("spark.shuffle.io.maxRetries""6")  .set("spark.shuffle.io.retryWait""60s")

3. YARN-CLIENT 模式导致的网卡流量激增问题

在 YARN-client 模式下,Driver 启动在本地机器上,而 Driver 负责所有的任务调度,需要与 YARN 集群上的多个 Executor 进行频繁的通信。

假设有 100 个 Executor,1000 个 task,那么每个 Executor 分配到 10 个 task,之后,Driver 要频繁地跟 Executor 上运行的 1000 个 task 进行通信,通信数据非常多,并且通信品类特别高。这就导致有可能在 Spark 任务运行过程中,由于频繁大量的网络通讯,本地机器的网卡流量会激增。

注意,YARN-client 模式只会在测试环境中使用,而之所以使用 YARN-client 模式,是由于可以看到详细全面的 log 信息,通过查看 log,可以锁定程序中存在的问题,避免在生产环境下发生故障。

在生产环境下,使用的一定是 YARN-cluster 模式。在 YARN-cluster 模式下,就不会造成本地机器网卡流量激增问题,如果 YARN-cluster 模式下存在网络通信的问题,需要运维团队进行解决。

4. YARN-CLUSTER 模式的 JVM 栈内存溢出无法执行问题

当 Spark 作业中包含 SparkSQL 的内容时,可能会碰到 YARN-client 模式下可以运行,但是 YARN-cluster 模式下无法提交运行(报出 OOM 错误)的情况。

YARN-client 模式下,Driver 是运行在本地机器上的,Spark 使用的 JVM 的 PermGen 的配置,是本地机器上的 spark-class 文件,JVM 永久代的大小是 128MB,这个是没有问题的,但是在 YARN-cluster 模式下,Driver 运行在 YARN 集群的某个节点上,使用的是没有经过配置的默认设置,PermGen 永久代大小为 82MB

SparkSQL 的内部要进行很复杂的 SQL 的语义解析、语法树转换等等,非常复杂,如果 sql 语句本身就非常复杂,那么很有可能会导致性能的损耗和内存的占用,特别是对 PermGen 的占用会比较大。

所以,此时如果 PermGen 占用好过了 82MB,但是又小于 128MB,就会出现 YARN-client 模式下可以运行,YARN-cluster 模式下无法运行的情况

解决上述问题的方法是增加 PermGen(永久代)的容量,需要在 spark-submit 脚本中对相关参数进行设置,设置方法如下:

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

通过上述方法就设置了 Driver 永久代的大小,默认为 128MB,最大 256MB,这样就可以避免上面所说的问题。

5. 避免 SparkSQL JVM 栈内存溢出

当 SparkSQL 的 sql 语句有成百上千的 or 关键字时,就可能会出现 Driver 端的 JVM 栈内存溢出。

JVM 栈内存溢出基本上就是由于调用的方法层级过多,产生了大量的,非常深的,超出了 JVM 栈深度限制的递归。(我们猜测 SparkSQL 有大量 or 语句的时候,在解析 SQL 时,例如转换为语法树或者进行执行计划的生成的时候,对于 or 的处理是递归,or 非常多时,会发生大量的递归)

此时,建议将一条 sql 语句拆分为多条 sql 语句来执行,每条 sql 语句尽量保证 100 个以内的子句。根据实际的生产环境试验,一条 sql 语句的 or 关键字控制在 100 个以内,通常不会导致 JVM 栈内存溢出。


更多大数据好文,欢迎关注公众号【五分钟学大数据

--end--

文章推荐

Spark性能调优-RDD算子调优篇


发布于: 2021 年 03 月 27 日阅读数: 7
用户头像

还未添加个人签名 2020.11.10 加入

专注于大数据技术

评论

发布
暂无评论
Spark性能调优-Shuffle调优及故障排除篇