写点什么

大数据 -85 Spark Action 操作详解:从 Collect 到存储的全景解析

作者:武子康
  • 2025-09-02
    山东
  • 本文字数:4188 字

    阅读完需:约 14 分钟

大数据-85 Spark Action 操作详解:从 Collect 到存储的全景解析

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 01 日更新到:Java-113 深入浅出 MySQL 扩容全攻略:触发条件、迁移方案与性能优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • RDD 的创建

  • 从集合创建 RDD、从文件创建 RDD、从 RDD 创建 RDD

  • RDD 操作算子:Transformation 详细解释


Transformation


RDD 的操作算子分为两类:


  • Transformation,用来对 RDD 进行转换,这个操作时延迟执行的(或者是 Lazy),Transformation,返回一个新的 RDD

  • Action,用来触发 RDD 的计算,得到相关计算结果或者将结果保存到外部系统中,Action:返回 int、double、集合(不会返回新的 RDD)


续接上节


上节完成了 Transformation

Action 详解

Action 是 Spark 中触发实际计算的操作,它会触发 RDD 的转换操作并返回结果到驱动程序或存储到外部系统。以下是常见的 Action 操作分类和详细说明:

1. 数据收集类操作

这些操作将 RDD 中的数据收集到驱动程序中:


  • collect():将 RDD 中的所有数据以数组形式返回给驱动程序

  • 示例:val data = rdd.collect()

  • 注意:当数据量很大时可能导致内存溢出

  • collectAsMap():针对键值对 RDD,将结果作为 Map 返回

  • 示例:val mapData = pairRdd.collectAsMap()

  • 特点:如果键有重复,后面的值会覆盖前面的

2. 统计类操作

这些操作对 RDD 中的数据进行统计分析:


  • count():返回 RDD 中元素的总数

  • 示例:val total = rdd.count()

  • stats():返回包含 count、mean、stdev、max、min 的 StatCounter 对象

  • 应用场景:快速获取数据集的基本统计信息

  • 独立统计方法:

  • mean():计算平均值

  • stdev():计算标准差

  • max()/min():返回最大/最小值

3. 聚合类操作

这些操作对 RDD 中的元素进行聚合:


  • reduce(func):使用给定的函数聚合 RDD 中的元素

  • 示例:val sum = rdd.reduce((a,b) => a + b)

  • 要求:函数必须满足结合律和交换律

  • fold(zeroValue)(func):类似 reduce,但提供初始值

  • 示例:val sum = rdd.fold(0)(_ + _)

  • 特点:每个分区都会从初始值开始计算

  • aggregate(zeroValue)(seqOp, combOp):更灵活的聚合操作

  • 示例:计算平均值:


    val result = rdd.aggregate((0, 0))(      (acc, value) => (acc._1 + value, acc._2 + 1),      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)    )    val avg = result._1 / result._2.toDouble
复制代码

4. 元素获取操作

这些操作用于获取 RDD 中的特定元素:


  • first():返回 RDD 中的第一个元素

  • 应用场景:快速查看数据格式或内容

  • take(n):返回 RDD 中的前 n 个元素

  • 示例:val samples = rdd.take(5)

  • top(n):返回前 n 个最大的元素(按自然顺序或指定排序规则)

  • 示例:val top10 = rdd.top(10)

  • 扩展:可以使用 top(n)(ordering) 指定排序规则

  • takeSample(withReplacement, num, seed):随机采样

  • 参数说明:

  • withReplacement:是否放回采样

  • num:采样数量

  • seed:随机种子(可选)

5. 遍历操作

