写点什么

大数据 -89 Spark 应用必备:进程通信、序列化机制与 RDD 执行原理

作者:武子康
  • 2025-09-06
    山东
  • 本文字数:4628 字

    阅读完需:约 15 分钟

大数据-89 Spark应用必备:进程通信、序列化机制与RDD执行原理

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 01 日更新到:Java-113 深入浅出 MySQL 扩容全攻略:触发条件、迁移方案与性能优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节完成的内容如下:


  • Spark Super Word Count 程序 Scala 语言编写

  • 将数据写入 MySQL、不写入 MySQL 等编码方式

  • 代码的详细解释与结果


进程通信与序列化机制

Spark 作为分布式计算框架,其核心架构基于 Driver-Executor 模式。在这个架构中:


  1. SparkContext 的角色:SparkContext 是 Spark 应用程序的入口点,代表 Driver 程序与集群资源管理器(如 YARN、Mesos 或 Standalone)进行通信。它负责:

  2. 申请集群资源

  3. 将用户程序转换为任务

  4. 调度任务到 Executor 上执行

  5. 监控任务执行状态

  6. 进程通信与序列化

  7. 由于 Driver 和 Executor 运行在不同的 JVM 进程中,所有跨进程的数据传输都需要序列化

  8. Spark 使用 Java 序列化或 Kryo 序列化来传输闭包和函数对象

  9. 闭包(closure)中引用的所有外部变量都会被序列化并传输到 Executor 端

  10. 自定义 RDD 操作的注意事项

  11. Driver 端初始化工作

  12. RDD 的转换操作(如 map、filter 等)定义在 Driver 端

  13. 广播变量的创建和分发

  14. 累加器的初始化

  15. 示例:val rdd = sc.parallelize(1 to 100) // 在Driver端初始化

  16. Executor 端实际执行

  17. 真正的数据处理在 Executor 节点上执行

  18. 每个 Task 处理 RDD 的一个分区

  19. 示例:rdd.map(_ * 2) // map函数会在Executor端执行

  20. 典型问题与解决方案

  21. 序列化错误:当自定义函数引用了不可序列化的对象时


     class NonSerializable {}     val obj = new NonSerializable     rdd.map(x => x + obj) // 会导致序列化错误
复制代码


 解决方案:要么使对象可序列化,要么在函数内部创建对象
复制代码


  • 闭包陷阱:变量在 Driver 端初始化但在 Executor 端使用


     var counter = 0     rdd.foreach(x => counter += x) // 不会按预期工作
复制代码


 解决方案:使用累加器(Accumulator)来实现跨节点的计数
复制代码


  1. 最佳实践

  2. 尽量使用 Spark 内置的转换操作和动作

  3. 自定义函数应尽量简单且可序列化

  4. 避免在 RDD 操作中创建大对象

  5. 对于需要在多个操作中重用的数据,考虑使用广播变量


理解 Driver 和 Executor 的分工是编写高效 Spark 程序的关键,这有助于避免常见的分布式计算陷阱和提高程序性能。

测试代码

遇到问题

