写点什么

无依赖单机尝鲜 Nebula Exchange 的 SST 导入

作者:Nebula Graph
  • 2022 年 3 月 03 日
  • 本文字数:10016 字

    阅读完需:约 33 分钟


本文尝试分享下以最小方式(单机、容器化 Spark、Hadoop、Nebula Graph),快速趟一下 Nebula Exchange 中 SST 写入方式的步骤。本文适用于 v2.5 以上版本的 Nebula- Exchange。


原文链接:


什么是 Nebula Exchange?

之前我在 Nebula Data Import Options 之中介绍过,Nebula Exchange 是一个 Nebula Graph 社区开源的 Spark Applicaiton,它专门用来支持批量或者流式地把数据导入 Nebula Graph Database 之中。


Nebula Exchange 支持多种多样的数据源(从 Apache Parquet、ORC、JSON、CSV、HBase、Hive MaxCompute 到 Neo4j、MySQL、ClickHouse,再有 Kafka、Pulsar,更多的数据源也在不断增加之中)。



如上图所示,在 Exchange 内部,从除了不同 Reader 可以读取不同数据源之外,在数据经过 Processor 处理之后通过 Writer 写入(sink) Nebula Graph 图数据库的时候,除了走正常的 ServerBaseWriter 的写入流程之外,它还可以绕过整个写入流程,利用 Spark 的计算能力并行生成底层 RocksDB 的 SST 文件,从而实现超高性能的数据导入,这个 SST 文件导入的场景就是本文带大家上手熟悉的部分。


详细信息请参阅:Nebula Graph 手册:什么是 Nebula Exchange

Nebula Graph 官方博客也有更多 Nebula Exchange 的实践文章

步骤概观

  • 实验环境

  • 配置 Exchange

  • 生成 SST 文件

  • 写入 SST 文件到 Nebula Graph

实验环境准备

为了最小化使用 Nebula Exchange 的 SST 功能,我们需要:


  • 搭建一个 Nebula Graph 集群,创建导入数据的 Schema,我们选择使用 Docker-Compose 方式、利用 Nebula-Up 快速部署,并简单修改其网络,以方便同样容器化的 Exchange 程序对其访问。

  • 搭建容器化的 Spark 运行环境

  • 搭建容器化的 HDFS

1. 搭建 Nebula Graph 集群

借助于 Nebula-Up 我们可以在 Linux 环境下一键部署一套 Nebula Graph 集群:


curl -fsSL nebula-up.siwei.io/install.sh | bash
复制代码



待部署成功之后,我们需要对环境做一些修改,这里我做的修改其实就是两点:


  1. 只保留一个 metaD 服务

  2. 起用 Docker 的外部网络


详细修改的部分参考附录一


应用 docker-compose 的修改:


cd ~/.nebula-up/nebula-docker-composevim docker-compose.yaml # 参考附录一docker network create nebula-net # 需要创建外部网络docker-compose up -d --remove-orphans
复制代码


之后,我们来创建要测试的图空间,并创建图的 Schema,为此,我们可以利用 nebula-console ,同样,Nebula-Up 里自带了容器化的 nebula-console。


  • 进入 Nebula-Console 所在的容器


~/.nebula-up/console.sh/ #
复制代码


  • 在 console 容器里发起链接到图数据库,其中 192.168.x.y 是我所在的 Linux VM 的第一个网卡地址,请换成您的


/ # nebula-console -addr 192.168.x.y -port 9669 -user root -p password[INFO] connection pool is initialized successfully
Welcome to Nebula Graph!
复制代码


  • 创建图空间(我们起名字叫 sst ),以及 schema


create space sst(partition_num=5,replica_factor=1,vid_type=fixed_string(32));:sleep 20use sstcreate tag player(name string, age int);
复制代码


示例输出


(root@nebula) [(none)]> create space sst(partition_num=5,replica_factor=1,vid_type=fixed_string(32));Execution succeeded (time spent 1468/1918 us)
(root@nebula) [(none)]> :sleep 20
(root@nebula) [(none)]> use sstExecution succeeded (time spent 1253/1566 us)
Wed, 18 Aug 2021 08:18:13 UTC
(root@nebula) [sst]> create tag player(name string, age int);Execution succeeded (time spent 1312/1735 us)
Wed, 18 Aug 2021 08:18:23 UTC
复制代码

2. 搭建容器化的 Spark 环境

