写点什么

Spark 性能调优 -RDD 算子调优

  • 2022 年 1 月 25 日
  • 本文字数:5919 字

    阅读完需:约 19 分钟

Spark 调优之 RDD 算子调优

不废话,直接进入正题!

1. RDD 复用

在对 RDD 进行算子时,要避免相同的算子和计算逻辑之下对 RDD 进行重复的计算,如下图所示:

RDD的重复计算

RDD 的重复计算

对上图中的 RDD 计算架构进行修改,得到如下图所示的优化结果:

RDD架构优化

RDD 架构优化

2. 尽早 filter

获取到初始 RDD 后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升 Spark 作业的运行效率。

3. 读取大量小文件-用 wholeTextFiles

当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为 RDD 的一个元素。

也可以将多个完整的文本文件一次性读取为一个 pairRDD,其中键是文件名,值是文件内容。

val input:RDD[String] = sc.textFile("dir/*.log") 
复制代码

如果传递目录,则将目录下的所有文件读取作为 RDD。文件路径支持通配符。

但是这样对于大量的小文件读取效率并不高,应该使用 wholeTextFiles 返回值为 RDD[(String, String)],其中 Key 是文件的名称,Value 是文件的内容。

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])
复制代码



wholeTextFiles 读取小文件:

val filesRDD: RDD[(String, String)] =sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
复制代码

4. mapPartition 和 foreachPartition

  • mapPartitions

map(_….)  表示每一个元素

mapPartitions(_….)  表示每个分区的数据组成的迭代器

普通的 map 算子对 RDD 中的每一个元素进行操作,而 mapPartitions 算子对 RDD 中每一个分区进行操作。

如果是普通的 map 算子,假设一个 partition 有 1 万条数据,那么 map 算子中的 function 要执行 1 万次,也就是对每个元素进行操作。

map 算子

map 算子

如果是 mapPartition 算子,由于一个 task 处理一个 RDD 的 partition,那么一个 task 只会执行一次 function,function 一次接收所有的 partition 数据,效率比较高。

mapPartition 算子

mapPartition 算子

比如,当要把 RDD 中的所有数据通过 JDBC 写入数据,如果使用 map 算子,那么需要对 RDD 中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用 mapPartitions 算子,那么针对一个分区的数据,只需要建立一个数据库连接

mapPartitions 算子也存在一些缺点:对于普通的 map 操作,一次处理一条数据,如果在处理了 2000 条数据后内存不足,那么可以将已经处理完的 2000 条数据从内存中垃圾回收掉;但是如果使用 mapPartitions 算子,但数据量非常大时,function 一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会 OOM,即内存溢出。

因此,mapPartitions 算子适用于数据量不是特别大的时候,此时使用 mapPartitions 算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用 mapPartitions 算子,就会直接 OOM)

在项目中,应该首先估算一下 RDD 的数据量、每个 partition 的数据量,以及分配给每个 Executor 的内存资源,如果资源允许,可以考虑使用 mapPartitions 算子代替 map。

  • foreachPartition

rrd.foreache(_….) 表示每一个元素

rrd.forPartitions(_….)  表示每个分区的数据组成的迭代器

在生产环境中,通常使用 foreachPartition 算子来完成数据库的写入,通过 foreachPartition 算子的特性,可以优化写数据库的性能。

如果使用 foreach 算子完成数据库的操作,由于 foreach 算子是遍历 RDD 的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用 foreachPartition 算子

与 mapPartitions 算子非常相似,foreachPartition 是将 RDD 的每个分区作为遍历对象,一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接,如下图所示:

foreachPartition 算子

foreachPartition 算子

使用了 foreachPartition 算子后,可以获得以下的性能提升:

  1. 对于我们写的 function 函数,一次处理一整个分区的数据;

  2. 对于一个分区内的数据,创建唯一的数据库连接;

  3. 只需要向数据库发送一次 SQL 语句和多组参数;

在生产环境中,全部都会使用 foreachPartition 算子完成数据库操作。foreachPartition 算子存在一个问题,与 mapPartitions 算子类似,如果一个分区的数据量特别大,可能会造成 OOM,即内存溢出

5. filter+coalesce/repartition(减少分区)

在 Spark 任务中我们经常会使用 filter 算子完成 RDD 中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过 filter 过滤后,每个分区的数据量有可能会存在较大差异,如下图所示:

分区数据过滤结果

分区数据过滤结果

根据上图我们可以发现两个问题:

  1. 每个 partition 的数据量变小了,如果还按照之前与 partition 相等的 task 个数去处理当前数据,有点浪费 task 的计算资源;

  2. 每个 partition 的数据量不一样,会导致后面的每个 task 处理每个 partition 数据的时候,每个 task 要处理的数据量不同,这很有可能导致数据倾斜问题。