这些操作对 RDD 中的每个元素执行操作:


  • foreach(func):对每个元素应用函数

  • 示例:rdd.foreach(println)

  • 应用场景:将数据写入外部系统

  • foreachPartition(func):对每个分区应用函数

  • 示例:数据库写入优化:


    rdd.foreachPartition { partition =>      val conn = createConnection()      partition.foreach { element =>        // 写入数据库      }      conn.close()    }
复制代码


  • 优势:减少连接创建开销

6. 存储操作

这些操作将 RDD 存储到外部系统:


  • saveAsTextFile(path):将 RDD 保存为文本文件

  • 特点:每个分区保存为一个文件

  • saveAsSequenceFile(path):将键值对 RDD 保存为 Hadoop SequenceFile

  • 要求:键值都必须实现 Writable 接口

  • saveAsObjectFile(path):使用 Java 序列化保存 RDD

  • 特点:可以保存任意类型的 RDD

使用注意事项

  1. Action 操作会触发实际计算,应谨慎使用

  2. 大数据集避免使用 collect(),可能导致驱动程序内存溢出

  3. 存储操作是惰性的,只有在 Action 被调用时才会执行

  4. 聚合操作应注意数据倾斜问题

Key-Value RDD

RDD 整体上分为 Value 类型 和 Key-Value 类型前面介绍是 Value 类型的 RDD 操作,实际上使用更多的是 Key-Value 类型的 RDD,也称为 PairRDD


  • Value 类型的 RDD 操作基本集中在 RDD.scala 中

  • Key-Value 类型的 RDD 操作集中在 PairRDDFunctions.scala 中

创建 PairRDD

val arr = (1 to 10).toArrayval arr1 = arr.map(x => (x, x*10, x*100))
# rdd1 不是 Pair RDDval rdd1 = sc.makeRDD(arr1)
# rdd2 是 Pair RDDval arr2 = arr.map(x => (x, (x*10, x*100)))val rdd2 = sc.makeRDD(arr2)
复制代码


运行查看如下的结果:


Transformation 操作

mapValues

# mapValues代码更简洁val a = sc.parallelize(List((1,2),(3,4),(5,6)))val b = a.mapValues(x => 1 to x)b.collect
复制代码


运行结果如下图:


flatMapValues

将 values 压平、拍平


val c = a.flatMapValues(x => 1 to x)c.collect
复制代码


执行结果如下图所示:


groupByKey

键值对的 key 表示图书名称,value 表示某天图书销量。计算每个键对应的平均值,也就是计算每种图书的每天平均销量。


val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26),("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25),("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))
# 三种写法rdd.groupByKey().map(x => (x._1, x._2.sum.toDouble/x._2.size)).collect
rdd.groupByKey().map{case (k, v) => (k,v.sum.toDouble/v.size)}.collect
rdd.groupByKey.mapValues(v => v.sum.toDouble/v.size).collect
复制代码


执行结果如下图所示:


reduceByKey

这种方式也可以 rdd.mapValues((_, 1)).reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)).mapValues(x => (x._1.toDouble / x._2)).collect()

foldByKey

rdd.mapValues((_, 1)).foldByKey((0, 0))((x, y) => {(x._1+y._1, x._2+y._2)}).mapValues(x=>x._1.toDouble/x._2).collect
复制代码


执行结果如下图所示:


sortByKey

根据 key 来进行排序


val a = sc.parallelize(List("wyp", "iteblog", "com","397090770", "test"))val b = sc.parallelize(1 to a.count.toInt)val c = a.zip(b)c.sortByKey().collectc.sortByKey(false).collect
复制代码


执行如下图所示:


cogroup

val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"),(3,"Kylin"), (4,"Flink")))val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"),(6,"冯七")))
# joinval rdd3 = rdd1.cogroup(rdd2)rdd3.collect.foreach(println)
rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect
复制代码


执行结果如下图所示:


outerjoin

# 不同的JOIN操作rdd1.join(rdd2).collectrdd1.leftOuterJoin(rdd2).collectrdd1.rightOuterJoin(rdd2).collectrdd1.fullOuterJoin(rdd2).collect
复制代码


执行结果如下图所示:


lookup

rdd1.lookup("1")rdd1.lookup(3)
复制代码


