大数据 -85 Spark Action 操作详解:从 Collect 到存储的全景解析

点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 09 月 01 日更新到:Java-113 深入浅出 MySQL 扩容全攻略:触发条件、迁移方案与性能优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解

章节内容
上节我们完成了如下的内容:
RDD 的创建
从集合创建 RDD、从文件创建 RDD、从 RDD 创建 RDD
RDD 操作算子:Transformation 详细解释
Transformation
RDD 的操作算子分为两类:
Transformation,用来对 RDD 进行转换,这个操作时延迟执行的(或者是 Lazy),Transformation,返回一个新的 RDD
Action,用来触发 RDD 的计算,得到相关计算结果或者将结果保存到外部系统中,Action:返回 int、double、集合(不会返回新的 RDD)
续接上节

上节完成了 Transformation
Action 详解
Action 是 Spark 中触发实际计算的操作,它会触发 RDD 的转换操作并返回结果到驱动程序或存储到外部系统。以下是常见的 Action 操作分类和详细说明:
1. 数据收集类操作
这些操作将 RDD 中的数据收集到驱动程序中:
collect():将 RDD 中的所有数据以数组形式返回给驱动程序
示例:
val data = rdd.collect()
注意:当数据量很大时可能导致内存溢出
collectAsMap():针对键值对 RDD,将结果作为 Map 返回
示例:
val mapData = pairRdd.collectAsMap()
特点:如果键有重复,后面的值会覆盖前面的
2. 统计类操作
这些操作对 RDD 中的数据进行统计分析:
count():返回 RDD 中元素的总数
示例:
val total = rdd.count()
stats():返回包含 count、mean、stdev、max、min 的 StatCounter 对象
应用场景:快速获取数据集的基本统计信息
独立统计方法:
mean():计算平均值
stdev():计算标准差
max()/min():返回最大/最小值
3. 聚合类操作
这些操作对 RDD 中的元素进行聚合:
reduce(func):使用给定的函数聚合 RDD 中的元素
示例:
val sum = rdd.reduce((a,b) => a + b)
要求:函数必须满足结合律和交换律
fold(zeroValue)(func):类似 reduce,但提供初始值
示例:
val sum = rdd.fold(0)(_ + _)
特点:每个分区都会从初始值开始计算
aggregate(zeroValue)(seqOp, combOp):更灵活的聚合操作
示例:计算平均值:
4. 元素获取操作
这些操作用于获取 RDD 中的特定元素:
first():返回 RDD 中的第一个元素
应用场景:快速查看数据格式或内容
take(n):返回 RDD 中的前 n 个元素
示例:
val samples = rdd.take(5)
top(n):返回前 n 个最大的元素(按自然顺序或指定排序规则)
示例:
val top10 = rdd.top(10)
扩展:可以使用
top(n)(ordering)
指定排序规则takeSample(withReplacement, num, seed):随机采样
参数说明:
withReplacement:是否放回采样
num:采样数量
seed:随机种子(可选)
5. 遍历操作
这些操作对 RDD 中的每个元素执行操作:
foreach(func):对每个元素应用函数
示例:
rdd.foreach(println)
应用场景:将数据写入外部系统
foreachPartition(func):对每个分区应用函数
示例:数据库写入优化:
优势:减少连接创建开销
6. 存储操作
这些操作将 RDD 存储到外部系统:
saveAsTextFile(path):将 RDD 保存为文本文件
特点:每个分区保存为一个文件
saveAsSequenceFile(path):将键值对 RDD 保存为 Hadoop SequenceFile
要求:键值都必须实现 Writable 接口
saveAsObjectFile(path):使用 Java 序列化保存 RDD
特点:可以保存任意类型的 RDD
使用注意事项
Action 操作会触发实际计算,应谨慎使用
大数据集避免使用 collect(),可能导致驱动程序内存溢出
存储操作是惰性的,只有在 Action 被调用时才会执行
聚合操作应注意数据倾斜问题
Key-Value RDD
RDD 整体上分为 Value 类型 和 Key-Value 类型前面介绍是 Value 类型的 RDD 操作,实际上使用更多的是 Key-Value 类型的 RDD,也称为 PairRDD
Value 类型的 RDD 操作基本集中在 RDD.scala 中
Key-Value 类型的 RDD 操作集中在 PairRDDFunctions.scala 中
创建 PairRDD
运行查看如下的结果:

Transformation 操作
mapValues
运行结果如下图:

flatMapValues
将 values 压平、拍平
执行结果如下图所示:

groupByKey
键值对的 key 表示图书名称,value 表示某天图书销量。计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
执行结果如下图所示:

reduceByKey
这种方式也可以 rdd.mapValues((_, 1)).reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)).mapValues(x => (x._1.toDouble / x._2)).collect()
foldByKey
执行结果如下图所示:

sortByKey
根据 key 来进行排序
执行如下图所示:

cogroup
执行结果如下图所示:

outerjoin
执行结果如下图所示:

lookup
执行结果如下图所示:

文件输入输出
文本文件
数据读取:
使用
textFile(String path)
方法读取文本文件可以指定单个文件路径,如
hdfs://path/to/file.txt
支持通配符匹配多个文件,如
hdfs://path/to/*.txt
可以指定最小分区数作为第二个参数:
textFile(path, minPartitions)
返回值:
RDD[(String, String)]
,其中:Key 是文件的完整路径名
Value 是文件的全部内容
示例:
val rdd = sc.textFile("data/README.md")
数据保存:
使用
saveAsTextFile(String path)
方法保存 RDD 内容会按分区将数据保存到指定目录下的多个文件中
示例:
rdd.saveAsTextFile("output/result")
csv 文件
数据读取:
首先使用
textFile()
方法将文件作为普通文本读取然后对每行数据进行解析:
使用字符串分割方法(如
split(",")
)或使用专门的 CSV 解析库(如 OpenCSV)
对于包含表头的 CSV 文件,需要先处理首行
示例:
数据保存:
将结构化 RDD 转换为字符串格式
使用
saveAsTextFile()
方法输出可以添加表头行作为第一个元素
示例:
JSON 文件
数据读取:
使用
textFile()
逐行读取 JSON 文件使用 JSON 解析库(如 Jackson、Gson 等)解析每行数据
对于多行 JSON 记录,需要特殊处理
示例:
数据保存:
将结构化数据转换为 JSON 字符串
使用
saveAsTextFile()
方法输出示例:
SparkSQL 处理:
使用
spark.read.json()
方法直接读取 JSON 文件自动推断 Schema 并创建 DataFrame
示例:
SequenceFile
特点:
Hadoop 设计的二进制键值对存储格式
支持压缩,适合大数据存储
键值类型需要实现 Hadoop 的 Writable 接口
数据读取:
使用
sequenceFile[KeyClass, ValueClass](path)
方法需要指定键值类型
示例:
数据保存:
使用
saveAsSequenceFile(path)
方法需要 PairRDD(键值对 RDD)
示例:
对象文件
特点:
使用 Java 序列化机制
适合存储任意 Java/Scala 对象
序列化效率较低,不适合大规模数据
数据读取:
使用
objectFile[T](path)
方法需要指定对象类型
示例:
数据保存:
使用
saveAsObjectFile(path)
方法RDD 元素需要可序列化
示例:
版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/25004f4cc7622261853157bbd】。文章转载请联系作者。
评论