大数据 -92 Spark 深入解析 Spark Standalone 模式:组件构成、提交流程与性能优化

点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 09 月 08 日更新到:Java-118 深入浅出 MySQL ShardingSphere 分片剖析:SQL 支持范围、限制与优化实践 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解

章节内容
上节完成的内容如下:
Spark 程序的优化:广播变量、累加器
研究广播变量
研究累加器

Standalone 提交
Standalone 组成
Standalone 模式是 Spark 自带的集群管理模式,它由四个核心组件构成,各组件之间通过 RPC 机制进行通信:
Driver(驱动程序)
作为用户编写的 Spark 应用程序的入口点,运行 main()方法
负责任务的解析、调度和监控
将用户程序转换为有向无环图(DAG)并分解为多个阶段(Stage)
在本地或集群中运行,当在集群中运行时,Driver 会向 Master 注册应用
示例:当运行 spark-submit 提交应用时,指定的主类就是 Driver 程序
Master(主节点)
作为集群的资源管理器,采用主从架构
负责接收 Driver 的注册请求
管理所有 Worker 节点,监控其存活状态
根据资源需求调度和分配集群资源
默认监听 7077 端口,可以通过 web UI(默认 8080 端口)查看集群状态
Worker(工作节点)
每个工作节点上运行一个 Worker 进程
负责向 Master 注册并汇报本节点的资源情况(CPU、内存等)
根据 Master 的指令启动/停止 Executor
监控 Executor 的运行状态
一个集群可以包含多个 Worker 节点,通常部署在不同的物理机上
Executor(执行器)
每个 Worker 节点可以运行多个 Executor
负责执行具体的 Task 任务
每个 Executor 包含多个线程池,用于并行执行 Task
提供内存存储功能,可以缓存 RDD 数据
在执行过程中会向 Driver 汇报任务状态
示例:一个拥有 8 核 CPU 的 Worker 节点可能配置 2 个 Executor,每个 Executor 使用 4 个核心
交互流程:
用户提交应用后,Driver 向 Master 注册
Master 根据资源需求,在 Worker 节点上分配资源
Worker 根据分配启动指定数量的 Executor
Executor 启动后反向注册到 Driver
Driver 将 Task 分发到 Executor 执行
资源配置示例:
SparkContext 组件
什么是 SparkContext
SparkContext 是 Spark 应用程序的主控制器,它负责与 Spark 集群的管理节点(Driver)和工作节点(Workers)进行交互。通过 SparkContext,用户可以提交作业、管理 RDD(弹性分布式数据集)和其他数据集,并执行各种操作。SparkContext 是 Spark 应用程序的基础,每个应用程序在启动时都会创建一个 SparkContext 实例。
SparkContext 的主要职责
集群连接: SparkContext 负责连接到集群管理器(如 YARN、Mesos 或 Spark 的独立集群管理器),并获取集群的资源,以便在集群上执行任务。
作业调度: SparkContext 通过 DAG(有向无环图)将用户的应用程序逻辑转换为一系列任务(Tasks),然后将这些任务分配给集群中的工作节点执行。
RDD 管理: RDD 是 Spark 的核心抽象,用于表示分布式数据集。SparkContext 提供了创建 RDD 的方法,如从外部存储系统(HDFS、S3 等)中加载数据,或者从 Scala 集合创建 RDD。
广播变量和累加器: SparkContext 提供了广播变量和累加器的支持,广播变量用于在集群中的所有节点间共享只读数据,累加器用于在集群中执行全局计数或求和操作。
检查点: 为了支持容错,SparkContext 提供了将 RDD 存储到可靠存储中的功能,这称为检查点。这样,在发生故障时,Spark 可以从检查点恢复 RDD。
SparkContext 中的三大组件:
DAGScheduler:负责将 DAG 划分若干个 Stage
TaskScheduler:将 DAGScheduler 提交的 Stage(Taskset)进行优先排序,再将 Task 发送到 Executor
SchedulerBackend:定义了许多与 Executor 事件相关的处理,包括:新的 Executor 注册进来的时候记录 Executor 的信息,增加全局的资源量(核数),Executor 更新状态,若任务完成的话,回收 Core,其他停止 Executor、Remove Executor 等事件

常用的 SparkContext 方法
parallelize: 将本地集合转换为 RDD。
textFile: 从文本文件中读取数据并创建 RDD。
stop: 停止 SparkContext。
broadcast: 创建广播变量。
accumulator: 创建累加器。
Standalone 提交
启动应用程序,完成 SparkContext 的初始化
Driver 向 Master 注册,申请资源
Master 检查集群资源状况,若集群资源满足,通知 Worker 启动 Executor
Executor 启动后向 Driver 注册(称为反向注册)
Driver 完成 DAG 的解析,得到 Tasks,然后向 Executor 发送 Task
Executor 向 Driver 汇总任务的执行情况
应用程序执行完毕,回收资源