如上图所示,第二个分区的数据过滤后只剩 100 条,而第三个分区的数据过滤后剩下 800 条,在相同的处理逻辑下,第二个分区对应的 task 处理的数据量与第三个分区对应的 task 处理的数据量差距达到了 8 倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题

针对上述的两个问题,我们分别进行分析:

  1. 针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来 4 个分区的数据转化到 2 个分区中,这样只需要用后面的两个 task 进行处理即可,避免了资源的浪费。

  2. 针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配,让每个 partition 中的数据量差不多,这就避免了数据倾斜问题。

那么具体应该如何实现上面的解决思路?我们需要 coalesce 算子。

repartition 与 coalesce 都可以用来进行重分区,其中 repartition 只是 coalesce 接口中 shuffle 为 true 的简易实现,coalesce 默认情况下不进行 shuffle,但是可以通过参数进行设置。

假设我们希望将原本的分区个数 A 通过重新分区变为 B,那么有以下几种情况:

  1. A > B(多数分区合并为少数分区)

    A 与 B 相差值不大

    此时使用 coalesce 即可,无需 shuffle 过程。

    A 与 B 相差值很大

    此时可以使用 coalesce 并且不启用 shuffle 过程,但是会导致合并过程性能低下,所以推荐设置 coalesce 的第二个参数为 true,即启动 shuffle 过程。

  2. A < B(少数分区分解为多数分区)

此时使用 repartition 即可,如果使用 coalesce 需要将 shuffle 设置为 true,否则 coalesce 无效。

我们可以在 filter 操作之后,使用 coalesce 算子针对每个 partition 的数据量各不相同的情况,压缩 partition 的数量,而且让每个 partition 的数据量尽量均匀紧凑,以便于后面的 task 进行计算操作,在某种程度上能够在一定程度上提升性能

注意:local 模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。


6. 并行度设置

Spark 作业中的并行度指各个 stage 的 task 的数量

如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20 个 Executor,每个 Executor 分配 3 个 CPU core,而 Spark 作业有 40 个 task,这样每个 Executor 分配到的 task 个数是 2 个,这就使得每个 Executor 有一个 CPU core 空闲,导致资源的浪费。

理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个 Spark 作业的性能和运行速度。

Spark 官方推荐,task 数量应该设置为 Spark 作业总 CPU core 数量的 2~3 倍。之所以没有推荐 task 数量与 CPU core 总数相等,是因为 task 的执行时间不同,有的 task 执行速度快而有的 task 执行速度慢,如果 task 数量与 CPU core 总数相等,那么执行快的 task 执行完成后,会出现 CPU core 空闲的情况。如果 task 数量设置为 CPU core 总数的 2~3 倍,那么一个 task 执行完毕后,CPU core 会立刻执行下一个 task,降低了资源的浪费,同时提升了 Spark 作业运行的效率。

Spark 作业并行度的设置如下:

val conf = new SparkConf().set("spark.default.parallelism", "500")
复制代码

原则:让 cpu 的 core(cpu 核心数) 充分利用起来, 如有 100 个 core,那么并行度可以设置为 200~300

7. repartition/coalesce 调节并行度

Spark 中虽然可以设置并行度的调节策略,但是,并行度的设置对于 Spark SQL 是不生效的,用户设置的并行度只对于 Spark SQL 以外的所有 Spark 的 stage 生效

Spark SQL 的并行度不允许用户自己指定,Spark SQL 自己会默认根据 hive 表对应的 HDFS 文件的 split 个数自动设置 Spark SQL 所在的那个 stage 的并行度,用户自己通 spark.default.parallelism 参数指定的并行度,只会在没 Spark SQL 的 stage 中生效。

由于 Spark SQL 所在 stage 的并行度无法手动设置,如果数据量较大,并且此 stage 中后续的 transformation 操作有着复杂的业务逻辑,而 Spark SQL 自动设置的 task 数量很少,这就意味着每个 task 要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有 Spark SQL 的 stage 速度很慢,而后续的没有 Spark SQL 的 stage 运行速度非常快。

为了解决 Spark SQL 无法设置并行度和 task 数量的问题,我们可以使用 repartition 算子。

repartition 算子使用前后对比图如下:

repartition 算子使用前后对比图

repartition 算子使用前后对比图

Spark SQL 这一步的并行度和 task 数量肯定是没有办法去改变了,但是,对于 Spark SQL 查询出来的 RDD,立即使用 repartition 算子,去重新进行分区,这样可以重新分区为多个 partition,从 repartition 之后的 RDD 操作,由于不再涉及 Spark SQL,因此 stage 的并行度就会等于你手动设置的值,这样就避免了 Spark SQL 所在的 stage 只能用少量的 task 去处理大量数据并执行复杂的算法逻辑。使用 repartition 算子的前后对比如上图所示

