写点什么

GeaFlow 任务能力增强:通过 API 定制流图计算逻辑

作者:GeaFlow
  • 2023-08-15
    浙江
  • 本文字数:4919 字

    阅读完需:约 16 分钟

GeaFlow任务能力增强:通过API定制流图计算逻辑

GeaFlow(品牌名 TuGraph-Analytics) 已正式开源,欢迎大家关注!!! 欢迎给我们 Star 哦! GitHub👉https://github.com/TuGraph-family/tugraph-analytics更多精彩内容,关注我们的博客 https://geaflow.github.io/


GeaFlow API 介绍

GeaFlow API 是对高阶用户提供的开发接口,用户可以直接通过编写 java 代码来编写计算作业,相比于 DSL,API 的方式开发更加灵活,也能实现更丰富的功能和更复杂的计算逻辑。在 GeaFlow 中,API 支持 Graph API 和 Stream API 两种类型:


  • Graph API:Graph 是 GeaFlow 框架的一等公民,当前 GeaFlow 框架提供了一套基于 GraphView 的图计算编程接口,包含图构建、图计算及遍历。在 GeaFlow 中支持 Static Graph 和 Dynamic Graph 两种类型。

  • Static Graph API:静态图计算 API,基于该类 API 可以进行全量的图计算或图遍历。

  • Dynamic Graph API:动态图计算 API,GeaFlow 中 GraphView 是动态图的数据抽象,基于 GraphView 之上,可以进行动态图计算或图遍历。同时支持对 Graphview 生成 Snapshot 快照,基于 Snapshot 可以提供和 Static Graph API 一样的接口能力。



  • Stream API:GeaFlow 提供了一套通用计算的编程接口,包括 source 构建、流批计算及 sink 输出。在 GeaFlow 中支持 Batch 和 Stream 两种类型。

  • Batch API:批计算 API,基于该类 API 可以进行批量计算。

  • Stream API:流计算 API,GeaFlow 中 StreamView 是动态流的数据抽象,基于 StreamView 之上,可以进行流计算。


更多 API 的介绍可参考 https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/api/overview.md

PageRank 算法示例

本例子是从文件中读取点边进行构图,执行 pageRank 算法后,将每个点的 pageRank 值进行打印。其中,用户需要实现 AbstractVcFunc,在 compute 方法中进行每一轮迭代的计算逻辑。在本例子中,只计算了两轮迭代的结果。在第一轮中,每个点都会向邻居点发送当前点的 value 值,而在第二轮中,每个点收到邻居点发送的消息,将其 value 值进行累加,并更新为自己的 value 值,即为最后的 PageRank 值。


public class PageRank {
private static final Logger LOGGER = LoggerFactory.getLogger(PageRank.class);
public static final String RESULT_FILE_PATH = "./target/tmp/data/result/pagerank";
private static final double alpha = 0.85;
public static void main(String[] args) { Environment environment = EnvironmentUtil.loadEnvironment(args); IPipelineResult result = PageRank.submit(environment); PipelineResultCollect.get(result); environment.shutdown(); }
public static IPipelineResult submit(Environment environment) { Pipeline pipeline = PipelineFactory.buildPipeline(environment); Configuration envConfig = environment.getEnvironmentContext().getConfig(); envConfig.put(FileSink.OUTPUT_DIR, RESULT_FILE_PATH); ResultValidator.cleanResult(RESULT_FILE_PATH);
pipeline.submit((PipelineTask) pipelineTaskCxt -> { Configuration conf = pipelineTaskCxt.getConfig(); PWindowSource<IVertex<Integer, Double>> prVertices = pipelineTaskCxt.buildSource(new FileSource<>("email_vertex", line -> { String[] fields = line.split(","); IVertex<Integer, Double> vertex = new ValueVertex<>( Integer.valueOf(fields[0]), Double.valueOf(fields[1])); return Collections.singletonList(vertex); }), AllWindow.getInstance()) .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));
PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge", line -> { String[] fields = line.split(","); IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1); return Collections.singletonList(edge); }), AllWindow.getInstance()) .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));
int iterationParallelism = conf.getInteger(ExampleConfigKeys.ITERATOR_PARALLELISM); GraphViewDesc graphViewDesc = GraphViewBuilder .createGraphView(GraphViewBuilder.DEFAULT_GRAPH) .withShardNum(2) .withBackend(BackendType.Memory) .build(); PGraphWindow<Integer, Double, Integer> graphWindow = pipelineTaskCxt.buildWindowStreamGraph(prVertices, prEdges, graphViewDesc);
SinkFunction<IVertex<Integer, Double>> sink = ExampleSinkFunctionFactory.getSinkFunction(conf); graphWindow.compute(new PRAlgorithms(10)) .compute(iterationParallelism) .getVertices() .sink(v -> { LOGGER.info("result {}", v); }) .withParallelism(conf.getInteger(ExampleConfigKeys.SINK_PARALLELISM)); });
return pipeline.execute(); }
public static class PRAlgorithms extends VertexCentricCompute<Integer, Double, Integer, Double> {
public PRAlgorithms(long iterations) { super(iterations); }
@Override public VertexCentricComputeFunction<Integer, Double, Integer, Double> getComputeFunction() { return new PRVertexCentricComputeFunction(); }
@Override public VertexCentricCombineFunction<Double> getCombineFunction() { return null; }
}
public static class PRVertexCentricComputeFunction extends AbstractVcFunc<Integer, Double, Integer, Double> {
@Override public void compute(Integer vertexId, Iterator<Double> messageIterator) { IVertex<Integer, Double> vertex = this.context.vertex().get(); List<IEdge<Integer, Integer>> outEdges = context.edges().getOutEdges(); if (this.context.getIterationId() == 1) { if (!outEdges.isEmpty()) { this.context.sendMessageToNeighbors(vertex.getValue() / outEdges.size()); }
} else { double sum = 0; while (messageIterator.hasNext()) { double value = messageIterator.next(); sum += value; } double pr = sum * alpha + (1 - alpha); this.context.setNewVertexValue(pr);
if (!outEdges.isEmpty()) { this.context.sendMessageToNeighbors(pr / outEdges.size()); } } }
}}
复制代码

