写点什么

大数据 -91 Spark 广播变量:高效共享只读数据的最佳实践 RDD+Scala 编程

作者:武子康
  • 2025-09-08
    山东
  • 本文字数:3769 字

    阅读完需:约 12 分钟

大数据-91 Spark广播变量:高效共享只读数据的最佳实践 RDD+Scala编程

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 08 日更新到:Java-118 深入浅出 MySQL ShardingSphere 分片剖析:SQL 支持范围、限制与优化实践 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节完成的内容如下:


  • RDD 容错机制

  • RDD 分区机制

  • RDD 分区器

  • RDD 自定义分区器


广播变量

基本介绍

在分布式计算环境中,经常需要在多个任务之间共享变量,或者在任务(Task)和驱动程序(Driver Program)之间传递共享数据。为了满足这个需求并优化 Spark 程序的性能,Spark 提供了两种特殊类型的共享变量机制:


  1. 广播变量(Broadcast Variables)

  2. 累加器(Accumulators)

广播变量详解

广播变量的主要设计目的是为了高效地在集群中的各个 Executor 之间共享较大的只读数据。其工作原理是:由 Driver 程序将变量值广播到所有工作节点(Executor),每个 Executor 只需要接收一次该变量的副本,之后所有任务都可以访问这个只读值。


典型应用场景:


  • 共享大型的查找表或字典数据

  • 分发机器学习模型参数

  • 传递配置信息给所有任务

  • 在 join 操作中优化小表的传输


详细使用步骤:


  1. 创建广播变量(Driver 端)


   # 假设有一个大型字典需要共享   lookup_table = {"key1": "value1", "key2": "value2", ...}     # 创建广播变量   broadcast_var = sc.broadcast(lookup_table)
复制代码


  1. 访问广播变量(Executor 端)


   # 在任务中通过value属性访问   def process_record(record):       # 获取广播变量的值       table = broadcast_var.value       # 使用广播数据进行处理       return table.get(record.key, default_value)
复制代码


  1. 广播变量的特点

  2. 只读性:一旦广播后,不能被修改

  3. 高效传输:使用高效的广播算法(如 BitTorrent-like 协议)减少网络开销

  4. 自动清理:当不再需要时,可以调用unpersist()方法释放资源

  5. 内存管理:广播变量会存储在 Executor 的内存中,直到应用程序结束或显式删除


性能优化建议:


  • 只广播真正需要共享的数据

  • 广播变量的总大小应该控制在 GB 级别以下

  • 对于频繁使用的数据才考虑广播

  • 广播前可以考虑压缩数据


示例场景:


假设我们要处理用户日志数据,需要根据用户 ID 查询用户信息:


# Driver端user_info = {"u001": {"name": "Alice", "age": 25},              "u002": {"name": "Bob", "age": 30}}broadcast_user = sc.broadcast(user_info)
# 在RDD操作中使用logs = sc.textFile("user_logs.txt")enhanced_logs = logs.map(lambda log: { "log": log, "user_info": broadcast_user.value.get(extract_user_id(log), {})})
复制代码


广播变量通过减少数据的重复传输,显著提高了分布式计算的效率,特别是在需要频繁访问共享数据的场景下效果尤为明显。



广播变量的相关参数详解:


  1. spark.broadcast.blockSize(缺省值:4m)

  2. 功能说明:控制广播变量在传输过程中被分割成的块大小

  3. 取值范围:支持 k(千字节)、m(兆字节)、g(千兆字节)等单位

  4. 应用场景:当广播较大数据(如超过 1GB 的查找表)时,可考虑增大此值以减少网络开销

  5. 示例:设置为"8m"可提高大变量传输效率,但会占用更多内存

  6. spark.broadcast.checksum(缺省值:true)

  7. 功能说明:是否启用广播变量的校验和检查

  8. 工作原理:在传输过程中计算校验和来确保数据完整性

  9. 性能影响:启用后会增加少量 CPU 开销(约 1-3%)

  10. 使用建议:在可靠网络环境中可设为 false 以获得更高性能

  11. spark.broadcast.compress(缺省值:true)

  12. 功能说明:控制是否压缩广播变量

  13. 压缩算法:默认使用 Snappy 压缩

  14. 压缩效果:对文本数据通常可减少 50-70%体积

  15. 典型用例:当广播大型数据集(如机器学习特征映射表)时特别有效

  16. 注意事项:对已压缩数据(如 JPEG 图像)可能适得其反


