写点什么

Flink 的分布式缓存

发布于: 2021 年 05 月 21 日
Flink的分布式缓存

Flink 提供了一个类似于 Hadoop 的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!


缓存的使用流程:


 使用 ExecutionEnvironment 实例对本地的或者远程的文件(例如:HDFS 上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink 会自动复制文件或者目录到所有 worker 节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!


【注意】广播是将变量分发到各个 worker 节点的内存上,分布式缓存是将文件缓存到各个 worker 节点上;


示例 


从 hdfs 上拿到 subject.txt 数据,再与 Clazz 数据组合成新的数据集,组成:(学号,班级,学科,分数)


样例数据:


subject.txt : (学号,学科,分数)

1,wuli,76

2,yuwen,80

3,yingwu,75

4,shuju,65

5,huaxue,87

6,shuju,95.5

7,yuwen,96.5

8,huaxue,95.5

Clazz : (学号, 班级 )


valclazz:DataSet[Clazz] = env.fromElements(        

Clazz(1,"class_1"),       

Clazz(2,"class_1"),   

Clazz(3,"class_2"), 

Clazz(4,"class_2"), 

Clazz(5,"class_3"), 

Clazz(6,"class_3"),

Clazz(7,"class_4"),

Clazz(8,"class_1")   

 )


步骤 

1. 获取 ExecutionEnvironment 运行环境

2. 开启分布式缓存

2.加载本地数据 

3.数据转换(使用分布式缓存) 

4. 打印测试

 

代码

import org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}import org.apache.flink.configuration.Configurationimport scala.collection.mutable.{ArrayBuffer, ListBuffer}import scala.io.Sourceimport org.apache.flink.streaming.api.scala._
object Distribute_cache { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment //1"开启分布式缓存 val path = "hdfs://node01:9000/score" env.registerCachedFile(path , "Distribute_cache")
//2:加载本地数据 val clazz:DataSet[Clazz] = env.fromElements( Clazz(1,"class_1"), Clazz(2,"class_1"), Clazz(3,"class_2"), Clazz(4,"class_2"), Clazz(5,"class_3"), Clazz(6,"class_3"), Clazz(7,"class_4"), Clazz(8,"class_1") )
//3:开始进行关联操作 clazz.map(new MyJoinmap()).print() }}class MyJoinmap() extends RichMapFunction[Clazz , ArrayBuffer[INFO]]{ private var myLine = new ListBuffer[String] override def open(parameters: Configuration): Unit = { val file = getRuntimeContext.getDistributedCache.getFile("Distribute_cache") val lines: Iterator[String] = Source.fromFile(file.getAbsoluteFile).getLines() lines.foreach( line =>{ myLine.append(line) }) }
//在map函数下进行关联操作 override def map(value: Clazz): ArrayBuffer[INFO] = { var stoNO = 0 var subject = "" var score = 0.0 var array = new collection.mutable.ArrayBuffer[INFO]() //(学生学号---学科---分数) for(str <- myLine){ val tokens = str.split(",") stoNO = tokens(0).toInt subject = tokens(1) score = tokens(2).toDouble if(tokens.length == 3){ if(stoNO == value.stu_no){ array += INFO(value.stu_no , value.clazz_no , subject , score) } } } array }}//(学号 , 班级) join (学生学号---学科---分数) ==(学号 , 班级 , 学科 , 分数)case class INFO(stu_no:Int , clazz_no:String , subject:String , score:Double)case class Clazz(stu_no:Int , clazz_no:String)
复制代码


发布于: 2021 年 05 月 21 日阅读数: 14
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flink的分布式缓存