前段时间 NebulaGraph 3.5.0 发布,@whitewum 吴老师建议我把前段时间 NebulaGraph 社区里开启的新项目 ng_ai 公开给大家。
所以,就有了这个系列文章,本文是该系列的开篇之作。
ng_ai 是什么
ng_ai 的全名是:Nebulagraph AI Suite,顾名思义,它是在 NebulaGraph 之上跑算法的 Python 套件,希望能给 NebulaGraph 的用户一个自然、简洁的高级 API。简单来说,用很少的代码量就可以执行图上的算法相关的任务。
ng_ai 这个开源项目的目标是,快速迭代、公开讨论、持续演进,一句话概述便是:
Simplifying things in surprising ways.
这个 ng_ai 的专属 url:https://github.com/wey-gu/nebulagraph-ai 可以帮你了解更全面的它。
ng_ai 的特点
为了让 NebulaGraph 社区的小伙伴拥有顺滑的算法体验,ng_ai 有以下特点:
- 与 NebulaGraph 紧密结合,方便从其中读、写图数据 
- 支持多引擎、后端,目前支持 Spark(NebulaGraph Algorithm)、NetworkX,之后会支持 DGL、PyG
 
 
- 友好、符合直觉的 API 设计 
- 与 NebulaGraph 的 UDF 无缝结合,支持从 Query 中调用 ng_ai 任务 
- 友好的自定义算法接口,方便用户自己实现算法(尚未完成) 
- 一键试玩环境(基于 Docker Extensions) 
你可以这么用 ng_ai
跑分布式 PageRank 算法
可以在一个大图上,基于 nebula-algorithm 分布式地跑 PageRank 算法,像是这样:
 from ng_ai import NebulaReader
# scan 模式,通过 Spark 引擎读取数据reader = NebulaReader(engine="spark")reader.scan(edge="follow", props="degree")df = reader.read()
# 运行 PageRank 算法pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10)
   复制代码
 写回算法结果到 NebulaGraph
假设我们要跑一个 Label Propagation 算法,然后把结果写回 NebulaGraph,我们可以这么做:
先确保结果中要写回图数据库的数据 Schema 已经创建好了,像是下面的示例,便是写到 label_propagation.cluster_id 字段里:
 CREATE TAG IF NOT EXISTS label_propagation (    cluster_id string NOT NULL);
   复制代码
 
下面,我们来看下具体流程。执行算法:
 df_result = df.algo.label_propagation()
   复制代码
 
再看一下结果的 Schema:
 df_result.printSchema()
root |-- _id: string (nullable = false) |-- lpa: string (nullable = false)
   复制代码
 
参考下面的代码,把 lpa 的结果写回 NebulaGraph 中的 cluster_id 字段里({"lpa": "cluster_id"}):
 from ng_ai import NebulaWriterfrom ng_ai.config import NebulaGraphConfig
config = NebulaGraphConfig()writer = NebulaWriter(    data=df_result, sink="nebulagraph_vertex", config=config, engine="spark")
# 将 lpa 同 cluster_id 进行映射properties = {"lpa": "cluster_id"}
writer.set_options(    tag="label_propagation",    vid_field="_id",    properties=properties,    batch_size=256,    write_mode="insert",)# 将数据写回到 NebulaGraphwriter.write()
   复制代码
 
最后,验证一下:
 USE basketballplayer;MATCH (v:label_propagation)RETURN id(v), v.label_propagation.cluster_id LIMIT 3;
   复制代码
 
结果:
 +-------------+--------------------------------+| id(v)       | v.label_propagation.cluster_id |+-------------+--------------------------------+| "player103" | "player101"                    || "player113" | "player129"                    || "player121" | "player129"                    |+-------------+--------------------------------+
   复制代码
 
更详细的例子参考:ng_ai/examples
通过 nGQL 调用算法
自 NebulaGraph v3.5.0 开始,用户可从 nGQL 中调用自己实现的函数。而 ng_ai 也用这个能力来实现了一个自己的 ng_ai 函数,让它从 nGQL 中调用 ng_ai 的算法,例如:
 -- 准备将要写入数据的 SchemaUSE basketballplayer;CREATE TAG IF NOT EXISTS pagerank(pagerank string);:sleep 20;-- 回调 ng_ai()RETURN ng_ai("pagerank", ["follow"], ["degree"], "spark", {space: "basketballplayer", max_iter: 10}, {write_mode: "insert"})
   复制代码
 
更详细的例子参考:ng_ai/examples
单机运行算法
在单机、本地的环境,ng_ai 支持基于 NetworkX 运行算法。
举个例子,读取图为 ng_ai graph 对象:
 from ng_ai import NebulaReaderfrom ng_ai.config import NebulaGraphConfig
# query 模式,通过 NebulaGraph 或是 NetworkX 引擎读取数据config_dict = {    "graphd_hosts": "graphd:9669",    "user": "root",    "password": "nebula",    "space": "basketballplayer",}config = NebulaGraphConfig(**config_dict)reader = NebulaReader(engine="nebula", config=config)reader.query(edges=["follow", "serve"], props=[["degree"], []])g = reader.read()
   复制代码
 
