写点什么

GeaFlow 图计算快速上手之 K-hop 算法

  • 2023-08-15
    浙江
  • 本文字数:3233 字

    阅读完需:约 11 分钟

GeaFlow图计算快速上手之K-hop算法

引言

随着年轻人的社交需求不断增长,各种社交软件应运而生,这些社交软件通常都会有好友推荐功能,根据六度分离理论,理想情况下,每个人通过 6 个人就可以跟所有人产生关联,因此 K-hop 算法(K 跳算法)被用于实现好友推荐,现在让我们来尝试使用 GeaFlow 在 5 分钟内实现 K-hop 算法吧!

K-hop(K 跳)算法介绍

K-hop 算法是一种基于图论的算法,用于寻找一个起点通过 K 次以内跳跃能够到达的节点,也就是从起点出发,找出 K 层内与之关联的节点。K-hop 算法广泛应用于好友推荐、影响力预测和关系发现等场景。K-hop 算法本质上是一种广度优先搜索(BFS)算法,通过从起点开始不断向外扩散的方式来计算每一个节点到起点的跳跃数。算法流程如下:


GeaFlow 实现 K-hop 算法

首先需要通过实现 AlgorithmUserFunction 接口来编写 K-hop 算法的 UDGA,K-hop 算法的参考实现如下:


