作者:张奇
TuGraph Analytics 简介
TuGraph Analytics 是蚂蚁集团近期开源的分布式流式图计算,目前广泛应用在蚂蚁集团的金融、社交、风控等诸多领域。更多详细内容可参考 TuGraph Analytics 的 github 首页(https://github.com/TuGraph-family/tugraph-analytics),欢迎国内外开发者们与我们共建 TuGraph Analytics 社区,壮大流图产业生态。
弱联通分量算法介绍
弱联通分量图算法(Weakly Connected Components Algorithm)是一种用于找到图中所有弱联通分量的算法。弱联通分量是指在有向图中,如果忽略所有边的方向,相互之间是连通的节点集合。
算法的基本思想是通过深度优先搜索(DFS)或广度优先搜索(BFS)遍历图的所有节点,对于每个未访问过的节点,都会生成一个新的联通分量。在遍历过程中,如果当前节点的邻居节点已经被访问过,那么将其加入当前联通分量中,并继续遍历邻居节点。
通过这种方式,算法能够找到图中所有弱联通分量,并将每个分量的节点集合进行标记或存储起来。最终,算法返回所有弱联通分量的集合。
弱联通分量图算法可以应用于许多实际问题,例如社交网络分析中的用户群体划分、网页链接分析中的网页群组划分等。它能够帮助我们理解图中不同分量之间的关系,从而更好地分析图的结构和特性。
在 TuGraph Analytics 上实现弱联通分量算法
使用方式
用户可以在 GQL 图查询语句中嵌入图算法,如下所示:
INSERT INTO tbl_result
CALL wcc() YIELD (vid, component)
RETURN vid, component
;
复制代码
通过 CALL 语句调用具体的算法,通过 YIELD 定义算法的返回字段。需要注意的是,这么做的前提是算法 udf 需要注册或者创建后才能使用。DSL 内置算法或者 UDF 在 BuildInSqlFunctionTable 中进行注册。对于非内置算法,可以通过 create function 语句来创建。
Create funciton wcc as 'com.antgroup.geaflow.dsl.udf.graph.WeakConnectedComponents';
复制代码
算法实现
TuGraph Analytics 上实现图算法需要实现 AlgorithmUserFunction 接口,该接口定义如下:
public interface AlgorithmUserFunction<K, M> extends Serializable {
/**
* Init method for the function.
* @param context The runtime context.
* @param params The parameters for the function.
*/
void init(AlgorithmRuntimeContext<K, M> context, Object[] params);
/**
* Processing method for each vertex and the messages it received.
*/
void process(RowVertex vertex, Iterator<M> messages);
/**
* Finish method called by each vertex upon algorithm convergence.
*/
void finish(RowVertex vertex);
/**
* Returns the output type for the function.
*/
StructType getOutputType();
}
复制代码
init
首先,init 方法在 worker 初始化时调用,用户往算法 udf 中传入的参数,会放在 params 数组变量里。比如 wcc(10),这里的 params[0] = 10。
public void init(AlgorithmRuntimeContext<Object, String> context, Object[] parameters) {
this.context = context;
if (parameters.length > 2) {
throw new IllegalArgumentException(
Only support zero or more arguments, false arguments "
+ "usage: func([alpha, [convergence, [max_iteration]]])");
}
// 设置最大迭代次数,如果没有设置该参数,最大迭代次数默认为20
if (parameters.length > 0) {
iteration = Integer.parseInt(String.valueOf(parameters[0]));
}
// 设置输出结果联通分量的key,默认为"component"
if (parameters.length > 1) {
keyFieldName =String.valueOf(parameters[1]);
}
}
复制代码
process
process 方法是每轮迭代执行的核心方法,弱联通分量算法的核心逻辑就实现在该方法里。在这里,第一轮迭代时我们设置每个点的 value 初始值为该点的 id,然后将该 id 通过出边和入边向其邻居节点传递出去。在此后的每轮迭代里,每个收到邻居节点消息的节点会取出消息里的最小值,作为该节点的新值,然后再将该最小值传递给其他邻居节点。到最后,所有联通分量的节点的值都会被染色成这个联通网络里的节点最小值。
public void process(RowVertex vertex, Iterator<String> messages) {
List<RowEdge> edges = new ArrayList<>(context.loadEdges(EdgeDirection.BOTH));
if (context.getCurrentIterationId() == 1L) {
String initValue = String.valueOf(vertex.getId());
sendMessageToNeighbors(edges, initValue);
context.sendMessage(vertex.getId(), String.valueOf(vertex.getId()));
context.updateVertexValue(ObjectRow.create(initValue));
} else if (context.getCurrentIterationId() < iteration) {
String minComponent = messages.next();
while (messages.hasNext()) {
String next = messages.next();
if (next.compareTo(minComponent) < 0) {
minComponent = next;
}
}
sendMessageToNeighbors(edges, minComponent);
context.sendMessage(vertex.getId(), minComponent);
context.updateVertexValue(ObjectRow.create(minComponent));
}
}
private void sendMessageToNeighbors(List<RowEdge> edges, String message) {
for (RowEdge rowEdge : edges) {
context.sendMessage(rowEdge.getTargetId(), message);
}
}
复制代码
finish
finish 方法在迭代最终收敛时会被调用,此时每个节点都会被染色成了它所在联通网络里的节点最小值,我们可以将结果输出。
public void finish(RowVertex vertex) {
String component = (String) vertex.getValue().getField(0, StringType.INSTANCE);
context.take(ObjectRow.create(vertex.getId(), component));
}
复制代码
getOutputType
getOutputType 方法中返回 udf 输出结果的 schema,在这里我们的输出结果是点,点有 meta 字段 id 和属性字段 component。
public StructType getOutputType() {
return new StructType(
new TableField("id", LongType.INSTANCE, false),
new TableField(keyFieldName, StringType.INSTANCE, false)
);
}
复制代码
总结
在本篇文章中我们介绍了如何在 TuGraph Analytics 上实现弱联通分量算法,如果你觉得比较有趣,欢迎关注我们的社区(https://github.com/TuGraph-family/tugraph-analytics)。开源不易,如果你觉得还不错,可以给我们 star 支持一下~
评论