查看、画图:
运行算法:
 pr_result = g.algo.pagerank(reset_prob=0.15, max_iter=10)
   复制代码
 
写回 NebulaGraph:
 from ng_ai import NebulaWriter
writer = NebulaWriter(    data=pr_result,    sink="nebulagraph_vertex",    config=config,    engine="nebula",)
# 待写入的属性properties = ["pagerank"]
writer.set_options(    tag="pagerank",    properties=properties,    batch_size=256,    write_mode="insert",)# 将数据写回到 NebulaGraphwriter.write()
   复制代码
 
其他算法:
 # 获取所有算法g.algo.get_all_algo()
# 获取相关算法的帮助信息help(g.algo.node2vec)
# 调用算法g.algo.node2vec()
   复制代码
 
更详细的例子参考:ng_ai/examples
可视化图算法结果
这里演示一个 NetworkX 引擎情况下,计算 Louvain、PageRank 并可视化的例子:
先执行两个图算法:
 pr_result = g.algo.pagerank(reset_prob=0.15, max_iter=10)louvain_result = g.algo.louvain()
   复制代码
 
再手写一个画图好看的函数:
 from matplotlib.colors import ListedColormap
def draw_graph_louvain_pr(G, pr_result, louvain_result, colors=["#1984c5", "#22a7f0", "#63bff0", "#a7d5ed", "#e2e2e2", "#e1a692", "#de6e56", "#e14b31", "#c23728"]):    # 设定节点的位置    pos = nx.spring_layout(G)
    # 新建一个图形并设置坐标轴    fig, ax = plt.subplots(figsize=(35, 15))    ax.set_xlim(-1, 1)    ax.set_ylim(-1, 1)
    # 从颜色列表中创建一个 colormap    cmap = ListedColormap(colors)
    # 将图中的节点和边进行绘图    node_colors = [louvain_result[node] for node in G.nodes()]    node_sizes = [70000 * pr_result[node] for node in G.nodes()]    nx.draw_networkx_nodes(G, pos=pos, ax=ax, node_color=node_colors, node_size=node_sizes, cmap=cmap, vmin=0, vmax=max(louvain_result.values()))
    nx.draw_networkx_edges(G, pos=pos, ax=ax, edge_color='gray', width=1, connectionstyle='arc3, rad=0.2', arrowstyle='-|>', arrows=True)
    # 提取边数据中的 label 数据作为字典    edge_labels = nx.get_edge_attributes(G, 'label')
    # 在图中加入边的 label 数据    for edge, label in edge_labels.items():        ax.text((pos[edge[0]][0] + pos[edge[1]][0])/2,                (pos[edge[0]][1] + pos[edge[1]][1])/2,                label, fontsize=12, color='black', ha='center', va='center')
    # 在图中加入点的 label 数据    node_labels = {n: G.nodes[n]['label'] if 'label' in G.nodes[n] else n for n in G.nodes()}    nx.draw_networkx_labels(G, pos=pos, ax=ax, labels=node_labels, font_size=12, font_color='black')
    # 为同社区数据添加相同颜色    sm = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin=0, vmax=max(louvain_result.values())))    sm.set_array([])    cbar = plt.colorbar(sm, ax=ax, ticks=range(max(louvain_result.values()) + 1), shrink=0.5)    cbar.ax.set_yticklabels([f'Community {i}' for i in range(max(louvain_result.values()) + 1)])
    # 数据展示    plt.show()
draw_graph_louvain_pr(G, pr_result=pr_result, louvain_result=louvain_result)
   复制代码
 
效果如下所示:
更详细的例子参考:ng_ai/examples
更方便的 Notebook 操作 NebulaGraph
结合 NebulaGraph 的 Jupyter Notebook 插件: https://github.com/wey-gu/ipython-ngql,我们还可以更便捷地操作 NebulaGraph:
可通过 ng_ai 的 extras 在 Jupyter Notbook 中安装插件:
 %pip install ng_ai[jupyter]%load_ext ngql
   复制代码
 
当然,也可以单独安装插件:
 %pip install ipython-ngql%load_ext ngql
   复制代码
 
安装完成后,就可以在 Notebook 里直接使用 %ngql 命令来执行 nGQL 语句:
 %ngql --address 127.0.0.1 --port 9669 --user root --password nebula%ngql USE basketballplayer;%ngql MATCH (v:player{name:"Tim Duncan"})-->(v2:player) RETURN v2.player.name AS Name;
   复制代码
 
注,多行的 Query 用两个百分号就好了 %%ngql
最后,我们还能在 Jupyter Notebook 里直接可视化渲染结果!只需要 %ng_draw 就可以啦!
 %ngql match p=(:player)-[]->() return p LIMIT 5%ng_draw
   复制代码
 