package com.antfin.rayag.myUDF;
import com.antgroup.geaflow.common.type.primitive.IntegerType;import com.antgroup.geaflow.common.type.primitive.StringType;import com.antgroup.geaflow.dsl.common.algo.AlgorithmRuntimeContext;import com.antgroup.geaflow.dsl.common.algo.AlgorithmUserFunction;import com.antgroup.geaflow.dsl.common.data.RowEdge;import com.antgroup.geaflow.dsl.common.data.RowVertex;import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;import com.antgroup.geaflow.dsl.common.data.impl.types.IntVertex;import com.antgroup.geaflow.dsl.common.function.Description;import com.antgroup.geaflow.dsl.common.types.StructType;import com.antgroup.geaflow.dsl.common.types.TableField;import com.antgroup.geaflow.model.graph.edge.EdgeDirection;
import java.util.ArrayList;import java.util.Iterator;import java.util.List;
@Description(name = "khop", description = "built-in udga for KHop")public class KHop implements AlgorithmUserFunction<Object, Integer> {
private AlgorithmRuntimeContext<Object, Integer> context; private int srcId = 1; private int k = 1;
@Override public void init(AlgorithmRuntimeContext<Object, Integer> context, Object[] parameters) { this.context = context; if (parameters.length > 2) { throw new IllegalArgumentException( "Only support zero or more arguments, false arguments " + "usage: func([alpha, [convergence, [max_iteration]]])"); } if (parameters.length > 0) { srcId = Integer.parseInt(String.valueOf(parameters[0])); } if (parameters.length > 1) { k = Integer.parseInt(String.valueOf(parameters[1])); } }
@Override public void process(RowVertex vertex, Iterator<Integer> messages) { List<RowEdge> outEdges = new ArrayList<>(context.loadEdges(EdgeDirection.OUT)); //第一轮迭代将所有顶点初始化,目标点的K值初始化为0,并向邻点发送消息,其他点的K值初始化为Integer.MAX_VALUE if (context.getCurrentIterationId() == 1L) { if(srcId == (int) vertex.getId()) { sendMessageToNeighbors(outEdges, 1); context.updateVertexValue(ObjectRow.create(0)); context.take(ObjectRow.create(vertex.getId(), 0)); }else{ context.updateVertexValue(ObjectRow.create(Integer.MAX_VALUE)); } } else if (context.getCurrentIterationId() <= k+1) { int currentK = (int) vertex.getValue().getField(0, IntegerType.INSTANCE); //如果当前顶点收到消息,并且K值为Integer.MAX_VALUE(没有被遍历到),则本轮应该修改K值,并向邻边发消息 if(messages.hasNext() && currentK == Integer.MAX_VALUE){ Integer currK = messages.next(); //将当前顶点写出 context.take(ObjectRow.create(vertex.getId(), currK)); //更新当前顶点的K值 context.updateVertexValue(ObjectRow.create(currK)); //向邻点发消息 sendMessageToNeighbors(outEdges, currK+1); } } }
//设置输出类型 @Override public StructType getOutputType() { return new StructType( new TableField("id", IntegerType.INSTANCE, false), new TableField("k", IntegerType.INSTANCE, false) ); }
private void sendMessageToNeighbors(List<RowEdge> outEdges, Integer message) { for (RowEdge rowEdge : outEdges) { context.sendMessage(rowEdge.getTargetId(), message); } }}
复制代码

Geaflow 运行 K-hop 算法实战

将 KHop 类打包成 UDGA

新建一个 maven 工程,将 KHop 类放/src/main/java 目录下,pom 文件中需要添加如下依赖:


<dependency>            <groupId>com.antgroup.tugraph</groupId>            <artifactId>geaflow-dsl-common</artifactId>            <version>0.1</version></dependency>
复制代码


参考https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/dsl/overview.md

将 UDGA 上传至 geaflow-console 平台

注册 khop 函数,并在 DSL 中使用

set geaflow.dsl.window.size = -1;set geaflow.dsl.ignore.exception = true;
CREATE GRAPH IF NOT EXISTS g ( Vertex v ( vid int ID, vvalue int ), Edge e ( srcId int SOURCE ID, targetId int DESTINATION ID )) WITH ( storeType='rocksdb', shardCount = 1);
CREATE TABLE IF NOT EXISTS v_source ( v_id int, v_value int) WITH ( type='file', //vertex文件中保存了点的信息,文件放在与KHop类目录下的resources目录下,此处可以换成其他数据源 geaflow.dsl.file.path = 'resource:///input/vertex');

CREATE TABLE IF NOT EXISTS e_source ( src_id int, dst_id int) WITH ( type='file', //edge文件中保存了边的信息,文件放在与KHop类目录下的resources目录下,此处可以换成其他数据源 geaflow.dsl.file.path = 'resource:///input/edge');
//定义结果表CREATE TABLE IF NOT EXISTS tbl_result ( v_id int, k_value int) WITH ( type='file', geaflow.dsl.file.path = '/tmp/result');
USE GRAPH g;
INSERT INTO g.v(vid, vvalue)SELECTv_id, v_valueFROM v_source;
INSERT INTO g.e(srcId, targetId)SELECT src_id, dst_idFROM e_source;
//注册khop函数CREATE Function khop AS 'com.antfin.rayag.myUDF.KHop';
INSERT INTO tbl_result(v_id, k_value)//调用khop函数,并返回结果CALL khop(1,2) YIELD (vid, kValue)RETURN vid, kValue;
复制代码

运行结果

输入数据如下

//vertex文件内容:1,12,13,14,15,16,1
//edge文件内容:1,31,51,62,33,44,14,65,45,6
复制代码

在 container 的/tmp/result 文件中可以得到结果如下

1,03,15,16,14,2
复制代码


至此,我们就成功使用 Geaflow 实现并运行了 K-hop 算法了!是不是超简单!快来试一试吧!


GeaFlow(品牌名 TuGraph-Analytics) 已正式开源,欢迎大家关注!!!欢迎给我们 Star 哦! GitHub👉 https://github.com/TuGraph-family/tugraph-analytics更多精彩内容,关注我们的博客 https://geaflow.github.io/

用户头像

欢迎访问:geaflow.github.io 2023-07-05 加入

GeaFlow(品牌名TuGraph-Analytics) 是一个分布式流图计算引擎 欢迎给我们 Star 哦! GitHub👉github.com/TuGraph-family/tugraph-analytics 更多精彩内容,关注我们的博客geaflow.github.io

评论

发布
暂无评论
GeaFlow图计算快速上手之K-hop算法_大数据_TuGraph-Analytics(GeaFlow)_InfoQ写作社区