写点什么

Spark 中的累加器和广播变量

发布于: 2021 年 04 月 09 日
Spark中的累加器和广播变量

在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。


为了满足这种需求,Spark 提供了两种类型的变量:

1.累加器 accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)

2.广播变量 broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。

 

1. 累加器

1.1. 不使用累加器



1.2. 使用累加器

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果。

val xx: Accumulator[Int] = sc.accumulator(0)


代码如下:

package cn.itcast.core
import org.apache.spark.rdd.RDDimport org.apache.spark.{Accumulator, SparkConf, SparkContext}
object AccumulatorTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN")
//使用scala集合完成累加 var counter1: Int = 0; var data = Seq(1,2,3) data.foreach(x => counter1 += x ) println(counter1)//6
println("+++++++++++++++++++++++++")
//使用RDD进行累加 var counter2: Int = 0; val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3] dataRDD.foreach(x => counter2 += x) println(counter2)//0 //注意:上面的RDD操作运行结果是0 //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量 //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2 //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系
//那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊! //如果解决?---使用累加器 val counter3: Accumulator[Int] = sc.accumulator(0) dataRDD.foreach(x => counter3 += x) println(counter3)//6 }}

复制代码

2. 广播变量

2.1. 不使用广播变量



2.2. 使用广播变量


代码如下:

import org.apache.spark.broadcast.Broadcastimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}
object BroadcastVariablesTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN")
//不使用广播变量 val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape"))) val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana) val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3)) //根据水果编号取水果名称 val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x)) fruitNames.foreach(println) //注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多, //那么会导致,被各个Task共用到的fruitMap会被多次传输 //应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可 //如何做到?---使用广播变量//注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redis println("=====================") val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap) val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x)) fruitNames2.foreach(println)
}}
复制代码


发布于: 2021 年 04 月 09 日阅读数: 51
用户头像

专注于大数据技术研究 2020.11.10 加入

运营公众号:五分钟学大数据。大数据领域原创技术号,深入大数据技术

评论

发布
暂无评论
Spark中的累加器和广播变量