利用 big data europe 做的工作,这个过程非常容易。


值得注意的是:


  • 现在的 Nebula Exchange 对 Spark 的版本有要求,在现在的 2021 年 8 月,我是用了 spark-2.4.5-hadoop-2.7 的版本。

  • 为了方便,我让 Spark 运行在 Nebula Graph 相同的机器上,并且指定了运行在同一个 Docker 网络下


docker run --name spark-master --network nebula-net \    -h spark-master -e ENABLE_INIT_DAEMON=false -d \    bde2020/spark-master:2.4.5-hadoop2.7
复制代码


然后,我们就可以进入到环境中了:


docker exec -it spark-master bash
复制代码


进到 Spark 容器中之后,可以像这样安装 maven:


export MAVEN_VERSION=3.5.4export MAVEN_HOME=/usr/lib/mvnexport PATH=$MAVEN_HOME/bin:$PATH
wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \ tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \ rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \ mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
复制代码


还可以这样在容器里下载 nebula-exchange 的 jar 包:


cd ~wget https://repo1.maven.org/maven2/com/vesoft/nebula-exchange/2.1.0/nebula-exchange-2.1.0.jar
复制代码

3. 搭建容器化的 HDFS

同样借助 big-data-euroupe 的工作,这非常简单,不过我们要做一点修改,让它的 docker-compose.yml 文件里使用 nebula-net 这个之前创建的 Docker 网络。


详细修改的部分参考附录二


git clone https://github.com/big-data-europe/docker-hadoop.gitcd docker-hadoopvim docker-compose.ymldocker-compose up -d
复制代码

配置 Exchange

这个配置主要填入的信息就是 Nebula Graph 集群本身和将要写入数据的 Space Name,以及数据源相关的配置(这里我们用 csv 作为例子),最后再配置输出(sink)为 sst


  • Nebula Graph

  • GraphD 地址

  • MetaD 地址

  • credential

  • Space Name

  • 数据源

  • source: csv

  • path

  • fields etc.

  • ink: sst


详细的配置参考附录二


注意,这里 metaD 的地址可以这样获取,可以看到 0.0.0.0:49377->9559 表示 49377 是外部的地址。


$ docker ps | grep meta887740c15750   vesoft/nebula-metad:v2.0.0                               "./bin/nebula-metad …"   6 hours ago    Up 6 hours (healthy)    9560/tcp, 0.0.0.0:49377->9559/tcp, :::49377->9559/tcp, 0.0.0.0:49376->19559/tcp, :::49376->19559/tcp, 0.0.0.0:49375->19560/tcp, :::49375->19560/tcp                  nebula-docker-compose_metad0_1
复制代码

生成 SST 文件

1. 准备源文件、配置文件

docker cp exchange-sst.conf spark-master:/root/docker cp player.csv spark-master:/root/
复制代码


其中 player.csv 的例子:


1100,Tim Duncan,421101,Tony Parker,361102,LaMarcus Aldridge,331103,Rudy Gay,321104,Marco Belinelli,321105,Danny Green,311106,Kyle Anderson,251107,Aron Baynes,321108,Boris Diaw,361109,Tiago Splitter,341110,Cory Joseph,271111,David West,38
复制代码

2. 执行 exchange 程序

进入 spark-master 容器,提交执行 exchange 应用。


docker exec -it spark-master bashcd /root//spark/bin/spark-submit --master local \    --class com.vesoft.nebula.exchange.Exchange nebula-exchange-2.1.0.jar\    -c exchange-sst.conf
复制代码


检查执行结果:


spark-submit 输出:


21/08/17 03:37:43 INFO TaskSetManager: Finished task 31.0 in stage 2.0 (TID 33) in 1093 ms on localhost (executor driver) (32/32)21/08/17 03:37:43 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool21/08/17 03:37:43 INFO DAGScheduler: ResultStage 2 (foreachPartition at VerticesProcessor.scala:179) finished in 22.336 s21/08/17 03:37:43 INFO DAGScheduler: Job 1 finished: foreachPartition at VerticesProcessor.scala:179, took 22.500639 s21/08/17 03:37:43 INFO Exchange$: SST-Import: failure.player: 021/08/17 03:37:43 WARN Exchange$: Edge is not defined21/08/17 03:37:43 INFO SparkUI: Stopped Spark web UI at http://spark-master:404021/08/17 03:37:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
复制代码


