大数据 -89 Spark 应用必备:进程通信、序列化机制与 RDD 执行原理

点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 09 月 01 日更新到:Java-113 深入浅出 MySQL 扩容全攻略:触发条件、迁移方案与性能优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解

章节内容
上节完成的内容如下:
Spark Super Word Count 程序 Scala 语言编写
将数据写入 MySQL、不写入 MySQL 等编码方式
代码的详细解释与结果

进程通信与序列化机制
Spark 作为分布式计算框架,其核心架构基于 Driver-Executor 模式。在这个架构中:
SparkContext 的角色:SparkContext 是 Spark 应用程序的入口点,代表 Driver 程序与集群资源管理器(如 YARN、Mesos 或 Standalone)进行通信。它负责:
申请集群资源
将用户程序转换为任务
调度任务到 Executor 上执行
监控任务执行状态
进程通信与序列化:
由于 Driver 和 Executor 运行在不同的 JVM 进程中,所有跨进程的数据传输都需要序列化
Spark 使用 Java 序列化或 Kryo 序列化来传输闭包和函数对象
闭包(closure)中引用的所有外部变量都会被序列化并传输到 Executor 端
自定义 RDD 操作的注意事项:
Driver 端初始化工作:
RDD 的转换操作(如 map、filter 等)定义在 Driver 端
广播变量的创建和分发
累加器的初始化
示例:
val rdd = sc.parallelize(1 to 100) // 在Driver端初始化
Executor 端实际执行:
真正的数据处理在 Executor 节点上执行
每个 Task 处理 RDD 的一个分区
示例:
rdd.map(_ * 2) // map函数会在Executor端执行
典型问题与解决方案:
序列化错误:当自定义函数引用了不可序列化的对象时
闭包陷阱:变量在 Driver 端初始化但在 Executor 端使用
最佳实践:
尽量使用 Spark 内置的转换操作和动作
自定义函数应尽量简单且可序列化
避免在 RDD 操作中创建大对象
对于需要在多个操作中重用的数据,考虑使用广播变量
理解 Driver 和 Executor 的分工是编写高效 Spark 程序的关键,这有助于避免常见的分布式计算陷阱和提高程序性能。
测试代码
遇到问题
解决方案 1
解决方案 2
解决方案 3
完整代码
注意事项
如果在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化
延迟创建的解决方案比较简单,且实用性广
RDD 依赖关系
基本概念
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。

RDD 和它的依赖的父 RDDs 的关系有两种不同的类型:
窄依赖(narrow dependency):1:1 或 n:1
宽依赖(wide dependency):n:m 意味着有 shuflle

RDD 任务切分中间分为:Driver program、Job、Stage(TaskSet) 和 Task
Driver program:初始化一个 SparkContext 即生成一个 Spark 应用
Job:一个 Action 算子就会生成一个 Job
Stage:根据 RDD 之间的依赖关系不同将 Job 划分成不同的 Stage,遇到一个宽依赖则划分一个 Stage
Task:Stage 是一个 TaskSet,将 Stage 划分的结果发送到不同的 Executor 执行即为一个 Task
Task 是 Spark 中任务调度的最小单位,每个 Stage 包含许多 Task,这些 Task 执行的计算逻辑是相同的,计算的数据是不同的
DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系
再回 WordCount
代码部分
你可以用代码执行,也可以在 SparkShell 中执行。
提出问题
上面的 WordCount 中,一共有几个 Job,几个 Stage,几个 Task?

答案:1 个 Job,3 个 Stage,6 个 Task
RDD 持久化/缓存
基本概念
涉及到的算子:persist、cache、unpersist 都是 Transformation 算子
缓存机制详解
缓存是将计算结果写入不同的存储介质的过程,用户可以通过定义存储级别(StorageLevel)来指定缓存的具体方式。Spark 目前支持以下几种存储级别:
MEMORY_ONLY:只存储在内存中(默认级别)
MEMORY_AND_DISK:优先存储在内存,内存不足时溢出到磁盘
MEMORY_ONLY_SER:序列化存储在内存中
MEMORY_AND_DISK_SER:序列化存储在内存,内存不足时溢出到磁盘
DISK_ONLY:只存储在磁盘上
OFF_HEAP:存储在堆外内存(Tachyon)
缓存的重要性
通过缓存机制,Spark 避免了 RDD 上的重复计算,能够极大提升计算的速度。例如,在一个迭代算法中,如果某个 RDD 被多次使用,缓存它可以避免每次使用时都重新计算。
RDD 持久化或缓存是 Spark 最重要的特征之一,也是 Spark 构建迭代算法和快速交互式查询的关键所在。典型的应用场景包括:
机器学习中的迭代算法(如梯度下降)
交互式数据查询(如 Spark SQL)
图计算算法(如 PageRank)
性能优势
Spark 之所以非常快,一个重要原因就是支持在内存、缓存中持久化数据。当持久化一个 RDD 后:
每个计算节点都会把计算的分片结果保存在内存中
对该数据集进行后续操作(Action)时,可以直接使用缓存的数据
避免了重复计算带来的性能损耗
持久化执行原理
使用 persist()方法时需要注意:
调用 persist()只是将一个 RDD 标记为需要持久化,并不会立即执行计算
真正的计算和持久化操作会在遇到第一个行动操作(Action)时触发
持久化是惰性执行的,与 Spark 的整体计算模型一致
例如:
相关操作
cache():等同于 persist(StorageLevel.MEMORY_ONLY)
unpersist():手动移除缓存
注意:缓存占用内存空间,不再需要时应及时释放

RDD 缓存的重要性:在 Spark 应用中,如果一个 RDD 会被多次使用(例如在多个 action 操作中被调用),且该 RDD 的计算过程涉及复杂的转换操作(如多表 join、聚合计算等),那么将其缓存(通过
persist()
或cache()
方法)可以显著提升性能。例如,在对一个大型日志数据集先进行filter
操作后,如果后续需要多次执行count
和collect
等操作,缓存过滤后的 RDD 能避免重复执行耗时的过滤计算。缓存的容错机制详解:
Spark 的 RDD 缓存采用"血缘关系(Lineage)+重算"的容错机制。当缓存的 RDD 分区因节点故障或内存不足被清除时:
Spark 会通过 RDD 的血缘关系图(记录所有转换操作的 DAG)确定需要重算的分区
仅重新执行从原始数据到该分区的转换链(如
textFile→map→filter
)重算过程是自动触发的,对用户透明。例如,若缓存了经过 10 次转换的 RDD,丢失后不需要手动重新计算,系统会自动从最近的持久化点开始重算。
分区级重算的优势:
RDD 的分区特性(Partition)使得故障恢复非常高效:
每个分区独立存储和计算,如 200 个分区的 RDD 只丢失 2 个分区时,仅需重算这 2 个分区
重算过程可以并行执行,不同 worker 节点可以同时计算不同丢失分区
与完整重算相比,分区级恢复能节省 90%以上的计算资源(假设仅有 10%分区丢失)
实际案例:在 TB 级数据分析中,某个节点故障导致 5%分区丢失,系统在 30 秒内就完成了受影响分区的重算,而完整重算需要 10 分钟
持久化级别
使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即
对于其他的存储级别,如下图:
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY
DISK_ONLY_2

版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/411c25c6e65349fb5f1db5d3e】。文章转载请联系作者。
评论