执行结果如下图所示:


文件输入输出

文本文件

  • 数据读取

  • 使用textFile(String path)方法读取文本文件

  • 可以指定单个文件路径,如hdfs://path/to/file.txt

  • 支持通配符匹配多个文件,如hdfs://path/to/*.txt

  • 可以指定最小分区数作为第二个参数:textFile(path, minPartitions)

  • 返回值RDD[(String, String)],其中:

  • Key 是文件的完整路径名

  • Value 是文件的全部内容

  • 示例:val rdd = sc.textFile("data/README.md")

  • 数据保存

  • 使用saveAsTextFile(String path)方法保存 RDD 内容

  • 会按分区将数据保存到指定目录下的多个文件中

  • 示例:rdd.saveAsTextFile("output/result")

csv 文件

  • 数据读取

  • 首先使用textFile()方法将文件作为普通文本读取

  • 然后对每行数据进行解析:

  • 使用字符串分割方法(如split(",")

  • 或使用专门的 CSV 解析库(如 OpenCSV)

  • 对于包含表头的 CSV 文件,需要先处理首行

  • 示例:


     val csv = sc.textFile("data.csv")     val data = csv.map(_.split(","))
复制代码


  • 数据保存

  • 将结构化 RDD 转换为字符串格式

  • 使用saveAsTextFile()方法输出

  • 可以添加表头行作为第一个元素

  • 示例:


     val output = data.map(_.mkString(","))     output.saveAsTextFile("output.csv")
复制代码

JSON 文件

  • 数据读取

  • 使用textFile()逐行读取 JSON 文件

  • 使用 JSON 解析库(如 Jackson、Gson 等)解析每行数据

  • 对于多行 JSON 记录,需要特殊处理

  • 示例:


     import org.json4s._     import org.json4s.jackson.JsonMethods._          val json = sc.textFile("data.json")     val parsed = json.map(parse(_))
复制代码


  • 数据保存

  • 将结构化数据转换为 JSON 字符串

  • 使用saveAsTextFile()方法输出

  • 示例:


     val jsonStrings = data.map(compact(render(_)))     jsonStrings.saveAsTextFile("output.json")
复制代码


  • SparkSQL 处理

  • 使用spark.read.json()方法直接读取 JSON 文件

  • 自动推断 Schema 并创建 DataFrame

  • 示例:


 val df = spark.read.json("data.json") df.write.json("output.json")
复制代码

SequenceFile

  • 特点

  • Hadoop 设计的二进制键值对存储格式

  • 支持压缩,适合大数据存储

  • 键值类型需要实现 Hadoop 的 Writable 接口

  • 数据读取

  • 使用sequenceFile[KeyClass, ValueClass](path)方法

  • 需要指定键值类型

  • 示例:


    val data = sc.sequenceFile[IntWritable, Text]("hdfs://path/to/file")
复制代码


  • 数据保存

  • 使用saveAsSequenceFile(path)方法

  • 需要 PairRDD(键值对 RDD)

  • 示例:


    val pairs = rdd.map(x => (new IntWritable(x._1), new Text(x._2)))    pairs.saveAsSequenceFile("output/seq")
复制代码

对象文件

  • 特点

  • 使用 Java 序列化机制

  • 适合存储任意 Java/Scala 对象

  • 序列化效率较低,不适合大规模数据

  • 数据读取

  • 使用objectFile[T](path)方法

  • 需要指定对象类型

  • 示例:


    val data = sc.objectFile[MyClass]("objdata")
复制代码


  • 数据保存

  • 使用saveAsObjectFile(path)方法

  • RDD 元素需要可序列化

  • 示例:


    case class Person(name: String, age: Int)    val people = sc.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))    people.saveAsObjectFile("people.obj")
复制代码


发布于: 刚刚阅读数: 5
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-85 Spark Action 操作详解:从 Collect 到存储的全景解析_Java_武子康_InfoQ写作社区