验证 HDFS 上生成的 SST 文件:


docker exec -it namenode /bin/bash
root@2db58903fb53:/# hdfs dfs -ls /sstFound 10 itemsdrwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/1drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/10drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/2drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/3drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/4drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/5drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/6drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/7drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/8drwxr-xr-x - root supergroup 0 2021-08-17 03:37 /sst/9
复制代码

写入 SST 到 Nebula Graph

这里的操作实际上都是参考文档:SST 导入,得来。其中就是从 console 之中执行了两步操作:


  • Download

  • Ingest


其中 Download 实际上是触发 Nebula Graph 从服务端发起 HDFS Client 的 download,获取 HDFS 上的 SST 文件,然后放到 storageD 能访问的本地路径下,这里,需要我们在服务端部署 HDFS 的依赖。因为我们是最小实践,我就偷懒手动做了这个 Download 的操作。

1. 手动下载

这里边手动下载我们就要知道 Nebula Graph 服务端下载的路径,实际上是 /data/storage/nebula/<space_id>/download/,这里的 Space ID 需要手动获取一下:


这个例子里,我们的 Space Name 是 sst,而 Space ID 是 49


(root@nebula) [sst]> DESC space sst+----+-------+------------------+----------------+---------+------------+--------------------+-------------+-----------+| ID | Name  | Partition Number | Replica Factor | Charset | Collate    | Vid Type           | Atomic Edge | Group     |+----+-------+------------------+----------------+---------+------------+--------------------+-------------+-----------+| 49 | "sst" | 10               | 1              | "utf8"  | "utf8_bin" | "FIXED_STRING(32)" | "false"     | "default" |+----+-------+------------------+----------------+---------+------------+--------------------+-------------+-----------+
复制代码


于是,下边的操作就是手动把 SST 文件从 HDFS 之中 get 下来,再拷贝到 storageD 之中。


docker exec -it namenode /bin/bash
$ hdfs dfs -get /sst /sstexitdocker cp namenode:/sst .docker exec -it nebula-docker-compose_storaged0_1 mkdir -p /data/storage/nebula/49/download/docker exec -it nebula-docker-compose_storaged1_1 mkdir -p /data/storage/nebula/49/download/docker exec -it nebula-docker-compose_storaged2_1 mkdir -p /data/storage/nebula/49/download/docker cp sst nebula-docker-compose_storaged0_1:/data/storage/nebula/49/download/docker cp sst nebula-docker-compose_storaged1_1:/data/storage/nebula/49/download/docker cp sst nebula-docker-compose_storaged2_1:/data/storage/nebula/49/download/
复制代码

2. SST 文件导入

  • 进入 Nebula-Console 所在的容器


~/.nebula-up/console.sh/ #
复制代码


  • 在 console 容器里发起链接到图数据库,其中 192.168.x.y 是我所在的 Linux VM 的第一个网卡地址,请换成您的


/ # nebula-console -addr 192.168.x.y -port 9669 -user root -p password[INFO] connection pool is initialized successfully
Welcome to Nebula Graph!
复制代码


  • 执行 INGEST 开始让 StorageD 读取 SST 文件


(root@nebula) [(none)]> use sst(root@nebula) [sst]> INGEST;
复制代码


我们可以用如下方法实时查看 Nebula Graph 服务端的日志


