写点什么

大数据 -84 Spark RDD 创建全攻略:从集合、文件到转换操作详解

作者:武子康
  • 2025-09-01
    山东
  • 本文字数:4018 字

    阅读完需:约 13 分钟

大数据-84 Spark RDD创建全攻略:从集合、文件到转换操作详解

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 01 日更新到:Java-113 深入浅出 MySQL 扩容全攻略:触发条件、迁移方案与性能优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • RDD 的介绍

  • RDD 的特点、特点介绍

  • Spark 编程模型的介绍


RDD 的创建

SparkContext

SparkContext 是 Spark 应用程序的核心组件,也是编写 Spark 程序时需要用到的第一个类。作为 Spark 的主要入口点,它承担着与整个集群交互的重要职责。

核心功能与定位

  1. 客户端与服务端模型

  2. 如果把 Spark 集群比作服务端,那么 Driver 程序就是客户端,而 SparkContext 就是这个客户端的核心引擎

  3. 例如:当提交一个 Spark 作业时,Driver 程序中的 SparkContext 会负责与集群管理器(如 YARN、Mesos 或 Standalone)建立连接

  4. 功能接口

  5. SparkContext 是 Spark 对外的统一接口,为开发者提供访问 Spark 各种功能的入口

  6. 具体功能包括:

  7. 创建 RDD(弹性分布式数据集)

  8. 管理累加器(Accumulators)

  9. 处理广播变量(Broadcast Variables)

  10. 配置 Spark 运行参数

  11. 作业调度与任务分配

  12. 集群连接

  13. 负责建立与 Spark 集群的连接

  14. 管理应用程序与集群资源管理器(如 YARN ResourceManager)的通信

  15. 示例:在初始化时会指定 master URL(如 spark://host:port, local 等)

典型使用场景

  1. RDD 操作

  2. 通过 SparkContext 可以:

  3. 从外部存储系统(如 HDFS、S3)创建 RDD:sc.textFile("hdfs://path/to/file")

  4. 并行化集合:sc.parallelize(Seq(1,2,3))

  5. 共享变量管理

  6. 累加器(用于聚合信息):


    val accum = sc.longAccumulator("My Accumulator")
复制代码


  • 广播变量(高效分发大对象):


    val broadcastVar = sc.broadcast(Array(1, 2, 3))
复制代码


  1. 资源配置

  2. 设置应用程序配置:


    val conf = new SparkConf()      .setAppName("MyApp")      .setMaster("local[4]")    val sc = new SparkContext(conf)
复制代码


注意:在 Spark 2.0+版本中,SparkSession 已成为新的入口点,但在底层仍会创建 SparkContext。对于 RDD 操作,仍然需要直接使用 SparkContext。

从集合创建 RDD

我们在集群的节点上启动 Spark-Shell 进行学习和测试


spark-shell --master local[*]
复制代码


如果顺利启动,你就可以看到如下的画面:



尝试运行如下的指令,感受一下


Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_412)Type in expressions to have them evaluated.Type :help for more information.
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd2.getNumPartitionsres1: Int = 2
scala> rdd2.partitions.lengthres2: Int = 2
scala> val rdd3 = sc.makeRDD(List(1,2,3,4,5))rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24
scala> val rdd4 = sc.makeRDD(1 to 100)rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24
scala> rdd4.getNumPartitionsres3: Int = 2
scala>
复制代码


对应的截图如下:


从文件系统创建 RDD

用 textFile() 方法来从文件系统中加载数据创建 RDD,方法将文件的 URI 作为参数:


  • 本地文件系统

  • 分布式文件系统 HDFS

  • Amazon S3 的地址


# 本地系统 注意文件要确保存在val lines = sc.textFile("file:///opt/wzk/1.txt")# 从分布式文件系统加载val lines = sc.textFile("hdfs://h121.wzk.icu:9000/wcinput/wordcount.txt")
复制代码


运行结果如下图所示:


从 RDD 创建 RDD

本质是将一个 RDD 转换为另一个 RDD,从 Transformation

Transformation

RDD 的操作算子分为两类:


  • Transformation,用来对 RDD 进行转换,这个操作时延迟执行的(或者是 Lazy),Transformation,返回一个新的 RDD

  • Action,用来触发 RDD 的计算,得到相关计算结果或者将结果保存到外部系统中,Action:返回 int、double、集合(不会返回新的 RDD)


每一个 Transformation 操作都会产生新的 RDD,供给下一个“转换”使用转换得到 RDD 是惰性求值,也就是说,整个转换过程只有记录了转换的轨迹,并不会发生真正的计算,只有遇到 Action 操作时,才会发生真正的计算,开始从学院关系(lineage)源头开始,进行物理的转换操作。


常见转换算子 1

map(func)

  • 功能:对 RDD 中的每个元素应用 func 函数,生成一个包含转换结果的新 RDD

  • 示例:将整型 RDD 中的每个元素加 1


  val rdd = sc.parallelize(List(1, 2, 3))  val mapped = rdd.map(x => x + 1)  // 结果:List(2, 3, 4)
复制代码


  • 特点:输入输出元素一一对应,不改变数据量

filter(func)

  • 功能:筛选出使 func 返回 true 的元素组成新 RDD

  • 示例:过滤出偶数


  val filtered = rdd.filter(x => x % 2 == 0)  // 输入List(1,2,3),输出List(2)
复制代码


  • 应用场景:数据清洗、异常值过滤等

