写点什么

【干货】大数据开发之 Spark 总结

  • 2021 年 11 月 25 日
  • 本文字数:2357 字

    阅读完需:约 8 分钟

一、本质

Spark 是一个分布式的计算框架,是下一代的 MapReduce,扩展了 MR 的数据处理流程

二、mapreduce 有什么问题

1.调度慢,启动 map、reduce 太耗时

2.计算慢,每一步都要保存中间结果落磁盘

3.API 抽象简单,只有 map 和 reduce 两个原语

4.缺乏作业流描述,一项任务需要多轮 mr

三、spark 解决了什么问题

1.最大化利用内存 cache

2.中间结果放内存,加速迭代

3.将结果集放内存,加速后续查询和处理,解决运行慢的问题

select * from table where col1 > 50

rdd.registerastable(cachetable)

SQL:

select col2, max (col3) from cachetable group by col2

select col3, max (col2) from cachetable group by col3

4. 更丰富的 API(Transformation 类和 Actions 类)

5. 完整作业描述,将用户的整个作业串起来

val file = sc.textFile(hdfs://input)

val counts = file.flatMap(

line => line.split(" "))

.map(word => (word, 1))

.reduceByKey(_ + _)

counts.saveAsTextFile(hdfs://output)

6. 由于 Excutor 进程可以运行多个 Task 线程,因而实现了多线程的操作,加快了处理速度

四、Spark 核心—RDD( Resilient Distributed Dataset 弹性分布式数据集模型)

1.四个特征

– RDD 使用户能够显式将计算结果保存在内存中,控制数据的划分

– 记录数据的变换和描述,而不是数据本身,以保证容错

– 懒操作,延迟计算,action 的时候才操作

– 瞬时性,用时才产生,用完就释放

2.四种构建方法

– 从共享文件系统中获取,如从 HDFS 中读数据构建 RDD

• val a = sc.textFile(“/xxx/yyy/file”)

– 通过现有 RDD 转换得到

• val b = a.map(x => (x, 1))

– 定义一个 scala 数组

• val c = sc.parallelize(1 to 10, 1)

– 由一个已经存在的 RDD 通过持久化操作生成

• val d = a.persist(), a. saveAsHadoopFile(“/xxx/yyy/zzz”)

3.partition 和依赖

– 每个 RDD 包含了数据分块/分区(partition)的集合,每个 partition 是不可分割的

– 每个 partition 的计算就是一个 task,task 是调度的基本单位

– 与父 RDD 的依赖关系(rddA=>rddB)

宽依赖: B 的每个 partition 依赖于 A 的所有 partition

• 比如 groupByKey、reduceByKey、join……,由 A 产生 B 时会先对 A 做 shuffle 分桶

窄依赖: B 的每个 partition 依赖于 A 的常数个 partition

• 比如 map、filter、union……

4.stage 和依赖



– 从后往前,将宽依赖的边删掉,大数据培训连通分量及其在原图中所有依赖的 RDD,构成一个 stage

– 每个 stage 内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化

5.数据局部性原则

– 如果一个任务需要的数据在某个节点的内存中,这个任务就会被分配至那个节点

– 需要的数据在某个节点的文件系统中,就分配至那个节点

6.容错性原则

– 如果此 task 失败,AM 会重新分配 task

– 如果 task 依赖的上层 partition 数据已经失效了,会先将其依赖的 partition 计算任务再重算一遍

• 宽依赖中被依赖 partition,可以将数据保存 HDFS,以便快速重构(checkpoint)

• 窄依赖只依赖上层一个 partition,恢复代价较少

– 可以指定保存一个 RDD 的数据至节点的 cache 中,如果内存不够,会 LRU 释放一部分,仍有重构的可能

五、Spark 系统架构



1.Excutor 的内存分为三块:

1)task 执行代码所需的内存,占总内存的 20%;

2)task 通过 shuffle 过程拉取上一个 stage 的 task 的输出后,进行聚合操作时使用,占 20%

3)让 RDD 持久化时使用,默认占 executor 总内存的 60%

2.Excutor 的 cpu core:

每个 core 同一时间只能执行一个线程

六、Spark 资源参数和开发调优

1.七个参数

• num-executors:该作业总共需要多少 executor 进程执行

建议:每个作业运行一般设置 5-~100 个左右较合适

• executor-memory:设置每个 executor 进程的内存, num-executors* executor-memory 代表作业申请的总内存量(尽量不要超过最大总内存的 1/3~1/2)

建议:设置 4G~8G 较合适

• executor-cores: 每个 executor 进程的 CPU Core 数量,该参数决定每个 executor 进程并行执行 task 线程的能力,num-executors * executor-cores 代表作业申请总 CPU core 数(不要超过总 CPU Core 的 1/3~1/2 )

建议:设置 2~4 个较合适

• driver-memory: 设置 Driver 进程的内存

建议:通常不用设置,一般 1G 就够了,若出现使用 collect 算子将 RDD 数据全部拉取到 Driver 上处理,就必须确保该值足够大,否则 OOM 内存溢出

• spark.default.parallelism: 每个 stage 的默认 task 数量

建议:设置 500~1000 较合适,默认一个 HDFS 的 block 对应一个 task,Spark 默认值偏少,这样导致不能充分利用资源

• spark.storage.memoryFraction: 设置 RDD 持久化数据在 executor 内存中能占的比例,默认 0.6,即默认 executor 60%的内存可以保存持久化 RDD 数据

建议:若有较多的持久化操作,可以设置高些,超出内存的会频繁 gc 导致运行缓慢

• spark.shuffle.memoryFraction: 聚合操作占 executor 内存的比例,默认 0.2

建议:若持久化操作较少,但 shuffle 较多时,可以降低持久化内存占比,提高 shuffle 操作内存占比

spark-submit:



2.六个原则

• 避免创建重复的 RDD



• 尽可能复用同一个 RDD



• 对多次使用的 RDD 进行持久化处理



• 避免使用 shuffle 类算子

如:groupByKey、reduceByKey、join 等



• 使用 map-side 预聚合的 shuffle 操作

一定要使用 shuffle 的,无法用 map 类算子替代的,那么尽量使用 map-site 预聚合的算子,如可能的情况下使用 reduceByKey 或 aggregateByKey 算子替代 groupByKey 算子

• 使用 Kryo 优化序列化性能

Kryo 是一个序列化类库,来优化序列化和反序列化性能, Spark 支持使用 Kryo 序列化库,性能比 Java 序列化库高 10 倍左右



七、Spark 技术栈



 • Spark Core: 基于 RDD 提供操作接口,利用 DAG 进行统一的任务规划

• Spark SQL: Hive 的表 + Spark 的里。通过把 Hive 的 HQL 转化为 Spark DAG 计算来实现

• Spark Streaming: Spark 的流式计算框架,延迟在 1S 左右,mini batch 的处理方法

• MLIB: Spark 的机器学习库,包含常用的机器学习算法

• GraphX: Spark 图并行操作库

用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
【干货】大数据开发之Spark总结