写点什么

Spark 持久化介绍(cache/persist/checkpoint)

用户头像
笨小康
关注
发布于: 2021 年 02 月 04 日

目录

  • 一、RDD 持久化介绍

  • 二、RDD 持久化级别

  • 三、持久化级别选择

  • 四、删除持久化数据

  • 五、RDD cache 和 persist

  • 六、RDD checkpoint

  • 七、DataSet cache 和 persist

一、RDD 持久化

因为 Spark 程序执行的特性,即延迟执行和基于 Lineage 最大化的 pipeline,当 Spark 中由于对某个 RDD 的 Action 操作触发了作业时,会基于 Lineage 从后往前推,找到该 RDD 的源头 RDD,然后从前往后计算出结果。


很明显,如果对某个 RDD 执行了多次 Transformation 和 Action 操作,每次 Action 操作出发了作业时都会重新从源头 RDD 出计算一遍来获得 RDD,再对这个 RDD 执行相应的操作。当 RDD 本身计算特别复杂和耗时时,这种方式性能是非常差的,此时必须考虑对计算结果的数据进行持久化。


数据持久化(或称为缓存)就是将计算出来的 RDD 根据配置的持久化级别,保存在内存或磁盘中,以后每次对该 RDD 进行算子操作时,都会直接从内存或者磁盘中提取持久化的 RDD 数据,然后执行算子操作,而不会从源头处重新计算一遍该 RDD。


二、RDD 持久化级别

Spark 的持久化级别有如下几种:


阅读 Spark 2.1.0 源码,RDD 持久化级别在 StorageLevel 类中有详细介绍,我们来详细看看。

//位置:/org/apache/spark/storage/StorageLevel.scala/** * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating * new storage levels. */object StorageLevel {  val NONE = new StorageLevel(false, false, false, false)  val DISK_ONLY = new StorageLevel(true, false, false, false)  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)  val MEMORY_ONLY = new StorageLevel(false, true, false, true)  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
复制代码


这里列出了 12 种 RDD 缓存级别,每个缓存级别后面都 new 了一个 StorageLevel 类的构造函数,什么意思呢?我么可以看看其构造函数。

// 位置:org/apache/spark/storage/StorageLevel.scalaclass StorageLevel private(    private var _useDisk: Boolean,    private var _useMemory: Boolean,    private var _useOffHeap: Boolean,    private var _deserialized: Boolean,    private var _replication: Int = 1)  extends Externalizable {
// TODO: Also add fields for caching priority, dataset ID, and flushing. private def this(flags: Int, replication: Int) { this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) }
def this() = this(false, true, false, false) // For deserialization
def useDisk: Boolean = _useDisk def useMemory: Boolean = _useMemory def useOffHeap: Boolean = _useOffHeap def deserialized: Boolean = _deserialized def replication: Int = _replication
复制代码

StorageLevel 类的主构造器包含了 5 个参数:

  • useDisk:使用硬盘(外存)。

  • useMemory:使用内存。

  • useOffHeap:使用堆外内存,这是 Java 虚拟机里面的概念,堆外内存意味着把内存对象分配在 Java 虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。

  • deserialized:反序列化,其逆过程序列化(Serialization)是 java 提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象。

  • replication:备份数(在多个节点上备份)。


理解了这 5 个参数,就不难理解不同缓存级别的含义了,比如 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 缓存,表示将 RDD 的数据持久化在硬盘以及内存中,对数据进行序列化存储,并且将每个持久化的数据都复制一份副本保存到其他节点。


三、持久化级别选择

Spark 提供了这么多中持久化策略,那在实际场景中应该如何使用呢?