class MyClass1(x: Int) {  val num = x}
object SerializableDemo { def main (args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("SerializableDemo") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN")
val rdd1 = sc.makeRDD(1 to 20) def add1(x: Int) = x + 10 val add2 = add1 _
// 过程和方法 都具备序列化的能力 rdd1.map(add1(_)).foreach(println) rdd1.map(add2(_)).foreach(println)
// 普通的类不具备序列化能力 val object1 = new MyClass1(10) // 报错 提示无法序列化 // rdd1.map(x => object1.num + x).foreach(println) }}
复制代码

解决方案 1

case class MyClass2(num: Int)
val object2 = MyClass2(20)rdd1.map(x => object2.num + x).foreach(println)
复制代码

解决方案 2

class MyClass3(x: Int) extends Serializable {  val num = x}
val object3 = new MyClass3(30)rdd1.map(x => object3.num + x).foreach(println)
复制代码

解决方案 3

class MyClass1(x: Int) {  val num = x}
lazy val object4 = new MyClass1(40)rdd1.map(x => object4.num + x).foreach(println)
复制代码

完整代码

package icu.wzk
import org.apache.spark.{SparkConf, SparkContext}
class MyClass1(x: Int) { val num = x}
case class MyClass2(num: Int)
class MyClass3(x: Int) extends Serializable { val num = x}
object SerializableDemo { def main (args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("SerializableDemo") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN")
val rdd1 = sc.makeRDD(1 to 20) def add1(x: Int) = x + 10 val add2 = add1 _
// 过程和方法 都具备序列化的能力 rdd1.map(add1(_)).foreach(println) rdd1.map(add2(_)).foreach(println)
// 普通的类不具备序列化能力 val object1 = new MyClass1(10) // 报错 提示无法序列化 // rdd1.map(x => object1.num + x).foreach(println)
// 解决方案1 使用 case class val object2 = MyClass2(20) rdd1.map(x => object2.num + x).foreach(println)
// 解决方案2 实现 Serializable val object3 = new MyClass3(30) rdd1.map(x => object3.num + x).foreach(println)
// 解决方法3 延迟创建 lazy val object4 = new MyClass1(40) rdd1.map(x => object4.num + x).foreach(println)
sc.stop() }}
复制代码

注意事项

  • 如果在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化

  • 延迟创建的解决方案比较简单,且实用性广

RDD 依赖关系

基本概念

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。



RDD 和它的依赖的父 RDDs 的关系有两种不同的类型:


  • 窄依赖(narrow dependency):1:1 或 n:1

  • 宽依赖(wide dependency):n:m 意味着有 shuflle



RDD 任务切分中间分为:Driver program、Job、Stage(TaskSet) 和 Task


  • Driver program:初始化一个 SparkContext 即生成一个 Spark 应用

  • Job:一个 Action 算子就会生成一个 Job

  • Stage:根据 RDD 之间的依赖关系不同将 Job 划分成不同的 Stage,遇到一个宽依赖则划分一个 Stage

  • Task:Stage 是一个 TaskSet,将 Stage 划分的结果发送到不同的 Executor 执行即为一个 Task

  • Task 是 Spark 中任务调度的最小单位,每个 Stage 包含许多 Task,这些 Task 执行的计算逻辑是相同的,计算的数据是不同的

  • DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系

再回 WordCount

代码部分

你可以用代码执行,也可以在 SparkShell 中执行。


package icu.wzk
import org.apache.spark.{SparkConf, SparkContext}
object ReWordCount {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("SparkFindFriends") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN")
val rdd1 = sc.textFile("goodtbl.java") val rdd2 = rdd1.flatMap(_.split("\\+")) val rdd3 = rdd2.map((_, 1)) val rdd4 = rdd3.reduceByKey(_ + _) val rdd5 = rdd4.sortByKey() rdd5.count()
// 查看RDD的血缘关系 rdd1.toDebugString rdd5.toDebugString
// 查看依赖 rdd1.dependencies rdd1.dependencies(0).rdd
rdd5.dependencies rdd5.dependencies(0).rdd sc.stop() }
}
复制代码

提出问题

上面的 WordCount 中,一共有几个 Job,几个 Stage,几个 Task?



答案:1 个 Job,3 个 Stage,6 个 Task

RDD 持久化/缓存

基本概念

涉及到的算子:persist、cache、unpersist 都是 Transformation 算子

缓存机制详解

缓存是将计算结果写入不同的存储介质的过程,用户可以通过定义存储级别(StorageLevel)来指定缓存的具体方式。Spark 目前支持以下几种存储级别:


  • MEMORY_ONLY:只存储在内存中(默认级别)

  • MEMORY_AND_DISK:优先存储在内存,内存不足时溢出到磁盘

  • MEMORY_ONLY_SER:序列化存储在内存中

  • MEMORY_AND_DISK_SER:序列化存储在内存,内存不足时溢出到磁盘

  • DISK_ONLY:只存储在磁盘上

