写点什么

GraphX 图计算组件最短路算法实战

用户头像
小舰
关注
发布于: 2021 年 04 月 16 日

前言:

Spark 除了批处理和流处理,还提供了 GraphX 组件提供图计算。近些年,图计算越来越受到数据分析人员的青睐。图计算目前广泛应用于公安系统和银行金融领域。通过社交网络分析,可以打击犯罪团伙,金融欺诈、信用卡盗刷等。通过人与人之间的关联关系推断,还可以用于理财产品推荐等场景。

图算法

常见的图算法大致可以分为路径搜索算法(例如 DFS & BFS、最短路径、 最小生成树、随机游走等)、中心性算法(例如 DegreeCentrality、 ClosenessCentrality、BetweennessCentrality、PageRank) 以及社群发现算法(例如 MeasuringAlgorithm、ComponentsAlgorithm、LabelPropagation Algorithm、LouvainModularity Algorithm)。

路径搜索算法建立在图搜索算法的基础上,用来探索节点之间的路径。这些路径从一个节点开始,遍历关系,直到到达目的地。路径搜索算法可以用来进行物流规划,最低成本呼叫或者叫 IP 路由问题等。

中心性算法用于识别图中特定节点的角色及其对网络的影响。中心性算法能够帮助我们识别最重要的节点,帮助我们了解组动态,例如可信度、可访问性、事物传播的速度以及组与组之间的连接。

社群的形成在各种类型的网络中都很常见。识别社群对于评估群体行为或突发事件至关重要。对于一个社群来说,内部节点与内部节点的关系(边)比社群外部节点的关系更多。识别这些社群可以揭示节点的分群,找到孤立的社群,发现整体网络结构关系。社群发现算法有助于发现社群中群体行为或者偏好,寻找嵌套关系,或者成为其他分析的前序步骤。社群发现算法也常用于网络可视化。

Graph X 实现


对于上图,我们要找出 5 号节点与各个节点的最短路,可以在 Spark 的 GraphX 帮助下利用最短路算法来实现。

import org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.graphx._import org.apache.spark.rdd.RDDimport org.apache.spark.sql.SparkSession


object GraphXTest {
def main(args: Array[String]) {
//屏蔽日志 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val spark = SparkSession.builder() .appName("WordCount") .master("local") .getOrCreate()
val sc = spark.sparkContext
//设置顶点和边,注意顶点和边都是用元组定义的Array
//顶点的数据类型是VD:(String,Int) val vertexArray = Array( (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)) )
//边的数据类型ED:Int val edgeArray = Array( Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3) )
//构造vertexRDD和edgeRDD val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray) val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
//构造图Graph[VD,ED] val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
println("找出5到各顶点的最短路:")
val sourceId: VertexId = 5L // 定义源点 val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
//最短路算法实现 val sssp = initialGraph.pregel(Double.PositiveInfinity)( (id, dist, newDist) => math.min(dist, newDist), triplet => { // 计算权重 if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a,b) => math.min(a,b) // 最短距离 )
println(sssp.vertices.collect.mkString("\n"))
sc.stop()

}
}
复制代码


总结

本案例只是对 GraphX 的基本图算法实现进行了演示,更多的图算法实现都可以参照这个流程来实现,用你的智慧去尽情地发掘图网络中的价值吧~

发布于: 2021 年 04 月 16 日阅读数: 19
用户头像

小舰

关注

公众号:DLab数据实验室 2020.11.12 加入

中国人民大学硕士

评论

发布
暂无评论
GraphX图计算组件最短路算法实战