提交 API 作业

(以容器模式,PageRank 算法示例)

算法打包

在新的项目中新建一个 PageRank 的 demo,pom 中引入 geaflow 依赖


<dependency>    <groupId>com.antgroup.tugraph</groupId>    <artifactId>geaflow-assembly</artifactId>    <version>0.2-SNAPSHOT</version></dependency>
复制代码


新建 PageRank 类,编写上述相关代码。在项目 resources 路径下,创建测试数据文件 email_vertex 和 email_edge,代码中会从 resources://资源路径读取数据进行构图。


 PWindowSource<IVertex<Integer, Double>> prVertices =                pipelineTaskCxt.buildSource(new FileSource<>("email_vertex",                        line -> {                            String[] fields = line.split(",");                            IVertex<Integer, Double> vertex = new ValueVertex<>(                                Integer.valueOf(fields[0]), Double.valueOf(fields[1]));                            return Collections.singletonList(vertex);                        }), AllWindow.getInstance())                    .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));
PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge", line -> { String[] fields = line.split(","); IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1); return Collections.singletonList(edge); }), AllWindow.getInstance()) .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));
复制代码


email_vertex


0,11,12,13,14,15,16,17,18,19,1
复制代码


email_edge


4,30,12,34,62,46,80,24,80,50,70,89,07,07,17,29,53,07,45,37,51,05,49,83,47,93,73,81,68,06,06,28,54,2
复制代码


maven 打包,在 target 目录获取算法的 jar 包


mvn clean install
复制代码

新增 HLA 图任务

在 GeaFlow Console 中新增图任务,任务类型选择“HLA”, 并上传 jar 包(或者选择已存在的 jar 包),其中 entryClass 为算法主函数所在的类。 点击“提交”,创建任务。


提交作业


点击"发布",可进入作业详情界面,点击“提交”即可提交作业。


查看运行结果

进入容器 /tmp/logs/task/ 目录下,查看对应作业的日志,可看到日志中打印了最终计算得到的每个点的 pageRank 值。


2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:0, value:1.5718675107490019)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:1, value:0.5176947080197076)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:2, value:1.0201253300467092)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:3, value:1.3753756869824914)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:4, value:1.4583114077692536)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:5, value:1.1341668910561529)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:6, value:0.6798184364673463)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:7, value:0.70935427506243)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:8, value:1.2827529511906106)2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:9, value:0.2505328026562969)
复制代码


可在作业详情中查看运行详情,



至此,我们就成功使用 Geaflow 实现并运行 API 任务了!是不是超简单!快来试一试吧!



GeaFlow(品牌名 TuGraph-Analytics) 已正式开源,欢迎大家关注!!!


欢迎给我们 Star 哦!


Welcome to give us a Star!


GitHub👉https://github.com/TuGraph-family/tugraph-analytics


更多精彩内容,关注我们的博客 https://geaflow.github.io/

用户头像

GeaFlow

关注

欢迎访问:geaflow.github.io 2023-07-05 加入

GeaFlow(品牌名TuGraph-Analytics) 是一个分布式流图计算引擎 欢迎给我们 Star 哦! GitHub👉github.com/TuGraph-family/tugraph-analytics 更多精彩内容,关注我们的博客geaflow.github.io

评论

发布
暂无评论
GeaFlow任务能力增强:通过API定制流图计算逻辑_分布式计算_GeaFlow_InfoQ写作社区