8. reduceByKey 本地预聚合

reduceByKey 相较于普通的 shuffle 操作一个显著的特点就是会进行 map 端的本地聚合,map 端会先对本地的数据进行 combine 操作,然后将数据写入给下个 stage 的每个 task 创建的文件中,也就是在 map 端,对每一个 key 对应的 value,执行 reduceByKey 算子函数。

reduceByKey 算子的执行过程如下图所示:

reduceByKey 算子执行过程

reduceByKey 算子执行过程

使用 reduceByKey 对性能的提升如下:

  1. 本地聚合后,在 map 端的数据量变少,减少了磁盘 IO,也减少了对磁盘空间的占用;

  2. 本地聚合后,下一个 stage 拉取的数据量变少,减少了网络传输的数据量;

  3. 本地聚合后,在 reduce 端进行数据缓存的内存占用减少;

  4. 本地聚合后,在 reduce 端进行聚合的数据量减少。

基于 reduceByKey 的本地聚合特征,我们应该考虑使用 reduceByKey 代替其他的 shuffle 算子,例如 groupByKey。

groupByKey 与 reduceByKey 的运行原理如下图 1 和图 2 所示:

图1:groupByKey原理

图 1:groupByKey 原理

图2:reduceByKey原理

图 2:reduceByKey 原理

根据上图可知,groupByKey 不会进行 map 端的聚合,而是将所有 map 端的数据 shuffle 到 reduce 端,然后在 reduce 端进行数据的聚合操作。由于 reduceByKey 有 map 端聚合的特性,使得网络传输的数据量减小,因此效率要明显高于 groupByKey。

9. 使用持久化+checkpoint

Spark 持久化在大部分情况下是没有问题的,但是有时数据可能会丢失,如果数据一旦丢失,就需要对丢失的数据重新进行计算,计算完后再缓存和使用,为了避免数据的丢失,可以选择对这个 RDD 进行 checkpoint,也就是将数据持久化一份到容错的文件系统上(比如 HDFS)

一个 RDD 缓存并 checkpoint 后,如果一旦发现缓存丢失,就会优先查看 checkpoint 数据存不存在,如果有,就会使用 checkpoint 数据,而不用重新计算。也即是说,checkpoint 可以视为 cache 的保障机制,如果 cache 失败,就使用 checkpoint 的数据。

使用 checkpoint 的优点在于提高了 Spark 作业的可靠性,一旦缓存出现问题,不需要重新计算数据,缺点在于,checkpoint 时需要将数据写入 HDFS 等文件系统,对性能的消耗较大

持久化设置如下:

sc.setCheckpointDir(‘HDFS’)rdd.cache/persist(memory_and_disk)rdd.checkpoint
复制代码

10. 使用广播变量

默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。

假设当前任务配置了 20 个 Executor,指定 500 个 task,有一个 20M 的变量被所有 task 共用,此时会在 500 个 task 中产生 500 个副本,耗费集群 10G 的内存,如果使用了广播变量, 那么每个 Executor 保存一个副本,一共消耗 400M 内存,内存消耗减少了 5 倍。

广播变量在每个 Executor 保存一个副本,此 Executor 的所有 task 共用此广播变量,这让变量产生的副本数量大大减少

在初始阶段,广播变量只在 Driver 中有一份副本。task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中尝试获取变量,如果本地没有,BlockManager 就会从 Driver 或者其他节点的 BlockManager 上远程拉取变量的复本,并由本地的 BlockManager 进行管理;之后此 Executor 的所有 task 都会直接从本地的 BlockManager 中获取变量。

对于多个 Task 可能会共用的数据可以广播到每个 Executor 上:

val 广播变量名= sc.broadcast(会被各个Task用到的变量,即需要广播的变量)
广播变量名.value//获取广播变量
复制代码

11. 使用 Kryo 序列化

默认情况下,Spark 使用 Java 的序列化机制。Java 的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现 Serializable 接口即可,但是,Java 序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。

Spark 官方宣称 Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用 Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式了

Kryo 序列化注册方式的代码如下:

public class MyKryoRegistrator implements KryoRegistrator{  @Override  public void registerClasses(Kryo kryo){    kryo.register(StartupReportLogs.class);  }}
复制代码

配置 Kryo 序列化方式的代码如下:

//创建SparkConf对象val conf = new SparkConf().setMaster(…).setAppName(…)//使用Kryo序列化库conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  //在Kryo序列化库中注册自定义的类集合conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 
复制代码


发布于: 刚刚阅读数: 2
用户头像

InfoQ签约作者 2020.11.10 加入

文章首发于公众号:五分钟学大数据。大数据领域原创技术号,深入大数据技术

评论

发布
暂无评论
Spark性能调优-RDD算子调优