GeaFlow(品牌名 TuGraph-Analytics) 已正式开源,欢迎大家关注!!! 欢迎给我们 Star 哦! GitHub👉https://github.com/TuGraph-family/tugraph-analytics更多精彩内容,关注我们的博客 https://geaflow.github.io/
GeaFlow API 介绍
GeaFlow API 是对高阶用户提供的开发接口,用户可以直接通过编写 java 代码来编写计算作业,相比于 DSL,API 的方式开发更加灵活,也能实现更丰富的功能和更复杂的计算逻辑。在 GeaFlow 中,API 支持 Graph API 和 Stream API 两种类型:
Graph API:Graph 是 GeaFlow 框架的一等公民,当前 GeaFlow 框架提供了一套基于 GraphView 的图计算编程接口,包含图构建、图计算及遍历。在 GeaFlow 中支持 Static Graph 和 Dynamic Graph 两种类型。
Static Graph API:静态图计算 API,基于该类 API 可以进行全量的图计算或图遍历。
Dynamic Graph API:动态图计算 API,GeaFlow 中 GraphView 是动态图的数据抽象,基于 GraphView 之上,可以进行动态图计算或图遍历。同时支持对 Graphview 生成 Snapshot 快照,基于 Snapshot 可以提供和 Static Graph API 一样的接口能力。
Stream API:GeaFlow 提供了一套通用计算的编程接口,包括 source 构建、流批计算及 sink 输出。在 GeaFlow 中支持 Batch 和 Stream 两种类型。
Batch API:批计算 API,基于该类 API 可以进行批量计算。
Stream API:流计算 API,GeaFlow 中 StreamView 是动态流的数据抽象,基于 StreamView 之上,可以进行流计算。
更多 API 的介绍可参考 https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/api/overview.md
PageRank 算法示例
本例子是从文件中读取点边进行构图,执行 pageRank 算法后,将每个点的 pageRank 值进行打印。其中,用户需要实现 AbstractVcFunc,在 compute 方法中进行每一轮迭代的计算逻辑。在本例子中,只计算了两轮迭代的结果。在第一轮中,每个点都会向邻居点发送当前点的 value 值,而在第二轮中,每个点收到邻居点发送的消息,将其 value 值进行累加,并更新为自己的 value 值,即为最后的 PageRank 值。
public class PageRank {
private static final Logger LOGGER = LoggerFactory.getLogger(PageRank.class);
public static final String RESULT_FILE_PATH = "./target/tmp/data/result/pagerank";
private static final double alpha = 0.85;
public static void main(String[] args) { Environment environment = EnvironmentUtil.loadEnvironment(args); IPipelineResult result = PageRank.submit(environment); PipelineResultCollect.get(result); environment.shutdown(); }
public static IPipelineResult submit(Environment environment) { Pipeline pipeline = PipelineFactory.buildPipeline(environment); Configuration envConfig = environment.getEnvironmentContext().getConfig(); envConfig.put(FileSink.OUTPUT_DIR, RESULT_FILE_PATH); ResultValidator.cleanResult(RESULT_FILE_PATH);
pipeline.submit((PipelineTask) pipelineTaskCxt -> { Configuration conf = pipelineTaskCxt.getConfig(); PWindowSource<IVertex<Integer, Double>> prVertices = pipelineTaskCxt.buildSource(new FileSource<>("email_vertex", line -> { String[] fields = line.split(","); IVertex<Integer, Double> vertex = new ValueVertex<>( Integer.valueOf(fields[0]), Double.valueOf(fields[1])); return Collections.singletonList(vertex); }), AllWindow.getInstance()) .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));
PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge", line -> { String[] fields = line.split(","); IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1); return Collections.singletonList(edge); }), AllWindow.getInstance()) .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));
int iterationParallelism = conf.getInteger(ExampleConfigKeys.ITERATOR_PARALLELISM); GraphViewDesc graphViewDesc = GraphViewBuilder .createGraphView(GraphViewBuilder.DEFAULT_GRAPH) .withShardNum(2) .withBackend(BackendType.Memory) .build(); PGraphWindow<Integer, Double, Integer> graphWindow = pipelineTaskCxt.buildWindowStreamGraph(prVertices, prEdges, graphViewDesc);
SinkFunction<IVertex<Integer, Double>> sink = ExampleSinkFunctionFactory.getSinkFunction(conf); graphWindow.compute(new PRAlgorithms(10)) .compute(iterationParallelism) .getVertices() .sink(v -> { LOGGER.info("result {}", v); }) .withParallelism(conf.getInteger(ExampleConfigKeys.SINK_PARALLELISM)); });
return pipeline.execute(); }
public static class PRAlgorithms extends VertexCentricCompute<Integer, Double, Integer, Double> {
public PRAlgorithms(long iterations) { super(iterations); }
@Override public VertexCentricComputeFunction<Integer, Double, Integer, Double> getComputeFunction() { return new PRVertexCentricComputeFunction(); }
@Override public VertexCentricCombineFunction<Double> getCombineFunction() { return null; }
}
public static class PRVertexCentricComputeFunction extends AbstractVcFunc<Integer, Double, Integer, Double> {
@Override public void compute(Integer vertexId, Iterator<Double> messageIterator) { IVertex<Integer, Double> vertex = this.context.vertex().get(); List<IEdge<Integer, Integer>> outEdges = context.edges().getOutEdges(); if (this.context.getIterationId() == 1) { if (!outEdges.isEmpty()) { this.context.sendMessageToNeighbors(vertex.getValue() / outEdges.size()); }
} else { double sum = 0; while (messageIterator.hasNext()) { double value = messageIterator.next(); sum += value; } double pr = sum * alpha + (1 - alpha); this.context.setNewVertexValue(pr);
if (!outEdges.isEmpty()) { this.context.sendMessageToNeighbors(pr / outEdges.size()); } } }
}}
复制代码
提交 API 作业
(以容器模式,PageRank 算法示例)
算法打包
在新的项目中新建一个 PageRank 的 demo,pom 中引入 geaflow 依赖
<dependency> <groupId>com.antgroup.tugraph</groupId> <artifactId>geaflow-assembly</artifactId> <version>0.2-SNAPSHOT</version></dependency>
复制代码
新建 PageRank 类,编写上述相关代码。在项目 resources 路径下,创建测试数据文件 email_vertex 和 email_edge,代码中会从 resources://资源路径读取数据进行构图。
PWindowSource<IVertex<Integer, Double>> prVertices = pipelineTaskCxt.buildSource(new FileSource<>("email_vertex", line -> { String[] fields = line.split(","); IVertex<Integer, Double> vertex = new ValueVertex<>( Integer.valueOf(fields[0]), Double.valueOf(fields[1])); return Collections.singletonList(vertex); }), AllWindow.getInstance()) .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));
PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge", line -> { String[] fields = line.split(","); IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1); return Collections.singletonList(edge); }), AllWindow.getInstance()) .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));
复制代码
email_vertex
0,11,12,13,14,15,16,17,18,19,1
复制代码
email_edge
4,30,12,34,62,46,80,24,80,50,70,89,07,07,17,29,53,07,45,37,51,05,49,83,47,93,73,81,68,06,06,28,54,2
复制代码
maven 打包,在 target 目录获取算法的 jar 包
新增 HLA 图任务
在 GeaFlow Console 中新增图任务,任务类型选择“HLA”, 并上传 jar 包(或者选择已存在的 jar 包),其中 entryClass 为算法主函数所在的类。 点击“提交”,创建任务。
提交作业
点击"发布",可进入作业详情界面,点击“提交”即可提交作业。
查看运行结果
进入容器 /tmp/logs/task/ 目录下,查看对应作业的日志,可看到日志中打印了最终计算得到的每个点的 pageRank 值。
2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:0, value:1.5718675107490019)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:1, value:0.5176947080197076)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:2, value:1.0201253300467092)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:3, value:1.3753756869824914)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:4, value:1.4583114077692536)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:5, value:1.1341668910561529)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:6, value:0.6798184364673463)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:7, value:0.70935427506243)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:8, value:1.2827529511906106)2023-08-01 16:51:38 INFO PageRank:107 - result ValueVertex(vertexId:9, value:0.2505328026562969)
复制代码
可在作业详情中查看运行详情,
至此,我们就成功使用 Geaflow 实现并运行 API 任务了!是不是超简单!快来试一试吧!
GeaFlow(品牌名 TuGraph-Analytics) 已正式开源,欢迎大家关注!!!
欢迎给我们 Star 哦!
Welcome to give us a Star!
GitHub👉https://github.com/TuGraph-family/tugraph-analytics
更多精彩内容,关注我们的博客 https://geaflow.github.io/
评论