spark 性能调优之 JVN 调优
JVM 调优
对于 JVM 调优,首先应该明确,(major)fullgc/minor gc,都会导致 JVM 的工作线程停止工作,即 stop the world。
1. 降低 cache 操作的内存占比
1. 静态内存管理机制
根据 Spark 静态内存管理机制,堆内存被划分为了两块,Storage 和 Execution。Storage 主要用于缓存 RDD 数据和 broadcast 数据,Execution 主要用于缓存在 shuffle 过程中产生的中间数据,Storage 占系统内存的 60%,Execution 占系统内存的 20%,并且两者完全独立。
在一般情况下,Storage 的内存都提供给了 cache 操作,但是如果在某些情况下 cache 操作内存不是很紧张,而 task 的算子中创建的对象很多,Execution 内存又相对较小,这回导致频繁的 minor gc,甚至于频繁的 full gc,进而导致 Spark 频繁的停止工作,性能影响会很大。
在 Spark UI 中可以查看每个 stage 的运行情况,包括每个 task 的运行时间、gc 时间等等,如果发现 gc 太频繁,时间太长,就可以考虑调节 Storage 的内存占比,让 task 执行算子函数式,有更多的内存可以使用。
Storage 内存区域可以通过 spark.storage.memoryFraction 参数进行指定,默认为 0.6,即 60%,可以逐级向下递减,如代码清单:
代码清单 Storage 内存占比设置
valconf = new SparkConf()
.set("spark.storage.memoryFraction", "0.4")
2. 统一内存管理机制
根据 Spark 统一内存管理机制,堆内存被划分为了两块,Storage 和 Execution。Storage 主要用于缓存数据,Execution 主要用于缓存在 shuffle 过程中产生的中间数据,两者所组成的内存部分称为统一内存,Storage 和 Execution 各占统一内存的 50%,由于动态占用机制的实现,shuffle 过程需要的内存过大时,会自动占用 Storage 的内存区域,因此无需手动进行调节。
2. 设置 Executor 堆外内存
Executor 的堆外内存主要用于程序的共享库、Perm Space、 线程 Stack 和一些 Memory mapping 等, 或者类 C 方式 allocate object。
有时,如果你的 Spark 作业处理的数据量非常大,达到几亿的数据量,此时运行 Spark 作业会时不时地报错,例如 shuffle outputfile cannot find,executor lost,task lost,out of memory 等,这可能是 Executor 的堆外内存不太够用,导致 Executor 在运行的过程中内存溢出。
stage 的 task 在运行的时候,可能要从一些 Executor 中去拉取 shuffle mapoutput 文件,但是 Executor 可能已经由于内存溢出挂掉了,其关联的 BlockManager 也没有了,这就可能会报出 shuffle output file cannot find,executor lost,task lost,out of memory 等错误,此时,就可以考虑调节一下 Executor 的堆外内存,也就可以避免报错,与此同时,堆外内存调节的比较大的时候,对于性能来讲,也会带来一定的提升。
默认情况下,Executor 堆外内存上限大概为 300 多 MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致 Spark 作业反复崩溃,无法运行,此时就会去调节这个参数,到至少 1G,甚至于 2G、4G。
Executor 堆外内存的配置需要在 spark-submit 脚本里配置,如代码清单所示:
代码清单 Executor 堆外内存配置
--conf spark.yarn.executor.memoryOverhead=2048
以上参数配置完成后,会避免掉某些 JVM OOM 的异常问题,同时,可以提升整体 Spark 作业的性能。
3. 设置连接等待时长
在 Spark 作业运行过程中,Executor 优先从自己本地关联的 BlockManager 中获取某份数据,如果本地 BlockManager 没有的话,会通过 TransferService 远程连接其他节点上 Executor 的 BlockManager 来获取数据。
如果 task 在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark 的 Executor 进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。
在生产环境下,有时会遇到 file not found、file lost 这类错误,在这种情况下,很有可能是 Executor 的 BlockManager 在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长 60s 后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致 Spark 作业的崩溃。这种情况也可能会导致 DAGScheduler 反复提交几次 stage,TaskScheduler 返回提交几次 task,大大延长了我们的 Spark 作业的运行时间。
此时,可以考虑调节连接的超时时长,连接等待时长需要在 spark-submit 脚本中进行设置,设置方式如代码清单所示:
代码清单 连接等待时长配置
--conf spark.core.connection.ack.wait.timeout=300
调节连接等待时长后,通常可以避免部分的 XX 文件拉取失败、XX 文件 lost 等报错。
评论