最佳实践建议:


  • 对于小于 100MB 的数据,保持默认参数即可

  • 对于 GB 级数据,可考虑:

  • 增大 blockSize 到 16m-32m

  • 保持 checksum 为 true 以确保数据可靠性

  • 对结构化数据保持 compress 开启

变量应用

普通 JOIN

MapSideJoin

生成数据 test_spark_01.txt

1000;商品11001;商品21002;商品31003;商品41004;商品51005;商品61006;商品71007;商品81008;商品9
复制代码


生成数据格式如下:


生成数据 test_spark_02.txt

10000;订单1;100010001;订单2;100110002;订单3;100210003;订单4;100310004;订单5;100410005;订单6;100510006;订单7;100610007;订单8;100710008;订单9;1008
复制代码


生成的数据格式如下:


编写代码 1

我们编写代码进行测试


package icu.wzk
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}

object JoinDemo {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("JoinDemo") .setMaster("local[*]")
val sc = new SparkContext(conf) sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)
val productRDD: RDD[(String, String)] = sc .textFile("data/test_spark_01.txt") .map { line => val fields = line.split(";") (fields(0), line) }
val orderRDD: RDD[(String, String)] = sc .textFile("data/test_spark_02.txt", 8) .map { line => val fields = line.split(";") (fields(2), line) }
val resultRDD = productRDD.join(orderRDD) println(resultRDD.count()) Thread.sleep(100000) sc.stop() }
}
复制代码

编译打包 1

mvn clean package
复制代码


并上传到服务器,准备运行


运行测试 1

spark-submit --master local[*] --class icu.wzk.JoinDemo spark-wordcount-1.0-SNAPSHOT.jar
复制代码


提交任务并执行,注意数据的路径,查看下图:



运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)



2024-07-19 10:35:08,808 INFO  [main] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Job 0 finished: count at JoinDemo.scala:32, took 2.203100 s200
复制代码

编写代码 2

接下来,我们对比使用 MapSideJoin 的方式


package icu.wzk
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}
object MapSideJoin {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("MapSideJoin") .setMaster("local[*]")
val sc = new SparkContext(conf) sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)
val productRDD: RDD[(String, String)] = sc .textFile("data/test_spark_01.txt") .map { line => val fields = line.split(";") (fields(0), line) }
val productBC = sc.broadcast(productRDD.collectAsMap())
val orderRDD: RDD[(String, String)] = sc .textFile("data/test_spark_02.txt") .map { line => val fields = line.split(";") (fields(2), line) }
val resultRDD = orderRDD .map { case (pid, orderInfo) => val productInfo = productBC.value (pid, (orderInfo, productInfo.getOrElse(pid, null))) } println(resultRDD.count())
sc.stop() }
}
复制代码

编译打包 2

mvn clean package
复制代码


编译后上传到服务器准备执行:


运行测试 2

spark-submit --master local[*] --class icu.wzk.MapSideJoin spark-wordcount-1.0-SNAPSHOT.jar
复制代码


启动我们的程序,并观察结果



我们可以观察到,这次只用了 0.10078 秒就完成了任务:


累加器

基本介绍

累加器的作用:可以实现一个变量在不同的 Executor 端能保持状态的累加。累加器在 Driver 端定义、读取,在 Executor 中完成累加。累加器也是 Lazy 的,需要 Action 触发:Action 触发一次,执行一次;触发多次,执行多次。


Spark 内置了三种类型的累加器,分别是:


  • LongAccumulator 用来累加整数型

  • DoubleAccumulator 用来累加浮点型

  • CollectionAccumulator 用来累加集合元素

运行测试

我们可以在 SparkShell 中进行一些简单的测试,目前我在 h122 节点上,启动 SparkShell


spark-shell --master local[*]
复制代码


启动的主界面如下:



写入如下的内容进行测试:


val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+"))val acc1 = sc.longAccumulator("totalNum1")val acc2 = sc.doubleAccumulator("totalNum2")val acc3 = sc.collectionAccumulator[String]("allwords")
复制代码


我们进行测试的结果如下图所示:



继续编写一段进行测试:


val rdd = data.map{word => acc1.add(word.length); acc2.add(word.length); acc3.add(word); word}rdd.countrdd.collect
println(acc1.value)println(acc2.value)println(acc3.value)
复制代码


我们进行测试的结果如下:



发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-91 Spark广播变量:高效共享只读数据的最佳实践 RDD+Scala编程_Java_武子康_InfoQ写作社区