tail -f ~/.nebula-up/nebula-docker-compose/logs/*/*
复制代码


成功的 INGEST 日志:


I0817 08:03:28.611877   169 EventListner.h:96] Ingest external SST file: column family default, the external file path /data/storage/nebula/49/download/8/8-6.sst, the internal file path /data/storage/nebula/49/data/000023.sst, the properties of the table: # data blocks=1; # entries=1; # deletions=0; # merge operands=0; # range deletions=0; raw key size=48; raw average key size=48.000000; raw value size=40; raw average value size=40.000000; data block size=75; index block size (user-key? 0, delta-value? 0)=66; filter block size=0; (estimated) table size=141; filter policy name=N/A; prefix extractor name=nullptr; column family ID=N/A; column family name=N/A; comparator name=leveldb.BytewiseComparator; merge operator name=nullptr; property collectors names=[]; SST file compression algo=Snappy; SST file compression options=window_bits=-14; level=32767; strategy=0; max_dict_bytes=0; zstd_max_train_bytes=0; enabled=0; ; creation time=0; time stamp of earliest key=0; file creation time=0;E0817 08:03:28.611912   169 StorageHttpIngestHandler.cpp:63] SSTFile ingest successfully
复制代码

附录

附录一

docker-compose.yaml


diff --git a/docker-compose.yaml b/docker-compose.yamlindex 48854de..cfeaedb 100644--- a/docker-compose.yaml+++ b/docker-compose.yaml@@ -6,11 +6,13 @@ services:       USER: root       TZ:   "${TZ}"     command:-      - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559+      - --meta_server_addrs=metad0:9559       - --local_ip=metad0       - --ws_ip=metad0       - --port=9559       - --ws_http_port=19559+      - --ws_storage_http_port=19779       - --data_path=/data/meta       - --log_dir=/logs       - --v=0@@ -34,81 +36,14 @@ services:     cap_add:       - SYS_PTRACE
- metad1:- image: vesoft/nebula-metad:v2.0.0- environment:- USER: root- TZ: "${TZ}"- command:- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559- - --local_ip=metad1- - --ws_ip=metad1- - --port=9559- - --ws_http_port=19559- - --data_path=/data/meta- - --log_dir=/logs- - --v=0- - --minloglevel=0- healthcheck:- test: ["CMD", "curl", "-sf", "http://metad1:19559/status"]- interval: 30s- timeout: 10s- retries: 3- start_period: 20s- ports:- - 9559- - 19559- - 19560- volumes:- - ./data/meta1:/data/meta- - ./logs/meta1:/logs- networks:- - nebula-net- restart: on-failure- cap_add:- - SYS_PTRACE-- metad2:- image: vesoft/nebula-metad:v2.0.0- environment:- USER: root- TZ: "${TZ}"- command:- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559- - --local_ip=metad2- - --ws_ip=metad2- - --port=9559- - --ws_http_port=19559- - --data_path=/data/meta- - --log_dir=/logs- - --v=0- - --minloglevel=0- healthcheck:- test: ["CMD", "curl", "-sf", "http://metad2:19559/status"]- interval: 30s- timeout: 10s- retries: 3- start_period: 20s- ports:- - 9559- - 19559- - 19560- volumes:- - ./data/meta2:/data/meta- - ./logs/meta2:/logs- networks:- - nebula-net- restart: on-failure- cap_add:- - SYS_PTRACE- storaged0: image: vesoft/nebula-storaged:v2.0.0 environment: USER: root TZ: "${TZ}" command:- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559+ - --meta_server_addrs=metad0:9559 - --local_ip=storaged0 - --ws_ip=storaged0 - --port=9779@@ -119,8 +54,8 @@ services: - --minloglevel=0 depends_on: - metad0- - metad1- - metad2 healthcheck: test: ["CMD", "curl", "-sf", "http://storaged0:19779/status"] interval: 30s@@ -146,7 +81,7 @@ services: USER: root TZ: "${TZ}" command:- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559+ - --meta_server_addrs=metad0:9559 - --local_ip=storaged1 - --ws_ip=storaged1 - --port=9779@@ -157,8 +92,8 @@ services: - --minloglevel=0 depends_on: - metad0- - metad1- - metad2 healthcheck: test: ["CMD", "curl", "-sf", "http://storaged1:19779/status"] interval: 30s@@ -184,7 +119,7 @@ services: USER: root TZ: "${TZ}" command:- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559+ - --meta_server_addrs=metad0:9559 - --local_ip=storaged2 - --ws_ip=storaged2 - --port=9779@@ -195,8 +130,8 @@ services: - --minloglevel=0 depends_on: - metad0- - metad1- - metad2 healthcheck: test: ["CMD", "curl", "-sf", "http://storaged2:19779/status"] interval: 30s@@ -222,17 +157,19 @@ services: USER: root TZ: "${TZ}" command:- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559+ - --meta_server_addrs=metad0:9559 - --port=9669 - --ws_ip=graphd - --ws_http_port=19669+ - --ws_meta_http_port=19559 - --log_dir=/logs - --v=0 - --minloglevel=0 depends_on: - metad0- - metad1- - metad2 healthcheck: test: ["CMD", "curl", "-sf", "http://graphd:19669/status"] interval: 30s@@ -257,17 +194,19 @@ services: USER: root TZ: "${TZ}" command:- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559+ - --meta_server_addrs=metad0:9559 - --port=9669 - --ws_ip=graphd1 - --ws_http_port=19669+ - --ws_meta_http_port=19559 - --log_dir=/logs - --v=0 - --minloglevel=0 depends_on: - metad0- - metad1- - metad2 healthcheck: test: ["CMD", "curl", "-sf", "http://graphd1:19669/status"] interval: 30s@@ -292,17 +231,21 @@ services: USER: root TZ: "${TZ}" command:- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559+ - --meta_server_addrs=metad0:9559 - --port=9669 - --ws_ip=graphd2 - --ws_http_port=19669+ - --ws_meta_http_port=19559 - --log_dir=/logs - --v=0 - --minloglevel=0+ - --storage_client_timeout_ms=60000+ - --local_config=true depends_on: - metad0- - metad1- - metad2 healthcheck: test: ["CMD", "curl", "-sf", "http://graphd2:19669/status"] interval: 30s@@ -323,3 +266,4 @@ services:
networks: nebula-net:+ external: true
复制代码

附录二

https://github.com/big-data-europe/docker-hadoopdocker-compose.yml


diff --git a/docker-compose.yml b/docker-compose.ymlindex ed40dc6..66ff1f4 100644--- a/docker-compose.yml+++ b/docker-compose.yml@@ -14,6 +14,8 @@ services:       - CLUSTER_NAME=test     env_file:       - ./hadoop.env+    networks:+      - nebula-net
datanode: image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8@@ -25,6 +27,8 @@ services: SERVICE_PRECONDITION: "namenode:9870" env_file: - ./hadoop.env+ networks:+ - nebula-net
resourcemanager: image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8@@ -34,6 +38,8 @@ services: SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864" env_file: - ./hadoop.env+ networks:+ - nebula-net
nodemanager1: image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8@@ -43,6 +49,8 @@ services: SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088" env_file: - ./hadoop.env+ networks:+ - nebula-net
historyserver: image: bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8@@ -54,8 +62,14 @@ services: - hadoop_historyserver:/hadoop/yarn/timeline env_file: - ./hadoop.env+ networks:+ - nebula-net
volumes: hadoop_namenode: hadoop_datanode: hadoop_historyserver:++networks:+ nebula-net:+ external: true
复制代码

附录三

nebula-exchange-sst.conf


{  # Spark relation config  spark: {    app: {      name: Nebula Exchange 2.1    }
master:local
driver: { cores: 1 maxResultSize: 1G }
executor: { memory:1G }
cores:{ max: 16 } }
# Nebula Graph relation config nebula: { address:{ graph:["192.168.8.128:9669"] meta:["192.168.8.128:49377"] } user: root pswd: nebula space: sst
# parameters for SST import, not required path:{ local:"/tmp" remote:"/sst" hdfs.namenode: "hdfs://192.168.8.128:9000" }
# nebula client connection parameters connection { # socket connect & execute timeout, unit: millisecond timeout: 30000 }
error: { # max number of failures, if the number of failures is bigger than max, then exit the application. max: 32 # failed import job will be recorded in output path output: /tmp/errors }
# use google's RateLimiter to limit the requests send to NebulaGraph rate: { # the stable throughput of RateLimiter limit: 1024 # Acquires a permit from RateLimiter, unit: MILLISECONDS # if it can't be obtained within the specified timeout, then give up the request. timeout: 1000 } }
# Processing tags # There are tag config examples for different dataSources. tags: [
# HDFS csv # Import mode is sst, just change type.sink to client if you want to use client import mode. { name: player type: { source: csv sink: sst } path: "file:///root/player.csv" # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields fields: [_c1, _c2] nebula.fields: [name, age] vertex: { field:_c0 } separator: "," header: false batch: 256 partition: 32 }
]}
复制代码


本文中如有任何错误或疏漏,欢迎去 GitHub:https://github.com/vesoft-inc/nebula issue 区向我们提 issue 或者前往官方论坛:https://discuss.nebula-graph.com.cn/建议反馈 分类下提建议 👏;交流图数据库技术?加入 Nebula 交流群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~

用户头像

Nebula Graph

关注

一款开源的分布式图数据库 2020.04.28 加入

Follow me, here is my GitHub profile: https://github.com/vesoft-inc/nebula

评论

发布
暂无评论
无依赖单机尝鲜 Nebula Exchange 的 SST 导入_数据库_Nebula Graph_InfoQ写作平台