写点什么

GeaFlow 图计算快速上手之 PageRank 算法

作者:GeaFlow
  • 2023-07-07
    浙江
  • 本文字数:3329 字

    阅读完需:约 11 分钟

GeaFlow图计算快速上手之PageRank算法

GeaFlow(品牌名 TuGraph-Analytics) 已正式开源,欢迎大家关注!!! 欢迎给我们 Star 哦! GitHub👉https://github.com/TuGraph-family/tugraph-analytics

更多精彩内容,关注我们的博客 https://geaflow.github.io/


GeaFlow 介绍

GeaFlow(品牌名 TuGraph-Analytics)是蚂蚁集团开源的分布式实时图计算引擎,目前广泛应用于金融风控、社交网络、知识图谱以及数据应用等场景。GeaFlow 的核心能力是流式图计算,流式图计算相比离线图计算提供了一种高时效性低延迟的图计算模式,更多详细内容参考 GeaFlow GitHub 介绍(https://github.com/TuGraph-family/tugraph-analytics).


GeaFlow 整体架构如下所示:



  • GeaFlow DSL GeaFlow 对用户提供图表融合分析语言,采用 SQL + ISO/GQL 方式.用户可以通过类似 SQL 编程的方式编写实时图计算任务.

  • GraphView API GeaFlow 以 GraphView 为核心定义的一套图计算的编程接口,包含图构建、图计算以及 Stream API 接口.

  • GeaFlow Runtime GeaFlow 运行时,包含 GeaFlow 图表算子、task 调度、failover 以及 shuffle 等核心功能.

  • GeaFlow State GeaFlow 的图状态存储,用于存储图的点边数据.同时流式计算的状态如聚合状态也存放在 State 中.

  • K8S Deployment GeaFlow 支持 K8S 的方式进行部署运行.

  • GeaFlow Console GeaFlow 的管控平台,包含作业管理、元数据管理等功能.

PageRank 算法介绍

PageRank 是图计算领域一个应用广泛的算法,由 Google 公司创始人之一拉里·佩奇与谢尔盖·布林在 1998 年发明,主要用于网页的排序。该算法基于网页之间相互引用的关系,将网页评分的思想引入到搜索引擎中,用于计算网页的重要度和排名。


PageRank 算法的核心思想是:一个网页的重要度是由其他网页对它的引用数量和质量决定的。如果一个网页被其他网页引用得多,那么它的重要度就越高。同时,如果一个网页的引用来源也很重要,那么它对被引用网页的贡献也会更大。


实现 PageRank 算法的具体步骤包括:首先构建网页之间的链接关系图,然后对图进行迭代计算,直到收敛为止。在每一次迭代中,每个网页的得分都会被重新计算,并更新到下一次迭代中。最后,按照网页得分的大小对搜索结果进行排序,输出排名前几位的网页。如下有 4 个页面,A, B, C, D:



以 A 点为例,其每一轮的 PageRank 值计算方法如下:


PR(A) = d * (PR(D)/ 2 + PR(B)/1 + PR(C)/2) + (1- d)


每一个点的 PageRank 值等于其入点的 PageRank 值除以入点出边数的加权和, 其中 d 为 0~1 之间的修正系数.


PageRank 算法在搜索引擎中广泛应用,成为搜索引擎排名的重要算法之一。除此之外,PageRank 算法的思想也在社交网络、推荐系统等领域得到了应用。

TuGraph-Analytics 实现 PageRank

接口与实现

TuGraph-Analytics 支持在图查询里调用图算法,语法形式如下:


INSERT INTO tbl_resultCALL page_rank() YIELD (vid, prValue)RETURN vid, prValue;
复制代码


我们通过 CALL 语句调用具体的算法,通过 YIELD 定义算法的返回字段,比如 page_rank 算法返回点 id 和 page rank 值两个字段,则可以通过 YIELD(vid, prValue)来表示。


DSL 里面实现一个图算法需要实现 AlgorithmUserFunction 接口,其定义如下:


/** * Interface for the User Defined Graph Algorithm. * @param <K> The id type for vertex. * @param <M> The message type for message send between vertices. */public interface AlgorithmUserFunction<K, M> extends Serializable {
/** * Init method for the function * @param context The runtime context. * @param params The parameters for the function. */ void init(AlgorithmRuntimeContext<K, M> context, Object[] params);
/** * Processing method for each vertex and the messages it received. */ void process(RowVertex vertex, Iterator<M> messages);
/** * Returns the output type for the function. */ StructType getOutputType();}
复制代码


  • init


算法的初始化接口,主要完成算法的一些初始化操作. PageRank 的 init 方法实现如下:


    @Override    public void init(AlgorithmRuntimeContext context, Object[] parameters) {        this.context = context;        if (parameters.length > 3) {            throw new IllegalArgumentException(                "Only support zero or more arguments, false arguments "                    + "usage: func([alpha, [convergence, [max_iteration]]])");        }        // 修正系数,即前面介绍的参数d.        if (parameters.length > 0) {            alpha = Double.parseDouble(String.valueOf(parameters[0]));        }        // PR值更新阀值,当点的pr差值小于该值时,不再更新pr值.        if (parameters.length > 1) {            convergence = Double.parseDouble(String.valueOf(parameters[1]));        }        // 迭代次数        if (parameters.length > 2) {            iteration = Integer.parseInt(String.valueOf(parameters[2]));        }    }
复制代码


  • process


算法的主要处理逻辑,入参为当前 Active 点和要处理的消息,PageRank 主要实现如下:


    @Override    public void process(RowVertex vertex, Iterator messages) {       ....            if (context.getCurrentIterationId() == 1L) {            // 首轮迭代设置pr初始值,并发送给出边,同时更新当前点的pr值。            double initValue = 1.0;            sendMessageToNeighbors(outEdges, initValue / outEdges.size());            ...            context.updateVertexValue(ObjectRow.create(initValue));        } else if (context.getCurrentIterationId() < iteration) {            double sum = 0.0;            while (messages.hasNext()) {                double input = (double) messages.next();                input = input > 0 ? input : 0.0;                sum += input;            }            // 计算当前迭代的pr值            double pr = (1 - alpha) + (sum * alpha);            double currentPr = (double) vertex.getValue().getField(0, DoubleType.INSTANCE);            if (Math.abs(currentPr - pr) > convergence) {                // pr值发送给出边目标点.                sendMessageToNeighbors(outEdges, pr / outEdges.size());            }             context.updateVertexValue(ObjectRow.create(pr));        } else { // 到达最大迭代次数,结束本轮迭代,take最终计算结果.            double currentPr = (double) vertex.getValue().getField(0, DoubleType.INSTANCE);            context.take(ObjectRow.create(vertex.getId(), currentPr));            return;        }    }
复制代码


  • getOutputType


定义算法返回类型,PageRank 实现如下:


 @Override    public StructType getOutputType() {        return new StructType(            // id            new TableField("id", LongType.INSTANCE, false),            // pr值            new TableField("pr", DoubleType.INSTANCE, false)        );    }
复制代码

算法注册

算法实现通过注解来定义算法名称,如下所示:


@Description(name = "page_rank", description = "built-in udga for PageRank")public class PageRank implements AlgorithmUserFunction {}
复制代码


算法和 UDF 一样,需要注册或者创建后才能使用. DSL 内置算法或者 UDF 在 BuildInSqlFunctionTable 中进行注册.对于非内置算法,可以通过 create function 语句来创建.


Create funciton page_rank as 'com.antgroup.geaflow.dsl.udf.graph.PageRank';
复制代码

总结

本文主要介绍实时图计算引擎 GeaFlow 的基本架构,然后介绍了图算法 PageRank 的基本原理以及在 GeaFlow 中的实现细节和使用方式.


GeaFlow(品牌名 TuGraph-Analytics) 已正式开源,欢迎大家关注!!!


欢迎给我们 Star 哦!


Welcome to give us a Star!


GitHub👉https://github.com/TuGraph-family/tugraph-analytics


更多精彩内容,关注我们的博客 https://geaflow.github.io/


用户头像

GeaFlow

关注

欢迎访问:geaflow.github.io 2023-07-05 加入

GeaFlow(品牌名TuGraph-Analytics) 是一个分布式流图计算引擎 欢迎给我们 Star 哦! GitHub👉github.com/TuGraph-family/tugraph-analytics 更多精彩内容,关注我们的博客geaflow.github.io

评论

发布
暂无评论
GeaFlow图计算快速上手之PageRank算法_PageRank_GeaFlow_InfoQ写作社区