写点什么

大数据 -90 Spark RDD 容错机制:Checkpoint 原理、场景与最佳实践 容错机制详解

作者:武子康
  • 2025-09-07
    山东
  • 本文字数:3680 字

    阅读完需:约 12 分钟

大数据-90 Spark RDD容错机制:Checkpoint原理、场景与最佳实践 容错机制详解

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 01 日更新到:Java-113 深入浅出 MySQL 扩容全攻略:触发条件、迁移方案与性能优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


AI 辅助调查报告


章节内容

上节完成的内容如下:


  • Spark RDD 的依赖关系

  • 重回 WordCount

  • RDD 持久化

  • RDD 缓存


RDD 容错机制

基本概念

检查点(Checkpoint)机制详解

检查点是 Spark 中一种重要的容错机制,它通过将 RDD 数据持久化到可靠的分布式文件系统(如 HDFS)来实现容错功能。与普通的持久化操作相比,检查点具有以下特点:

工作原理

  1. 执行过程:

  2. 当对 RDD 调用 checkpoint()方法时,Spark 会向该 RDD 注册检查点标记

  3. 在后续的 Action 操作触发时,Spark 会启动专门的作业来执行检查点操作

  4. 检查点数据会被写入配置的检查点目录(通常是 HDFS 路径)

  5. 与持久化的区别:

  6. 持久化(Persist/Cache):

  7. 存储级别:内存/磁盘

  8. 数据位置:Executor 本地

  9. 生命周期:应用程序结束后自动清除

  10. 依赖链:保留完整 Lineage

  11. 检查点(Checkpoint):

  12. 存储级别:HDFS 等可靠存储

  13. 数据位置:分布式文件系统

  14. 生命周期:需要手动清理

  15. 依赖链:会被截断

典型应用场景

  1. 迭代计算:在机器学习算法中,迭代次数可能很多,使用检查点可以定期保存中间结果

  2. 示例:在 PageRank 算法中,每迭代 10 次做一次 checkpoint

  3. 复杂转换:当 RDD 经过大量转换操作导致 Lineage 过长时

  4. 示例:一个 RDD 经过 100 次 map 操作后,建议添加 checkpoint

  5. 关键数据:对计算结果可靠性要求高的场景

配置和使用

  1. 设置检查点目录:


   sc.setCheckpointDir("hdfs://namenode:8020/checkpoint")
复制代码


  1. 执行检查点:


   val rdd = sc.textFile("hdfs://...")   rdd.checkpoint()   rdd.count() // 触发检查点执行
复制代码

性能考量

  1. 优点:

  2. 提供可靠的故障恢复能力

  3. 可以截断过长的 Lineage

  4. 适用于长期运行的 Spark 应用

  5. 缺点:

  6. 写入 HDFS 会带来额外的 I/O 开销

  7. 需要额外的存储空间

  8. 不是实时生效,需要等待作业完成

最佳实践

  1. 对于需要重复使用的 RDD,建议先 persist()再 checkpoint()

  2. 在宽依赖(shuffle)之后添加 checkpoint 效果更好

  3. 合理设置检查点间隔,避免过于频繁的检查点操作影响性能


通过合理使用检查点机制,可以在保证数据可靠性的同时,提高 Spark 应用的容错能力和执行效率。

适合场景

  • DAG 中的 Lineage 过长,如果重新计算,开销会很大

  • 在宽依赖上做 checkpoint 获得的收益更大

启动 Shell

# 启动 spark-shellspark-shell --master local[*]
复制代码

checkpoint

// 设置检查点目录sc.setCheckpointDir("/tmp/checkpoint")
val rdd1 = sc.parallelize(1 to 1000)val rdd2 = rdd1.map(_*2)rdd2.checkpoint// checkpoint是lazy操作rdd2.isCheckpointed
复制代码


可以发现,返回结果是 False


RDD 依赖关系 1

checkpoint 之前的 rdd 依赖关系


  • rdd2.dependencies(0).rdd

  • rdd2.dependencies(0).rdd.collect


我们可以观察到,依赖关系是有的,关系到之前的 rdd1 的数据了:


触发 checkpoint

我们可以通过执行 Action 的方式,来触发 checkpoint 执行一次 action,触发 checkpoint 的执行


  • rdd2.count

  • rdd2.isCheckpointed


此时观察,可以发现 checkpoint 已经是 True 了:


RDD 依赖关系 2

我们再次观察 RDD 的依赖关系:再次查看 RDD 的依赖关系。可以看到 checkpoint 后,RDD 的 lineage 被截断,变成从 checkpointRDD 开始


  • rdd2.dependencies(0).rdd

  • rdd2.dependencies(0).rdd.collect


此时观察到,已经不是最开始的 rdd1 了:


查看 checkpoint

我们可以查看对应的保存的文件,查看 RDD 所依赖的 checkpoint 文件


  • rdd2.getCheckpointFile 运行的结果如下图:

RDD 的分区

基本概念

spark.default.paralleism: 默认的并发数 2
复制代码

本地模式

# 此时 spark.default.paralleism 为 Nspark-shell --master local[N]# 此时 spark.default.paralleism 为 1spark-shell --master local
复制代码