Shuffle 原理
基本概念
Shuffle 的本意是洗牌,目的是为了把牌弄乱。在数据处理领域,Shuffle 是一个关键的操作过程:
基本定义
Spark、Hadoop 中的 Shuffle 可不是为了把数据弄乱,而是为了将随机排列的数据转换成具有一定规则的数据。这是一个数据重新分配和重组的过程,确保相同 key 的数据能够被发送到同一个 reduce 任务进行处理。
在分布式计算中的定位
Shuffle 是 MapReduce 计算框架中的一个特殊的阶段,介于 Map 和 Reduce 之间。它负责将 Map 阶段产生的中间结果按照 key 进行重新分区和排序,为 Reduce 阶段做好准备。例如,在单词计数应用中,Shuffle 阶段会将所有相同单词的计数发送到同一个 reduce 任务。
技术实现特点
Shuffle 涉及到了本地磁盘(非 HDFS)的读写和网络传输。具体来说:
Map 任务会将输出写入本地磁盘的临时文件
Reduce 任务通过网络拉取这些中间数据
数据在传输前通常会被压缩以减少网络开销
性能影响
大多数 Spark 作业的性能都消耗在了 Shuffle 阶段,因此 Shuffle 性能的高低直接影响到了整个程序的运行效率。主要体现在:
磁盘 I/O 开销(特别是当数据量很大时)
网络传输开销(跨节点数据传输)
内存使用(需要缓存中间结果)
实际应用中,优化 Shuffle 过程(如调整分区数量、选择合适的序列化方式等)往往能显著提升作业性能。例如,在 Spark 中可以通过设置 spark.shuffle.file.buffer 参数来调整 shuffle 写缓冲区大小,从而改善性能。
Shuffle 历史
Spark 0.8 及以前 Hash Based Shuflle
Spark 0.8.1 为 Hash Based Shuflle 引入 File Consolidation 机制
Spark 0.9 引入 External Append Only Map
Spark 1.1 引入 Sort Based Shuffle,但默认仍为 Hash Based Shuffle
Spark 1.2 默认的 Shuffle 方式改为 Sort Based Shuffle
Spark 1.4 引入 Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-Sort 并入 Sort Based Shuffle
Spark 2.0 Hash Based Shuffle 退出历史舞台

Hash Base Shuffle V1
简单介绍
Hash-based Shuffle 是 Apache Spark 中数据分布和重新排序的一种方式。Shuffle 是指在不同阶段的任务之间重新分配数据的过程。Hash-based Shuffle 在 Spark 1.x 版本中引入,被称为 Shuffle V1。Shuffle V1 是 Spark 最初版本使用的 Shuffle 机制,基于 Hash 方法实现数据分布。它的主要特点是通过对数据的键进行哈希处理,将数据分配到相应的 reducer 节点上。Shuffle V1 的实现相对简单,但在大规模数据处理时存在一些局限性,如磁盘 I/O 过多、垃圾回收压力大等。
每个 Shuffle Map Task 需要为每个下游的 Task 创建一个单独的文件
Shuffle 过程中会生成海量的小文件,同时打开过多的文件、IO 效率低

工作原理
Map 端处理:
每个 map 任务在完成后,会根据键的哈希值将数据划分到不同的 bucket 中,这些 bucket 对应下游的 reduce 任务。
Map 任务会将这些数据块(称为 partition)写入本地磁盘,并为每个 reduce 任务生成一个文件(包括索引文件和数据文件)。
Reduce 端处理:
当 reduce 任务启动时,它会从所有 map 任务生成的输出中拉取对应的数据块。
Reduce 任务根据 map 任务输出的索引文件来读取相应的 partition 数据,并在本地进行聚合或其他处理。
局限性
磁盘 I/O: 每个 map 任务为每个 reduce 任务生成单独的文件,这会导致大量的小文件和频繁的磁盘 I/O 操作。当集群规模和数据量增大时,I/O 开销变得非常大。
垃圾回收: Shuffle V1 在处理过程中会产生大量的中间结果,导致 JVM 内存中会积累大量对象,增加了垃圾回收的压力,可能导致频繁的 GC 暂停(Stop-the-world)。
容错性: 如果某个任务失败,Spark 需要重新计算该任务的所有中间结果,Shuffle V1 没有很好的机制来优化这一过程。
适用场景
尽管 Shuffle V1 存在一些问题,但在小规模数据处理或集群中,Shuffle V1 的性能表现还是可以接受的,特别是对资源消耗较少的作业。不过,随着数据规模的增大,Shuffle V1 的局限性会变得明显,因此后续的 Spark 版本引入了更优化的 Shuffle 机制(Shuffle V2 和 Tungsten-Sort Based Shuffle)。
Hash Base Shuffle V2
简单介绍
Hash-Based Shuffle V2 是 Apache Spark 中对最初版本的 Hash-Based Shuffle 进行的改进,旨在解决 Shuffle V1 中存在的一些性能和稳定性问题。Shuffle 是分布式计算中数据重新分布的重要机制,而 Shuffle V2 的引入大大提高了 Spark 在处理大规模数据集时的性能和效率。
核心思想
Hash Base Shuffle V2 核心思想:允许不同 Task 复用同一批磁盘文件,有效将多个 Task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 ShuffleWrite 的性能,一定程度上解决了 HashV1 中的问题,但不彻底。Hash Shuffle 规避了排序,提高了性能,总的来说在 Hash Shuffle 过程中生成了海量的小文件