效果如下:
未来工作
现在 ng_ai 还在开发中,我们还有很多工作要做:
 model = g.algo.gnn_link_prediction()result = model.train()# query src, dst to be predicted
model.predict(src_vertex, dst_vertices)
   复制代码
 
ng_ai 完全 build in public,欢迎社区的大家们来参与,一起来完善 ng_ai,让 NebulaGraph 上的 AI 算法更加简单、易用!
试玩 ng_ai
我们已经准备好了一键部署的 NebulaGraph + NebulaGraph Studio + ng_ai in Jupyter 的环境,只需要大家从 Docker Desktop 的 Extension(扩展)中搜索 NebulaGraph,就可以试玩了。
在 Docker Desktop 的插件市场搜索 NebulaGraph,点击安装:
进入 NebulaGraph 插件,点击 Install NX Mode,安装 ng_ai 的 NetworkX Playground,通常要等几分钟等待安装完成。
点击 Jupyter NB NetworkX,进入 NetworkX Playground。
ng_ai 的架构
ng_ai 的架构如下,它的核心模块有:
- Reader:负责从 NebulaGraph 读取数据 
- Writer:负责将数据写入 NebulaGraph 
- Engine:负责适配不同运行时,例如 Spark、DGL、NetowrkX 等 
- Algo:算法模块,例如 PageRank、Louvain、GNN_Link_Predict 等 
此外,为了支持 nGQL 中的调用,还有两个模块:
           ┌───────────────────────────────────────────────────┐          │   Spark Cluster                                   │          │    .─────.    .─────.    .─────.    .─────.       │          │   ;       :  ;       :  ;       :  ;       :      │       ┌─▶│   :       ;  :       ;  :       ;  :       ;      │       │  │    ╲     ╱    ╲     ╱    ╲     ╱    ╲     ╱       │       │  │     `───'      `───'      `───'      `───'        │  Algo Spark                                                  │    Engine└───────────────────────────────────────────────────┘       │  ┌────────────────────────────────────────────────────┬──────────┐       └──┤                                                    │          │          │   NebulaGraph AI Suite(ngai)                       │ ngai-api │◀─┐          │                                                    │          │  │          │                                                    └──────────┤  │          │     ┌────────┐    ┌──────┐    ┌────────┐   ┌─────┐            │  │          │     │ Reader │    │ Algo │    │ Writer │   │ GNN │            │  │ ┌───────▶│     └────────┘    └──────┘    └────────┘   └─────┘            │  │ │        │          │            │            │          │               │  │ │        │          ├────────────┴───┬────────┴─────┐    └──────┐        │  │ │        │          ▼                ▼              ▼           ▼        │  │ │        │   ┌─────────────┐ ┌──────────────┐ ┌──────────┐ ┌──────────┐  │  │ │     ┌──┤   │ SparkEngine │ │ NebulaEngine │ │ NetworkX │ │ DGLEngine│  │  │ │     │  │   └─────────────┘ └──────────────┘ └──────────┘ └──────────┘  │  │ │     │  └──────────┬────────────────────────────────────────────────────┘  │ │     │             │        Spark                                          │ │     │             └────────Reader ────────────┐                           │ │  Spark                   Query Mode           │                           │ │  Reader                                       │                           │ │Scan Mode                                      ▼                      ┌─────────┐ │     │  ┌───────────────────────────────────────────────────┬─────────┤ ngai-udf│◀─────────────┐ │     │  │                                                   │         └─────────┤              │ │     │  │  NebulaGraph Graph Engine         Nebula-GraphD   │   ngai-GraphD     │              │ │     │  ├──────────────────────────────┬────────────────────┼───────────────────┘              │ │     │  │                              │                    │                                  │ │     │  │  NebulaGraph Storage Engine  │                    │                                  │ │     │  │                              │                    │                                  │ │     └─▶│  Nebula-StorageD             │    Nebula-Metad    │                                  │ │        │                              │                    │                                  │ │        └──────────────────────────────┴────────────────────┘                                  │ │                                                                                               │ │    ┌───────────────────────────────────────────────────────────────────────────────────────┐  │ │    │ RETURN ng_ai("pagerank", ["follow"], ["degree"], "spark", {space:"basketballplayer"}) │──┘ │    └───────────────────────────────────────────────────────────────────────────────────────┘ │  ┌─────────────────────────────────────────────────────────────┐ │  │ from ng_ai import NebulaReader                              │ │  │                                                             │ │  │ # read data with spark engine, scan mode                    │ │  │ reader = NebulaReader(engine="spark")                       │ │  │ reader.scan(edge="follow", props="degree")                  │ └──│ df = reader.read()                                          │    │                                                             │    │ # run pagerank algorithm                                    │    │ pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10)  │    │                                                             │    └─────────────────────────────────────────────────────────────┘  
   复制代码
 
谢谢你读完本文 (///▽///)
欢迎前往 GitHub 来阅读 NebulaGraph 源码,或是尝试用它解决你的业务问题 yo~ GitHub 地址:https://github.com/vesoft-inc/nebula
评论