大数据 -90 Spark RDD 容错机制:Checkpoint 原理、场景与最佳实践 容错机制详解

点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 09 月 01 日更新到:Java-113 深入浅出 MySQL 扩容全攻略:触发条件、迁移方案与性能优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解
AI 辅助调查报告

章节内容
上节完成的内容如下:
Spark RDD 的依赖关系
重回 WordCount
RDD 持久化
RDD 缓存

RDD 容错机制
基本概念
检查点(Checkpoint)机制详解
检查点是 Spark 中一种重要的容错机制,它通过将 RDD 数据持久化到可靠的分布式文件系统(如 HDFS)来实现容错功能。与普通的持久化操作相比,检查点具有以下特点:
工作原理
执行过程:
当对 RDD 调用 checkpoint()方法时,Spark 会向该 RDD 注册检查点标记
在后续的 Action 操作触发时,Spark 会启动专门的作业来执行检查点操作
检查点数据会被写入配置的检查点目录(通常是 HDFS 路径)
与持久化的区别:
持久化(Persist/Cache):
存储级别:内存/磁盘
数据位置:Executor 本地
生命周期:应用程序结束后自动清除
依赖链:保留完整 Lineage
检查点(Checkpoint):
存储级别:HDFS 等可靠存储
数据位置:分布式文件系统
生命周期:需要手动清理
依赖链:会被截断
典型应用场景
迭代计算:在机器学习算法中,迭代次数可能很多,使用检查点可以定期保存中间结果
示例:在 PageRank 算法中,每迭代 10 次做一次 checkpoint
复杂转换:当 RDD 经过大量转换操作导致 Lineage 过长时
示例:一个 RDD 经过 100 次 map 操作后,建议添加 checkpoint
关键数据:对计算结果可靠性要求高的场景
配置和使用
设置检查点目录:
执行检查点:
性能考量
优点:
提供可靠的故障恢复能力
可以截断过长的 Lineage
适用于长期运行的 Spark 应用
缺点:
写入 HDFS 会带来额外的 I/O 开销
需要额外的存储空间
不是实时生效,需要等待作业完成
最佳实践
对于需要重复使用的 RDD,建议先 persist()再 checkpoint()
在宽依赖(shuffle)之后添加 checkpoint 效果更好
合理设置检查点间隔,避免过于频繁的检查点操作影响性能
通过合理使用检查点机制,可以在保证数据可靠性的同时,提高 Spark 应用的容错能力和执行效率。
适合场景
DAG 中的 Lineage 过长,如果重新计算,开销会很大
在宽依赖上做 checkpoint 获得的收益更大
启动 Shell
checkpoint
可以发现,返回结果是 False

RDD 依赖关系 1
checkpoint 之前的 rdd 依赖关系
rdd2.dependencies(0).rdd
rdd2.dependencies(0).rdd.collect
我们可以观察到,依赖关系是有的,关系到之前的 rdd1 的数据了:

触发 checkpoint
我们可以通过执行 Action 的方式,来触发 checkpoint 执行一次 action,触发 checkpoint 的执行
rdd2.count
rdd2.isCheckpointed
此时观察,可以发现 checkpoint 已经是 True 了:

RDD 依赖关系 2
我们再次观察 RDD 的依赖关系:再次查看 RDD 的依赖关系。可以看到 checkpoint 后,RDD 的 lineage 被截断,变成从 checkpointRDD 开始
rdd2.dependencies(0).rdd
rdd2.dependencies(0).rdd.collect
此时观察到,已经不是最开始的 rdd1 了:

查看 checkpoint
我们可以查看对应的保存的文件,查看 RDD 所依赖的 checkpoint 文件
rdd2.getCheckpointFile 运行的结果如下图:
RDD 的分区
基本概念
本地模式
伪分布式
x 为本机上启动的 Executor 数
y 为每个 Executor 使用的 core 数
z 为每个 Executor 使用的内存
spark.default.paralleism 为 x * y
分布式模式
创建 RDD 方式
集合创建
简单的说,RDD 分区数等于 cores 总数
textFile 创建
如果没有指定分区数:
本地文件: rdd 的分区数 = max(本地文件分片数,sc.defaultMinPartitions)
HDFS 文件:rdd 的分区数 = max(HDFS 文件 block 数,sc.defaultMinPartitions)
需要额外注意的是:
本地文件分片数 = 本地文件大小 / 32M
读取 HDFS 文件,同时指定了分区数 < HDFS 文件的 Block 数,指定的数将不会生效
RDD 分区器
判断分区器
以下 RDD 分别是否有分区器,是什么类型的分区器
分区器作用与分类
在 PairRDD(key,value)中,很多操作都是基于 Key 的,系统会按照 Key 对数据进行重组,如 GroupByKey 数据重组需要规则,最常见的就是基于 Hash 的分区,此外还有一种复杂的基于抽样 Range 分区方法:

HashPartitioner
最简单、最常用,也是默认提供的分区器。对于给定的 Key,计算 HashCode,并除以分区的个数取余,如果余数小于 0,则用余数+分区的个数,最后返回的值就是这个 Key 所属的分区 ID。该分区方法可以保证 Key 相同的数据出现在同一个分区中。用户可以通过 partitionBy 主动使用分区器,通过 partitions 参数指定想要分区的数量。
默认情况下的分区情况是:
执行结果如下图所示:

执行结果如下图所示,分区已经让我们手动控制成 10 个了:
RangePartitioner
简单来说就是将一定范围内的数映射到某个分区内,在实现中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey 会使用 RangePartitioner。

进行代码的测试:
执行结果如下图所示:

但是现在的问题是:在执行分区之前其实并不知道数据的分布情况,如果想知道数据的分区就需要对数据进行采样。
Spark 中的 RangePartitioner 在对数据采样的过程中使用了 “水塘采样法”
水塘采样法是:在包含 N 个项目的集合 S 中选取 K 个样本,其中 N 为 1 或者很大的未知的数量,尤其适用于不能把所有 N 个项目都存放到主内存的情况。
在采样过程中执行了 collect() 操作,引发了 Action 操作。
自定义分区器
Spark 允许用户通过自定义的 Partitioner 对象,灵活的来控制 RDD 的分区方式。我们需要实现自定义分区器,按照以下的规则进行分区:
分区 0 < 100
100 <= 分区 1 < 200
200 <= 分区 2 < 300
300 <= 分区 3 < 400
.......
900 <= 分区 9 < 1000
编写代码
打包上传
这里之前已经重复过多次,就跳过了
运行测试
可以看到如下的运行结果:

版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/84580474c095938cf540b5075】。文章转载请联系作者。
评论