通常遵循的准则是,优先考虑内存,内存放不下就考虑序列化后放到内存中,尽量不要存储到磁盘中,因为一般 RDD 的重新计算要比从磁盘中读取更快,只有在需要更快的恢复时才使用备份级别(所有的存储级别都可以通过重新计算来提供全面的容错性,但是备份级别允许用于在 RDD 的备份上执行任务,而无须重新计算丢失的分区)。具体的选取方式如下:

  • 采用默认情况下性能最高的 MEMORY_ONLY。该持续化级别下,对 RDD 的后续算子操作,都是基于纯内存中的数据操作,不需要从磁盘文件中读取数据,性能很高,也不需要进行序列化与反序列化操作,避免了这部分开销。但要注意的是,如果 RDD 的数据比较多(比如几十亿条),直接用这种持久化级别可能会导致 JVM 的 OOM 内存溢出异常。

  • 如果使用 MEMROYONLY 级别发生了内存溢出,建议尝试使用 MEMROYONLY_SER 级别。该级别会将 RDD 数据序列化后再保存到内存中,此时每个 partition 仅仅是一个字节数组,减少了对象数量,并降低内存占用,该级别比 MEMORY_ONLY 多出来的性能开销,主要就是序列化和反序列化的开销。同样,如果 RDD 数据过多仍然会导致 OOM 内存溢出异常。

  • 如果纯内存级别都无法使用,则建议使用 MEMROYANDDISK_SER 策略,而不是 MEMORY_AND_DISK 策略。该策略会优先将数据缓存到内存中,内存缓存不下时才会写入磁盘。

  • 通常不建议使用 DISK_ONLY 级别。因为完全基于磁盘文件进行数据的读写,会导致性能急剧下降,有时还不如重新计算一次该 RDD。

  • 通常不建议使用后缀为_2 的备份级别。因为该级别必须将所有的数据都复制一份副本,并发送到其他节点上,而数据复制和网络传输会导致较大的性能开销。除非是作业的高可用性能要求很高,否则不建议使用。

  • OFF_HEAP 级别一般使用得少,但优势也比较明显。该方式将 RDD 数据持久化到 Alluxio 而不是 Executor 内存中,可以避免 Executors 崩溃缓存数据丢失的情况。


四、删除持久化数据

Spark 的机制可以自动监控各个节点上的缓存使用率,并以 LRU (Least Recently Used,近期最少使用)算法删除过时的缓存数据。当然,如果想手动删除一个 RDD 数据的缓存,而不是等待该 RDD 被 Spark 自动移除,可以使用 RDD.unpersist()方法。


五、 RDD cache 和 persist

我们先来看一个 persist 的示例。

// cache 使用示例:val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").cache()rdd1.map(...)rdd1.reduce(...)
// persist 使用示例val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").persist(StorageLevel.MEMORY_AND_DISK_SER)rdd1.map(...)rdd1.reduce(...)
复制代码

唯一可以看出的是 persist() 持久化是可以手动指定持久化类型的,而 cache() 无须指定。那它们之间到底有什么区别呢?


通过阅读 Spark 2.1.0 源码,可以看到 cache() 方法调用了无参的 persist() 方法。想知道两者的区别,还需要进一步看看 persist() 方法逻辑。

//位置:org/apache/spark/rdd/RDD.scala  /**   * Persist this RDD with the default storage level (`MEMORY_ONLY`).   */  def cache(): this.type = persist()
复制代码


可以看到 persist() 方法调用了 persist(StorageLevel.MEMORYONLY) 方法,即默认缓存方式采用 MEMORYONLY 级别。

//位置:org/apache/spark/rdd/RDD.scala  /**   * Persist this RDD with the default storage level (`MEMORY_ONLY`).   */  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
复制代码


继续往下看,persist() 方法有一个 StorageLevel 类型的参数,该参数表示 RDD 的缓存级别。至此,也就能看出 cache 和 persist 的区别了:即 cache 只有一个默认的缓存级别 MEMORY_ONLY,而 persist 可以根据情况设置其他的缓存级别。

//位置:org/apache/spark/rdd/RDD.scala  /**   * Mark this RDD for persisting using the specified level.   *   * @param newLevel the target storage level   * @param allowOverride whether to override any existing level with the new one   */  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {    // TODO: Handle changes of StorageLevel    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {      throw new UnsupportedOperationException(        "Cannot change storage level of an RDD after it was already assigned a level")    }    // If this is the first time this RDD is marked for persisting, register it    // with the SparkContext for cleanups and accounting. Do this only once.    if (storageLevel == StorageLevel.NONE) {      sc.cleaner.foreach(_.registerRDDForCleanup(this))      sc.persistRDD(this)    }    storageLevel = newLevel    this  }
复制代码