  • OFF_HEAP:存储在堆外内存(Tachyon)

缓存的重要性

通过缓存机制,Spark 避免了 RDD 上的重复计算,能够极大提升计算的速度。例如,在一个迭代算法中,如果某个 RDD 被多次使用,缓存它可以避免每次使用时都重新计算。


RDD 持久化或缓存是 Spark 最重要的特征之一,也是 Spark 构建迭代算法和快速交互式查询的关键所在。典型的应用场景包括:


  1. 机器学习中的迭代算法(如梯度下降)

  2. 交互式数据查询(如 Spark SQL)

  3. 图计算算法(如 PageRank)

性能优势

Spark 之所以非常快,一个重要原因就是支持在内存、缓存中持久化数据。当持久化一个 RDD 后:


  • 每个计算节点都会把计算的分片结果保存在内存中

  • 对该数据集进行后续操作(Action)时,可以直接使用缓存的数据

  • 避免了重复计算带来的性能损耗

持久化执行原理

使用 persist()方法时需要注意:


  1. 调用 persist()只是将一个 RDD 标记为需要持久化,并不会立即执行计算

  2. 真正的计算和持久化操作会在遇到第一个行动操作(Action)时触发

  3. 持久化是惰性执行的,与 Spark 的整体计算模型一致


例如:


rdd = sc.parallelize(range(1, 100))# 标记持久化rdd.persist(StorageLevel.MEMORY_ONLY) # 此时还未真正计算和缓存print(rdd.count())  # 第一次行动操作,触发计算和缓存print(rdd.sum())   # 直接使用缓存数据,无需重新计算
复制代码

相关操作

  • cache():等同于 persist(StorageLevel.MEMORY_ONLY)

  • unpersist():手动移除缓存

  • 注意:缓存占用内存空间,不再需要时应及时释放



  • RDD 缓存的重要性:在 Spark 应用中,如果一个 RDD 会被多次使用(例如在多个 action 操作中被调用),且该 RDD 的计算过程涉及复杂的转换操作(如多表 join、聚合计算等),那么将其缓存(通过persist()cache()方法)可以显著提升性能。例如,在对一个大型日志数据集先进行filter操作后,如果后续需要多次执行countcollect等操作,缓存过滤后的 RDD 能避免重复执行耗时的过滤计算。

  • 缓存的容错机制详解

  • Spark 的 RDD 缓存采用"血缘关系(Lineage)+重算"的容错机制。当缓存的 RDD 分区因节点故障或内存不足被清除时:

  • Spark 会通过 RDD 的血缘关系图(记录所有转换操作的 DAG)确定需要重算的分区

  • 仅重新执行从原始数据到该分区的转换链(如textFile→map→filter

  • 重算过程是自动触发的,对用户透明。例如,若缓存了经过 10 次转换的 RDD,丢失后不需要手动重新计算,系统会自动从最近的持久化点开始重算。

  • 分区级重算的优势

  • RDD 的分区特性(Partition)使得故障恢复非常高效:

  • 每个分区独立存储和计算,如 200 个分区的 RDD 只丢失 2 个分区时,仅需重算这 2 个分区

  • 重算过程可以并行执行,不同 worker 节点可以同时计算不同丢失分区

  • 与完整重算相比,分区级恢复能节省 90%以上的计算资源(假设仅有 10%分区丢失)

  • 实际案例:在 TB 级数据分析中,某个节点故障导致 5%分区丢失,系统在 30 秒内就完成了受影响分区的重算,而完整重算需要 10 分钟

持久化级别

使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即


cache() == persist(StorageLevel.Memory.ONLY)
复制代码


对于其他的存储级别,如下图:


  • MEMORY_ONLY

  • MEMORY_AND_DISK

  • MEMORY_ONLY_SER

  • MEMORY_AND_DISK_SER

  • DISK_ONLY

  • DISK_ONLY_2



发布于: 刚刚阅读数: 5
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-89 Spark应用必备:进程通信、序列化机制与RDD执行原理_Java_武子康_InfoQ写作社区