写点什么

Spark 任务 OOM 问题如何解决?

作者:EquatorCoco
  • 2024-12-05
    福建
  • 本文字数:2530 字

    阅读完需:约 8 分钟

在实际的业务场景中,Spark 任务出现 OOM(Out of Memory) 问题通常是由于任务处理的数据量过大、资源分配不合理或者代码存在性能瓶颈等原因造成的。针对不同的业务场景和原因,可以从以下几个方面进行优化和解决。


一、业务场景及可能的 OOM 原因分析


1、数据量过大

业务场景:处理海量数据集(例如,数亿行日志数据或数十 TB 的数据集),任务执行过程中需要对数据进行大规模的聚合、排序、连接等操作。

OOM 原因:数据无法完全放入内存,导致溢出,尤其是在shufflejoin操作时,数据量暴增。


2、数据倾斜

业务场景:处理的数据分布不均匀(如某个用户或产品的数据量过多),导致部分节点上出现计算或内存瓶颈。

OOM 原因:由于部分节点需要处理大量的数据,某些节点的任务会使用超出可用内存的资源,而其他节点的负载较轻。


3、不合理的资源分配

业务场景:资源分配过低,导致单个任务分配到的内存、CPU 等资源不足。

OOM 原因:Executor 的内存设置太小,或者数据过度缓存,导致内存不足。


4、代码中存在缓存过多或内存使用不合理

业务场景:频繁使用cache()persist(),或对数据结构进行不必要的操作,导致内存过度消耗。

OOM 原因:数据缓存没有及时释放,导致内存占用过多。


二、针对 OOM 问题的解决方案


1. 调整 Executor 的内存和 CPU 资源


通过合理的资源分配,确保每个Executor有足够的内存处理数据。


1、增加 Executor 的内存


Spark 中的Executor负责在集群节点上执行任务,默认每个Executor的内存可能不足以处理大数据集。可以增加Executor的内存以缓解 OOM 问题。


   --executor-memory 8G
复制代码


可以通过--executor-memory选项来设置每个Executor的内存。例如,将内存设置为 8GB。如果数据量很大,可以根据情况设置更大的内存。


2、调整堆外内存


Spark 还使用了一部分堆外内存(off-heap memory)。如果涉及大量的堆外内存操作,可以通过以下配置增加堆外内存:


   --conf spark.memory.offHeap.enabled=true   --conf spark.memory.offHeap.size=4G
复制代码


3、调整 Executor 的 CPU 核心数


为每个Executor分配更多的 CPU 核心,以加快任务的处理速度,防止长时间占用内存。


   --executor-cores 4
复制代码


通过--executor-cores设置每个Executor使用的核心数。例如,可以将核心数设置为 4,以提升并发计算能力。


2. 调整内存管理策略


Spark 的内存管理策略主要涉及以下几个关键参数,它们的优化配置可以帮助减少 OOM 问题。


1、调整内存管理比例


Spark 2.x 及以上版本采用统一的内存管理模型,可以通过调节以下参数优化内存使用:


   --conf spark.memory.fraction=0.8   --conf spark.memory.storageFraction=0.5
复制代码


  • spark.memory.fraction:该参数控制了存储与执行内存的总占比,默认是 0.6,可以适当调高。

  • spark.memory.storageFraction:该参数决定了在memory.fraction的基础上,存储内存的占比。如果需要更多执行内存,可以适当减小该值。


2、减少缓存数据的存储占用及时清理缓存


  • 对于不再需要的数据,及时调用unpersist()来清理缓存,释放内存。

   rdd.unpersist()
复制代码


  • 调整缓存级别:在缓存时,使用StorageLevel.DISK_ONLYStorageLevel.MEMORY_AND_DISK,以减少内存占用。

   rdd.persist(StorageLevel.MEMORY_AND_DISK)
复制代码


3. 数据切分与优化操作


Spark 任务中的shufflejoingroupBy等操作通常会引起大量内存消耗,以下优化可以减轻这些操作带来的 OOM 风险。


1、调整分区数


  • 对于大规模数据操作如joinshuffle等,分区数的设置至关重要。如果分区数过少,可能会导致某些分区数据量过大,进而导致内存溢出。

   rdd.repartition(200)
复制代码


或者在执行某些操作时,显式指定分区数:


   rdd.reduceByKey(_ + _, numPartitions = 200)
复制代码


  • 通常的经验是将分区数量设置为比 Executor 数量高出数倍(例如,每个核心处理 2-4 个分区)。


2、避免过多的宽依赖


  • 宽依赖(如groupByKey)会在 shuffle 时造成内存的压力,特别是数据量较大时,应该尽量避免。可以通过替换为reduceByKey等具有预聚合功能的操作来减少内存消耗:


   rdd.reduceByKey(_ + _)
复制代码


3、避免数据倾斜


如果存在数据倾斜,部分节点处理大量数据,容易导致 OOM。以下是常见的解决方法:


  • 随机键拆分:可以为数据加上随机前缀,以打散数据,避免部分节点数据量过大。


   rdd.map(x => ((x._1 + new Random().nextInt(10)), x._2))
复制代码


  • 广播小表:在join操作中,如果一张表很小,可以使用广播变量,将小表广播到每个节点,减少数据传输和内存占用:


   val broadcastVar = sc.broadcast(smallTable)   largeTable.mapPartitions { partition =>     val small = broadcastVar.value     partition.map(largeRow => ...)   }
复制代码


4. 调整 Spark 的并行度和 Shuffle 机制


Spark 的 shuffle 操作(如groupByKeyjoin)会导致大量数据需要在不同的节点之间传输。如果并行度设置过低,容易导致某个节点处理的数据量过大,从而引发 OOM。


1、增加并行度


   --conf spark.sql.shuffle.partitions=200
复制代码


或者在代码中显式设置:


   spark.conf.set("spark.sql.shuffle.partitions", "200")
复制代码


  • 默认情况下,spark.sql.shuffle.partitions的值可能偏小(例如 200),根据数据规模适当调整该值可以减轻单个节点的负载。


2、调整 Shuffle 合并机制


Spark 3.0 引入了 Adaptive Query Execution (AQE),可以在执行时动态调整 shuffle 的分区数,避免某些分区数据量过大:


   --conf spark.sql.adaptive.enabled=true   --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64M
复制代码


AQE 可以根据任务的执行情况自动调整 shuffle 的分区数,从而避免 OOM。


五、小结一下


Spark 任务中的 OOM 问题常常由于数据量过大、数据倾斜、资源分配不合理等问题引起,针对不同的业务场景,可以采取以下措施进行优化:


  1. 合理分配内存和 CPU:增加 Executor 的内存和 CPU 核心数,合理配置内存管理参数。

  2. 调整分区数和优化操作:通过调整分区数、减少宽依赖等方式减少内存占用。

  3. 处理数据倾斜:通过随机键拆分、广播小表等方法避免数据倾斜。

  4. 使用缓存优化内存:减少不必要的cache()persist()操作,并及时释放缓存数据。


好了,今天的内容就写到这里,这些优化方法结合使用,可以有效解决 Spark 任务中的 OOM 问题。


文章转载自:威哥爱编程

原文链接:https://www.cnblogs.com/wgjava/p/18463484

体验地址:http://www.jnpfsoft.com/?from=infoq

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Spark任务OOM问题如何解决?_Java_EquatorCoco_InfoQ写作社区