flatMap(func)

  • 功能:每个输入元素可映射为 0 或多个输出元素(返回一个序列)

  • 示例:将每行文本拆分为单词


  val lines = sc.parallelize(List("hello world", "hi"))  val words = lines.flatMap(_.split(" "))  // 输出:List("hello", "world", "hi")
复制代码


  • 与 map 区别:map 保持 1:1 映射,flatMap 允许 1:N 映射

mapPartitions(func)

  • 功能:以分区为单位处理数据,func 接收一个迭代器(代表整个分区)

  • 性能优势:适合需要初始化资源的操作(如数据库连接)


  val partitioned = rdd.mapPartitions(iter => {    // 每个分区初始化一次数据库连接    val conn = createConnection()    iter.map(x => processWithDB(x, conn))  })
复制代码


  • 注意事项:需确保迭代器被完全消费,否则可能导致资源泄漏

mapPartitionsWithIndex(func)

  • 功能:在 mapPartitions 基础上增加分区索引参数

  • 典型应用:调试时查看数据分布


  rdd.mapPartitionsWithIndex((index, iter) => {    println(s"Processing partition $index")    iter.map(x => x * index)  })
复制代码


  • 参数说明:func 接收(Int, Iterator[T]) => Iterator[U],第一个参数是分区索引

性能比较


注意:mapPartitions 系列算子可能引起内存问题,因为需要将整个分区数据加载到内存。

转换算子 1 测试

map filter

测试如下的代码:


val rdd1 = sc.parallelize(1 to 10)val rdd2 = rdd1.map(_*2)val rdd3 = rdd2.filter(_>10)
复制代码


执行结果如下图:



我们可以查看当前的结果,但是当前的操作都是 Transformation 的,并没有真正的执行。我们需要通过 collect 触发执行,拿到最终的结果


rdd2.collectrdd3.collect
复制代码


将会触发执行,可以看到结果为:


flatMap

我们从 HDFS 加载一个文件过来


val rdd4 = sc.textFile("hdfs://h121.wzk.icu:9000/wcinput/wordcount.txt")rdd4.collect
复制代码


执行结果如下图:



我们使用“a”作为分隔符,对这段内容进行分割:


rdd4.flatMap(_.split("a")).collect
复制代码


执行结果如下图:


mapPartitions

val rdd5 = rdd1.mapPartitions(iter => iter.map(_*2))
复制代码


执行结果如下


对比 map 和 mapPartitions

上面我们用:


  • rdd1.map(_*2)

  • rdd1.mapPartitions(iter => iter.map(_*2))


那么这两种有什么区别呢?


  • map:每次只处理一条数据

  • mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易 OOM

  • 当资源充足时,建议使用 mapPartitions,充分提高处理效率

常见转换算子 2

  • groupBy(func):按照传入函数的返回值进行分组,将 key 相同的值放入一个迭代器

  • glom():将每一个分区形成一个数组,形成新的 RDD 类型 RDD[Array[T]]

  • sample(withReplacement,fraction,seed):采样算子,以指定的随机数种子 seed 随机抽样出数量为 fraction 的数据,withReplacenent 表示抽出数据是否放回,true 则放回,false 不放回

  • distinct([numTasks]):对 RDD 元素去重后,返回一个新的 RDD,可传入 numTasks 参数改变 RDD 分区数

  • coalesce(numPartitions):缩减分区数,没有 shuffle

  • repartition(numPartitions):增加或减少分区数,有 shuffle

  • sortBy(func,[ascending], [numTasks]):使用 func 对数据进行处理,对处理后的结果进行排序


宽依赖的算子(shuffle):groupBy,distinct、repartition、sortBy

转换算子 2 测试

group by

val rdd1 = sc.parallelize(1 to 10)val group = rdd1.groupBy(_%3)group.collect
复制代码


执行的结果如下图:


glom.map

将 RDD 中元素的每 10 个元素分组


val rdd1 = sc.parallelize(1 to 101)val rdd2 = rdd1.glom.map(_.sliding(10, 10).toArray)rdd2.collect
复制代码


执行结果如下图:


sample

对数据采样,fraction 表示采样的百分比


rdd1.sample(true, 0.2, 2).collectrdd1.sample(false, 0.2, 2).collectrdd1.sample(true, 0.2).collect
复制代码


执行结果如下图:


distinct

对数据进行去重,我们生成一些随机数,然后对这些数值进行去重。


val random = scala.util.Randomval arr = (1 to 20).map(x => random.nextInt(10))val rdd = sc.makeRDD(arr)rdd.distinct.collect
复制代码


执行结果如下图:


numSlices

对 RDD 重分区,我们需要多分一些区出来


val rdd1 = sc.range(1, 1000, numSlices=10)val rdd2 = rdd1.filter(_%2==0)rdd2.getNumPartitions
复制代码


执行结果如下图:


repartition & coalesce

增加或者减少分区


rdd2.getNumPartitions# repartition 是增加和缩减分区数val rdd3 = rdd2.repartition(5)# coalesce 是缩减分区数val rdd4 = rdd2.coalesce(5)
复制代码


执行结果如下图:


sortBy

rdd.sortBy(x => x).collectrdd.sortBy(x => x).collect
复制代码


执行结果如下:


coalesce & repartition

  • repartition:增大或者减少分区数,有 shuffle

  • coalesce:一般用于减少分区数(此时无 shuffle)

发布于: 刚刚阅读数: 2
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-84 Spark RDD创建全攻略:从集合、文件到转换操作详解_Java_武子康_InfoQ写作社区