Shuffle V2 的改进点
合并输出文件:
在 Shuffle V2 中,map 任务不再为每个 reduce 任务生成一个单独的文件,而是将多个 partition 的输出合并到一个文件中。这样,每个 map 任务只生成一个数据文件和一个索引文件,大大减少了生成的小文件数量。
索引文件记录了每个 reduce 任务的数据在数据文件中的偏移量和长度,reduce 任务可以根据这个索引文件来定位它所需的数据。
磁盘 I/O 优化:
通过合并输出文件,Shuffle V2 大幅减少了磁盘 I/O 操作,减少了文件系统的压力,并且降低了与小文件相关的元数据管理开销。
内存消耗优化:
由于减少了文件数量,Shuffle V2 对 JVM 的内存压力也有所降低,垃圾回收(GC)的频率和时长得到了优化。
容错性改进:
Shuffle V2 采用了更加高效的数据管理机制,使得在任务失败时,重新拉取数据的开销更小。此外,数据文件的合并也使得在节点故障时可以更容易地恢复数据。工作原理
Shuffle V2 的工作原理
Map 端处理:
每个 map 任务在处理数据时,基于键的哈希值将数据分配到不同的 partition。与 Shuffle V1 不同的是,Shuffle V2 将多个 partition 的数据写入同一个文件。
同时生成一个索引文件,记录每个 partition 在数据文件中的位置和长度。Reduce 端处理:
Reduce 任务通过索引文件,定位需要处理的数据块,并从 Map 任务的输出文件中读取相应的数据。
通过这种方式,减少了 I/O 开销,并优化了数据拉取的效率。
适用场景
Shuffle V2 适用于绝大多数的 Spark 作业,特别是在处理大规模数据集时效果尤为明显。它减少了磁盘 I/O 操作,优化了内存消耗,并提高了系统的容错性。对于需要高性能和稳定性的场景,Shuffle V2 是更好的选择。
Sort Base Shuffle
Sort Base Shuffle 大大减少了 Shuffle 过程中产生的文件数,提高 Shuffle 的效率。
Spark Shuffle 与 Hadoop Shuffle 从目的、意义、功能上看是类似的,实现上有区别。

RDD 编程优化
RDD 复用
避免创建重复的 RDD,在开发过程中要注意,对于同一份数据,只应该创建一个 RDD,不要创建过多个 RDD 来表示同一份数据。
RDD 缓存/持久化
当多次对同一个 RDD 执行算子操作时,每一次都会对这个 RDD 以之前的父 RDD 重新计算一次,这种情况是必须要避免的,对同一个 RDD 的重复计算是对资源的极大浪费
对多次使用的 RDD 进行持久化,通过持久化将公共 RDD 的数据缓存到内存/磁盘中,之后对于公共 RDD 的计算都会从内存/磁盘中直接获取 RDD 数据
RDD 的持久化是可以进行序列化的,当内存无法将 RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据的体积,将数据完整存储在内存中
巧用 filter
尽可能过早地执行 filter 操作,过滤无用数据
在 filter 过滤较多数据后,使用 coalesce 对数据进行重分区
使用高性能算子
避免使用 groupByKey,根据场景选择使用高性能的聚合算子:reduceByKey、aggregateByKey
coalesce、repartition,在可能得情况下优先选择没有 Shuffle 的操作
foreachPartition 优化输出操作
map、mapPartition,选择合理的选择算子,mapPartitions 性能更好,但数据量过大时可能会 OOM
用 repartitionAndSortWithinPartitions 替代 repartition + Sort 操作
合理使用 cache、persist、checkpoint,选择合理的数据存储级别
filter 的使用
减少对数据源的扫描(算法复杂)
设置合理的并行度
Spark 作业中的并行度指各个 Stage 的 Task 的数量
设置合理的并行度,让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度尽可能大,达到充分利用集群资源。
广播大变量
默认情况下,Task 中的算子中如果使用了外部变量,每个 Task 都会获取一份变量的副本,这会造多余的网络传输和内存消耗
使用广播变量,只会在每个 Executor 保存一个副本,Executor 的所有 Task 共用此广播变量,这样就节约了网络及内存资源
版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/2096d78213a4141cb478571cd】。文章转载请联系作者。
评论