写点什么

Flink 的广播变量

发布于: 2021 年 05 月 20 日
Flink的广播变量

Flink 支持广播变量,就是将数据广播到具体的 taskmanager 上,数据存储在内存中,这样可以减缓大量的 shuffle 操作;比如在数据 join 阶段,不可避免的就是大量的 shuffle 操作,我们可以把其中一个 dataSet 广播出去,一直加载到 taskManager 的内存中,可以直接在内存中拿数据,避免了大量的 shuffle,导致集群性能下降;广播变量创建后,它可以运行在集群中的任何 function 上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。一句话解释,可以理解为是一个公共的共享变量,我们可以把一个 dataset 数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存(也就是一个节点中可能会存在多份 dataset 数据)。注意:因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会出现 OOM 这样的问题 Broadcast:Broadcast 是通过 withBroadcastSet(dataset,string)来注册的 Access:通过 getRuntimeContext().getBroadcastVariable(String)访问广播变量


可以理解广播就是一个公共的共享变量


将一个数据集广播后,不同的 Task 都可以在节点上获取到每个节点 只存一份如果不使用广播,每一个 Task 都会拷贝一份数据集,造成内存资源浪费


示例从内存中拿到 data2 的广播数据,再与 data1 数据根据第二列元素组合成(Int, Long, String, String)样例数据:val data1 = new mutable.MutableList[(Int, Long, String)]data1 .+=((1, 1L, "xiaoming"))data1 .+=((2, 2L, "xiaoli"))data1 .+=((3, 2L, "xiaoqiang"))val ds1 = env.fromCollection(data1)


val data2 = new mutable.MutableList[(Int, Long, Int, String, Long)]data2 .+=((1, 1L, 0, "Hallo", 1L))data2 .+=((2, 2L, 1, "Hallo Welt", 2L))
复制代码


data2 .+=((2, 3L, 2, "Hallo Welt wie", 1L))val ds2 = env.fromCollection(data2)


步骤


  1. 获取 ExecutionEnvironment 运行环境

  2. 加载创建数据源

  3. 数据转换(使用广播)

  4. 打印测试


参考代码

object BrodCast {  def main(args: Array[String]): Unit = {    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment    //TODO data1  join  data2的数据,使用广播变量完成    val data1 = new mutable.MutableList[(Int, Long, String)]    data1 .+=((1, 1L, "Hi"))    data1 .+=((2, 2L, "Hello"))    data1 .+=((3, 2L, "Hello world"))    val ds1 = env.fromCollection(data1)    val data2 = new mutable.MutableList[(Int, Long, Int, String, Long)]    data2 .+=((1, 1L, 0, "Hallo", 1L))    data2 .+=((2, 2L, 1, "Hallo Welt", 2L))    data2 .+=((3, 3L, 2, "Hallo Welt wie", 1L))    val ds2 = env.fromCollection(data2 )    //todo 使用内部类RichMapFunction,提供open和map,可以完成join的操作    val result = ds1.map(new RichMapFunction[(Int, Long, String), ArrayBuffer[(Int, Long, String, String)]] {      var brodCast: mutable.Buffer[(Int, Long, Int, String, Long)] = null
override def open(parameters: Configuration): Unit = { import scala.collection.JavaConverters._ //asScala需要使用隐式转换 brodCast = this.getRuntimeContext.getBroadcastVariable[(Int, Long, Int, String, Long)]("ds2").asScala }
override def map(value: (Int, Long, String)): ArrayBuffer[(Int, Long, String, String)] = { val toArray: Array[(Int, Long, Int, String, Long)] = brodCast.toArray val array = new mutable.ArrayBuffer[(Int, Long, String, String)] var index = 0
var a: (Int, Long, String, String) = null while (index < toArray.size) { if (value._2 == toArray(index)._5) { a = (value._1, value._2, value._3, toArray(index)._4) array += a } index = index + 1 } array } }).withBroadcastSet(ds2, "ds2") println(result.collect()) }}
复制代码


发布于: 2021 年 05 月 20 日阅读数: 9
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flink的广播变量