伪分布式

  • x 为本机上启动的 Executor 数

  • y 为每个 Executor 使用的 core 数

  • z 为每个 Executor 使用的内存

  • spark.default.paralleism 为 x * y


spark-shell --master local-cluster[x,y,z]
复制代码

分布式模式

spark.default.paralleism = max(应用程序持有Executor的core总数, 2)
复制代码

创建 RDD 方式

集合创建

简单的说,RDD 分区数等于 cores 总数


val rdd1 = sc.paralleize(1 to 100)rdd.getNumPartitions
复制代码

textFile 创建

如果没有指定分区数:


  • 本地文件: rdd 的分区数 = max(本地文件分片数,sc.defaultMinPartitions)

  • HDFS 文件:rdd 的分区数 = max(HDFS 文件 block 数,sc.defaultMinPartitions)


需要额外注意的是:


  • 本地文件分片数 = 本地文件大小 / 32M

  • 读取 HDFS 文件,同时指定了分区数 < HDFS 文件的 Block 数,指定的数将不会生效


val rdd = sc.textFile("data/1.txt")rdd.getNumPartitions
复制代码

RDD 分区器

判断分区器

以下 RDD 分别是否有分区器,是什么类型的分区器


val rdd1 = sc.textFile("/wcinput/wc.txt")rdd1.partitioner
val rdd2 = sc.flatMap(_.split("\\s+"))rdd2.partitioner
val rdd3 = rdd2.map((_, 1))rdd3.partitioner
val rdd4 = rdd3.reduceByKey(_ + _)rdd4.partitioner
val rdd5 = rdd4.sortByKey()rdd5.partitioner
复制代码

分区器作用与分类

在 PairRDD(key,value)中,很多操作都是基于 Key 的,系统会按照 Key 对数据进行重组,如 GroupByKey 数据重组需要规则,最常见的就是基于 Hash 的分区,此外还有一种复杂的基于抽样 Range 分区方法:


HashPartitioner

最简单、最常用,也是默认提供的分区器。对于给定的 Key,计算 HashCode,并除以分区的个数取余,如果余数小于 0,则用余数+分区的个数,最后返回的值就是这个 Key 所属的分区 ID。该分区方法可以保证 Key 相同的数据出现在同一个分区中。用户可以通过 partitionBy 主动使用分区器,通过 partitions 参数指定想要分区的数量。


默认情况下的分区情况是:


val rdd1 = sc.makeRDD(1 to 100).map((_, 1))rdd1.getNumPartitions
复制代码


执行结果如下图所示:



执行结果如下图所示,分区已经让我们手动控制成 10 个了:


val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))rdd2.getNumPartitionsrdd2.glom.collect.foreach(x => println(x.toBuffer))
复制代码

RangePartitioner

简单来说就是将一定范围内的数映射到某个分区内,在实现中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey 会使用 RangePartitioner。



进行代码的测试:


val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10, rdd1))rdd3.glom.collect.foreach(x => println(x.toBuffer))
复制代码


执行结果如下图所示:



但是现在的问题是:在执行分区之前其实并不知道数据的分布情况,如果想知道数据的分区就需要对数据进行采样。


  • Spark 中的 RangePartitioner 在对数据采样的过程中使用了 “水塘采样法”

  • 水塘采样法是:在包含 N 个项目的集合 S 中选取 K 个样本,其中 N 为 1 或者很大的未知的数量,尤其适用于不能把所有 N 个项目都存放到主内存的情况。

  • 在采样过程中执行了 collect() 操作,引发了 Action 操作。

自定义分区器

Spark 允许用户通过自定义的 Partitioner 对象,灵活的来控制 RDD 的分区方式。我们需要实现自定义分区器,按照以下的规则进行分区:


  • 分区 0 < 100

  • 100 <= 分区 1 < 200

  • 200 <= 分区 2 < 300

  • 300 <= 分区 3 < 400

  • .......

  • 900 <= 分区 9 < 1000

编写代码

package icu.wzk
import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.immutable

class MyPartitioner(n: Int) extends Partitioner {
override def numPartitions: Int = n
override def getPartition(key: Any): Int = { val k = key.toString.toInt k / 100 }}
object UserDefinedPartitioner {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("UserDefinedPartitioner") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN")
val random = scala.util.Random val arr: immutable.IndexedSeq[Int] = (1 to 100) .map(idx => random.nextInt(1000))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(arr).map((_, 1)) rdd1.glom.collect.foreach(x => println(x.toBuffer))
println("=========================================")
val rdd2 = rdd1.partitionBy(new MyPartitioner(10)) rdd2.glom.collect().foreach(x => println(x.toBuffer)) sc.stop() }
}
复制代码

打包上传

这里之前已经重复过多次,就跳过了


mvn clean package
复制代码

运行测试

spark-submit --master local[*] --class icu.wzk.UserDefinedPartitioner spark-wordcount-1.0-SNAPSHOT.jar
复制代码


可以看到如下的运行结果:



发布于: 刚刚阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-90 Spark RDD容错机制:Checkpoint原理、场景与最佳实践 容错机制详解_Java_武子康_InfoQ写作社区