六、RDD 的 checkpoint

把数据通过 cache 或 persist 持久化到内存或磁盘中,虽然是快速的但却不是最可靠的,checkpoint 机制的产生就是为了更加可靠地持久化数据以复用 RDD 计算数据,通常针对整个 RDD 计算链路中特别需要数据持久化的缓解,启用 checkpoint 机制来确保高容错和高可用性。


可以通过调用 SparkContext.setCheckpointDir() 方法来指定 checkpoint 是持久化的 RDD 数据的存放位置,这里可以存在本地或 HDFS 中(生产环境通常是放在 HDFS 上,借助 HDFS 本身的高容错和高可靠的特性完成数据的持久化),同时为了提高效率,可以指定多个目录。


需要说明的是,checkpoint 和 persist 一样是惰性执行的,在对某个 RDD 标记了需要 checkpoint 后,并不会立即执行,只有在后续有 Action 触发 Job 从而导致该 RDD 的计算,且在这个 Job 执行完成后,才会从后往前回溯找到标记了 checkpoint 的 RDD,然后重新启动一个 Job 来执行具体的 checkpoint 操作,所以一般都会对需要进行 checkpoint 的 RDD 先进行 persist 标记,从而把该 RDD 的计算结果持久化到内存或者磁盘上,以备 checkpoint 复用


下面是 checkpoint 使用的一个示例:

// 配置 checkpointDirsc.setCheckpointDir("hdfs://nameservice/spark/checkpoint")val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").cache()// 对 rdd1 标记 checkpointrdd1.checkpoint()// action 触发了 Job 才能导致 checkpoint 的真正执行rdd1.count()
复制代码


七、DataSet 的 cache 和 persist

阅读源码中无意中看到 DataSet 也支持 cache 和 persist 持久化方式,和 RDD 的持久化还是不太一样,我们来看看代码。

// 位置:org/apache/spark/sql/Dataset.scala  /**   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).   *   * @group basic   * @since 1.6.0   */  // DataSet 的 cache 持久化调用  def cache(): this.type = persist()
/** * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`). * * @group basic * @since 1.6.0 */ def persist(): this.type = { sparkSession.sharedState.cacheManager.cacheQuery(this) this }
// 位置:org/apache/spark/sql/Dataset.scala /** * Persist this Dataset with the given storage level. * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, * `MEMORY_AND_DISK_2`, etc. * * @group basic * @since 1.6.0 */ // DataSet 的 persist 持久化调用 def persist(newLevel: StorageLevel): this.type = { sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel) this }
// 位置:org/apache/spark/sql/execution/CacheManager.scala /** * Caches the data produced by the logical representation of the given [[Dataset]]. * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because * recomputing the in-memory columnar representation of the underlying table is expensive. */ // cache 和 persist 持久化调用的方法 def cacheQuery( query: Dataset[_], tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock
复制代码

通过源码看到 cache() 调用的是无参的 persist() 方法,而 persist 调用 cacheQuery 方法,虽然 cache 和 persist 两者最终调用都是 cacheQuery 方法,但 cache 是采用默认的持久化级别 MEMORY_ADN_DISK,而 persist 则是用户自定义,这里默认的持久化持久和 RDD 是不一样的。


参考连接

  1. https://blog.csdn.net/houmou/article/details/52491419

  2. https://dongkelun.com/2018/06/03/sparkCacheAndPersist/

  3. https://blog.csdn.net/qq_27639777/article/details/82319560


发布于: 2021 年 02 月 04 日阅读数: 17
用户头像

笨小康

关注

万物之中,希望至美! 2018.09.17 加入

还未添加个人简介

评论

发布
暂无评论
Spark 持久化介绍(cache/persist/checkpoint)