Original: http://weikeqin.com/2018/04/17/neo4j-apoc-use/
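(1) Download and configure
(1.1) Download the matching version of the apoc jar
Download the apoc jar that matches your Neo4j version from the APOC releases page on GitHub and put it in the $NEO4J_HOME/plugins directory.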
wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/3.3.0.2/apoc-3.3.0.2-all.jar
(1.2) Edit the neo4j.conf configuration file
Append the following lines to the end of neo4j.conf:
dbms.security.procedures.unrestricted=apoc.*
apoc.import.file.enabled=true
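The configuration step can also be scripted so that it is safe to run more than once. The following is a minimal Python sketch, not an official tool: the function name `ensure_apoc_settings` is hypothetical, and the only settings it knows about are the two lines listed above.

```python
from pathlib import Path

# The two settings listed in the text above.
APOC_SETTINGS = {
    "dbms.security.procedures.unrestricted": "apoc.*",
    "apoc.import.file.enabled": "true",
}


def ensure_apoc_settings(conf_path):
    """Append any missing APOC setting to neo4j.conf and return the added keys."""
    path = Path(conf_path)
    text = path.read_text(encoding="utf-8") if path.exists() else ""

    # Collect the keys that are already set, ignoring comments and blank lines.
    present = set()
    for line in text.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#") and "=" in stripped:
            present.add(stripped.split("=", 1)[0].strip())

    missing = [key for key in APOC_SETTINGS if key not in present]
    if missing:
        with path.open("a", encoding="utf-8") as handle:
            # Make sure the appended lines start on a fresh line.
            if text and not text.endswith("\n"):
                handle.write("\n")
            for key in missing:
                handle.write(f"{key}={APOC_SETTINGS[key]}\n")
    return missing
```

Calling `ensure_apoc_settings("/path/to/neo4j.conf")` a second time returns an empty list, because existing keys are left untouched. Neo4j still has to be restarted for the change to take effect.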
After restarting Neo4j, verify that APOC is loaded:
neo4j-sh (?)$ return apoc.version();
+----------------+
| apoc.version() |
+----------------+
| "3.3.0.2" |
+----------------+
1 row
29 ms
Check the APOC version and list the available procedures and functions:
return apoc.version();
call apoc.help('apoc');
call dbms.procedures();
call dbms.functions();
(2) Import JSON: apoc.load.json
(2.1) Official documentation
neo4j-sh (?)$ CALL apoc.help("apoc.load.json");
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type | name | text | signature | roles | writes |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "procedure" | "apoc.load.json" | "apoc.load.json('url',path, config) YIELD value - import JSON as stream of values if the JSON was an array or a single value if it was a map" | "apoc.load.json(url :: STRING?, path = :: STRING?, config = {} :: MAP?) :: (value :: MAP?)" | <null> | <null> |
| "procedure" | "apoc.load.jsonArray" | "apoc.load.jsonArray('url') YIELD value - load array from JSON URL (e.g. web-api) to import JSON as stream of values" | "apoc.load.jsonArray(url :: STRING?, path = :: STRING?) :: (value :: ANY?)" | <null> | <null> |
| "procedure" | "apoc.load.jsonParams" | "apoc.load.jsonParams('url',{header:value},payload, config) YIELD value - load from JSON URL (e.g. web-api) while sending headers / payload to import JSON as stream of values if the JSON was an array or a single value if it was a map" | "apoc.load.jsonParams(url :: STRING?, headers :: MAP?, payload :: STRING?, path = :: STRING?, config = {} :: MAP?) :: (value :: MAP?)" | <null> | <null> |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows
42 ms
(2.2) Official example
WITH 'https://raw.githubusercontent.com/neo4j-contrib/neo4j-apoc-procedures/3.3.0.2/src/test/resources/person.json' AS url
CALL apoc.load.json(url) YIELD value as person
MERGE (p:Person {name:person.name})
ON CREATE SET p.age = person.age, p.children = size(person.children);
Contents of person.json:
{"name":"Michael",
"age": 41,
"children": ["Selina","Rana","Selma"]
}
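To make the mapping concrete, here is a plain-Python sketch of which properties the statement above ends up storing on the node. It only mirrors the Cypher logic and does not talk to Neo4j; the function name `person_properties` is made up for illustration.

```python
import json

# The sample document shown above.
PERSON_JSON = '{"name":"Michael", "age": 41, "children": ["Selina","Rana","Selma"]}'


def person_properties(raw):
    """Mirror the Cypher: name is the MERGE key, age and children are set on create."""
    person = json.loads(raw)
    return {
        "name": person["name"],
        "age": person["age"],
        # size(person.children) stores the number of children, not their names
        "children": len(person["children"]),
    }


print(person_properties(PERSON_JSON))
# {'name': 'Michael', 'age': 41, 'children': 3}
```

Because the properties are set under ON CREATE, running the import again for an existing Michael node leaves age and children unchanged.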
(3) Import CSV files: apoc.load.csv
(3.1) Official documentation
neo4j-sh (?)$ CALL apoc.help("apoc.load.csv");
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type | name | text | signature | roles | writes |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "procedure" | "apoc.load.csv" | "apoc.load.csv('url',{config}) YIELD lineNo, list, map - load CSV fom URL as stream of values, config contains any of: {skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],nullValues:['na'],arraySep:';',mapping:{years:{type:'int',arraySep:'-',array:false,name:'age',ignore:false}}" | "apoc.load.csv(url :: STRING?, config = {} :: MAP?) :: (lineNo :: INTEGER?, list :: LIST? OF ANY?, strings :: LIST? OF STRING?, map :: MAP?, stringMap :: MAP?)" | <null> | <null> |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row
50 ms
(3.2) Usage
apoc.load.csv('url',{config}) YIELD lineNo, list, map - load CSV from URL as stream of values, config contains any of: {skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],arraySep:';',mapping:{years:{type:'int',arraySep:'-',array:false,name:'age',ignore:false}}}
Return the line number:
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield lineNo return lineNo;
Return the map (header must be true):
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map;
Return the line number and the map:
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {skip:0, limit:5, header:true, sep:','}) yield lineNo,map return lineNo,map;
Return a single field from the map:
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map.uuid;
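(4) Create nodes dynamically: apoc.create.node
CALL apoc.create.node(['Label'], {key:value,…}) create node with dynamic labels
Labels can be passed in dynamically when creating nodes.
The official author said on Stack Overflow that a label cannot be passed dynamically in Cypher; this is determined by the Cypher language itself, so it can only be achieved by concatenating the query string dynamically in application code.
APOC, however, provides apoc.create.node, which does accept the label dynamically. It presumably concatenates strings internally; I have not read the source yet.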
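For comparison, the string-concatenation approach in application code can be sketched as follows. This is a minimal illustration and not how APOC is implemented: the function name `build_create_node` and the label whitelist are assumptions made here. Only the label is spliced into the query text; the property values stay in a query parameter.

```python
import re

# Conservative whitelist (an assumption of this sketch): letters, digits and
# underscore, not starting with a digit. This rejects anything that could
# break out of the backtick-quoted label.
_LABEL_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_create_node(label, props):
    """Return (cypher, params) for creating one node whose label is chosen at runtime."""
    if not _LABEL_RE.match(label):
        raise ValueError(f"unsafe label: {label!r}")
    # The label becomes part of the query text; the properties are passed as $props.
    cypher = f"CREATE (n:`{label}`) SET n += $props RETURN n"
    return cypher, {"props": dict(props)}


cypher, params = build_create_node("WB_AJ", {"uuid": "2a3e275d9abc4c45913d8e7e619db87a"})
print(cypher)
# CREATE (n:`WB_AJ`) SET n += $props RETURN n
```

The drawback compared with apoc.create.node is that every distinct label produces a distinct query string, so the label has to be validated by the application before it is spliced in.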
uuid,name,Label
2a3e275d9abc4c45913d8e7e619db87a,"张忆耕",Label1
db6ee76baff64db5956b6a5deb80acbf,"傅某评",Label2
8d2f4a74e7e7429390b3389d64d77637,"王苏维",Label3
4d0a5c3fa89a49e89f81a152f2aa259c,"蓝波",Label4
2f811c5341b84b70840acbdd451e2490,"范为华",Label5
311f7441fe4b4b7c8d5dc56d7590c1c3,"黄日波",Label1
b4d04f00887c49308e86afd0f0baf641,"徐国康",Label3
b717f94e2f3f4d25bb3dcd8a8efbc667,"王心迪",Label6
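Without headers (the header row is then read as data, so the first result row below is a node built from the column names):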
load csv from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line[2]], {uuid:line[0], name:line[1]}) YIELD node return labels(node), node limit 5 ;
neo4j-sh (?)$ load csv from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line[2]], {uuid:line[0], name:line[1]}) YIELD node return labels(node), node limit 5 ;
+-----------------------------------------------------------------------------------+
| labels(node) | node |
+-----------------------------------------------------------------------------------+
| ["Label"] | Node[18004017]{name:"name",uuid:"uuid"} |
| ["WB_AJ"] | Node[18004018]{name:"张忆耕",uuid:"2a3e275d9abc4c45913d8e7e619db87a"} |
| ["WP_SJH"] | Node[18004019]{name:"傅某评",uuid:"db6ee76baff64db5956b6a5deb80acbf"} |
| ["LG_JG"] | Node[18004020]{name:"王苏维",uuid:"8d2f4a74e7e7429390b3389d64d77637"} |
| ["JTSGXX"] | Node[18004021]{name:"蓝波",uuid:"4d0a5c3fa89a49e89f81a152f2aa259c"} |
+-----------------------------------------------------------------------------------+
5 rows
84 ms
With headers:
load csv with headers from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line.Label], {uuid:line.uuid, name:line.name}) YIELD node return labels(node), node limit 5 ;
neo4j-sh (?)$ load csv with headers from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line.Label], {uuid:line.uuid, name:line.name}) YIELD node return labels(node), node limit 5 ;
+-----------------------------------------------------------------------------------+
| labels(node) | node |
+-----------------------------------------------------------------------------------+
| ["WB_AJ"] | Node[18004022]{name:"张忆耕",uuid:"2a3e275d9abc4c45913d8e7e619db87a"} |
| ["WP_SJH"] | Node[18004023]{name:"傅某评",uuid:"db6ee76baff64db5956b6a5deb80acbf"} |
| ["LG_JG"] | Node[18004024]{name:"王苏维",uuid:"8d2f4a74e7e7429390b3389d64d77637"} |
| ["JTSGXX"] | Node[18004025]{name:"蓝波",uuid:"4d0a5c3fa89a49e89f81a152f2aa259c"} |
| ["SWXX"] | Node[18004026]{name:"范为华",uuid:"2f811c5341b84b70840acbdd451e2490"} |
+-----------------------------------------------------------------------------------+
5 rows
75 ms
(5) Create relationships dynamically: apoc.create.relationship
(5.1) Official documentation
neo4j-sh (?)$ CALL apoc.help("apoc.create.relationship");
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type | name | text | signature | roles | writes |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "procedure" | "apoc.create.relationship" | "apoc.create.relationship(person1,'KNOWS',{key:value,...}, person2) create relationship with dynamic rel-type" | "apoc.create.relationship(from :: NODE?, relType :: STRING?, props :: MAP?, to :: NODE?) :: (rel :: RELATIONSHIP?)" | <null> | <null> |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row
61 ms
CALL apoc.create.relationship(person1,'KNOWS',{key:value,…}, person2) create relationship with dynamic rel-type
(5.2) Examples
using periodic commit 10000
// positional access (line[0], line[1], ...) only works without WITH HEADERS; with headers, use line.<column> instead
load csv from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
match (p1:Person {uuid: line[0]})
match (p2:Person {uuid: line[1]})
WITH p1, p2, line
CALL apoc.create.relationship(p1, line[2], {name: line[3]}, p2) YIELD rel
RETURN rel
Minimal example with two fixed nodes:
merge (n1:Test {name:'zhangsan'}) merge (n2:Test {name:'lisi'})
with n1, n2
call apoc.create.relationship(n1, 'R1', {}, n2) YIELD rel
return id(rel), type(rel), rel ;
Loading from CSV without headers:
using periodic commit 10000
load csv from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
merge (n1:Test {uuid: line[0]})
merge (n2:Test {uuid: line[1]})
with n1, n2, line
CALL apoc.create.relationship(n1, line[2], {}, n2) YIELD rel
return id(rel), type(rel), rel ;
Loading from CSV with headers:
using periodic commit 10000
load csv with headers from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
merge (n1:Test {uuid: line.uuid1})
merge (n2:Test {uuid: line.uuid2})
with n1, n2, line
CALL apoc.create.relationship(n1, line.type, {}, n2) YIELD rel
return id(rel), type(rel), rel ;
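Note that MERGE on a whole pattern either matches the entire pattern or creates all of it. With a unique constraint in place and only one of the two nodes already present, it fails: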
create (n:Test {name:'zhangsan'}) return n;
create constraint on (n:Test) assert n.name is unique;
merge (n:Test {name:'zhangsan'})-[r:Friend]->(m:Test {name:'lisi'});
{
"signature": 127,
"fields": [
{
"code": "Neo.ClientError.Schema.ConstraintValidationFailed",
"message": "Node 1000020 already exists with label Test and property \"name\"=[zhangsan]"
}
],
"timings": {
"type": "client"
}
}
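To avoid this, MERGE each node on its own first and then MERGE the relationship between the two bound nodes.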
(6) Warm-up: apoc.warmup.run
neo4j-sh (?)$ CALL apoc.warmup.run();
+--------------------------------------------------------------------------------------------------------------------------+
| pageSize | nodesPerPage | nodesTotal | nodePages | nodesTime | relsPerPage | relsTotal | relPages | relsTime | totalTime |
+--------------------------------------------------------------------------------------------------------------------------+
| 8192     | 546          | 119510256  | 234717    | 9         | 240         | 131001022 | 587505   | 21       | 30        |
+--------------------------------------------------------------------------------------------------------------------------+
1 row
30337 ms
(7) Import data from an RDBMS: apoc.load.jdbc
(7.1) Official documentation
neo4j-sh (?)$ CALL apoc.help("apoc.load.jdbc");
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type | name | text | signature | roles | writes |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "procedure" | "apoc.load.jdbc" | "apoc.load.jdbc('key or url','table or kernelTransaction') YIELD row - load from relational database, from a full table or a sql kernelTransaction" | "apoc.load.jdbc(jdbc :: STRING?, tableOrSql :: STRING?, params = [] :: LIST? OF ANY?) :: (row :: MAP?)" | <null> | <null> |
| "procedure" | "apoc.load.jdbcParams" | "deprecated - please use: apoc.load.jdbc('key or url','kernelTransaction',[params]) YIELD row - load from relational database, from a sql kernelTransaction with parameters" | "apoc.load.jdbcParams(jdbc :: STRING?, sql :: STRING?, params :: LIST? OF ANY?) :: (row :: MAP?)" | <null> | <null> |
| "procedure" | "apoc.load.jdbcUpdate" | "apoc.load.jdbcUpdate('key or url','kernelTransaction',[params]) YIELD row - update relational database, from a SQL kernelTransaction with optional parameters" | "apoc.load.jdbcUpdate(jdbc :: STRING?, query :: STRING?, params = [] :: LIST? OF ANY?) :: (row :: MAP?)" | <null> | <null> |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows
180 ms
(7.2) Usage
Load the JDBC driver first:
CALL apoc.load.driver("com.mysql.jdbc.Driver");
(7.3.1) Count the rows
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url
CALL apoc.load.jdbc(url, "city") YIELD row
RETURN count(*);
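The JDBC URL above carries the credentials as query parameters, which gets awkward once a password contains characters such as `&` or `@`. The Python sketch below builds the same URL with percent-encoded values. The helper name `mysql_jdbc_url` is hypothetical, and it assumes that the MySQL driver percent-decodes URL parameters, which should be checked against the driver version in use.

```python
from urllib.parse import quote, urlencode


def mysql_jdbc_url(host, port, database, user, password):
    """Build a jdbc:mysql URL with percent-encoded credentials."""
    # quote_via=quote encodes a space as %20 rather than '+'.
    query = urlencode({"user": user, "password": password}, quote_via=quote)
    return f"jdbc:mysql://{host}:{port}/{database}?{query}"


print(mysql_jdbc_url("localhost", 3306, "dataserver", "admin", "admin"))
# jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin
```

The resulting string can be used wherever the examples in this section bind a value to `url`.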
(7.3.2) View sample data
// fixed parameters
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url,
"select id, name from city where level = 1" as sql
CALL apoc.load.jdbc(url, sql, []) YIELD row
RETURN row LIMIT 10;
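(7.3.3) Pass parameters dynamically
// dynamic parameters; apoc.load.jdbcParams is marked as deprecated in the help output above, so apoc.load.jdbc with a parameter list is used instead
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url,
"select id, name from city where level = ? and is_deleted = ? " as sql
CALL apoc.load.jdbc(url, sql, [1, 'N']) YIELD row
RETURN row LIMIT 10;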
(7.3.4) Batch processing
CALL apoc.periodic.iterate(
'call apoc.load.jdbc("jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin", " select id, name, level, pid from city where level = 6 and is_deleted = \'N\' ", [])',
'MATCH (s:Test {id: toInteger(row.pid)}) MERGE (s)-[r:Connect]->(n:Group {id: toInteger(row.id)}) set n += row ',
{batchSize:10000, parallel:true}
)
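Conceptually, batchSize groups the rows produced by the first statement, and the second statement is then run for each group in its own transaction. The Python sketch below is only a client-side analogy of that grouping, not APOC's implementation; the generator name `batches` is made up for illustration.

```python
from itertools import islice


def batches(rows, batch_size):
    """Yield lists of at most batch_size items from any iterable."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk


# 25 rows with a batch size of 10 give two full batches and one partial batch.
print([len(chunk) for chunk in batches(range(25), 10)])
# [10, 10, 5]
```

With batchSize:10000 as in the call above, a failure affects at most the batch being processed rather than the whole import.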
(8) Errors encountered
(1) There is no procedure with the name `apoc.version` registered
There is no procedure with the name `apoc.version` registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.
1. Check that the matching apoc jar has been downloaded and placed in the plugins directory.
2. Add dbms.security.procedures.unrestricted=apoc.* and apoc.import.file.enabled=true to neo4j.conf.
3. Restart the Neo4j database.
(2) Failed to invoke procedure `apoc.load.driver`: Caused by: java.lang.RuntimeException: Could not load driver class com.mysql.jdbc.Driver com.mysql.jdbc.Driver
neo4j-sh (?)$ CALL apoc.load.driver("com.mysql.jdbc.Driver");
1 ms
WARNING: Failed to invoke procedure `apoc.load.driver`: Caused by: java.lang.RuntimeException: Could not load driver class com.mysql.jdbc.Driver com.mysql.jdbc.Driver
The MySQL JDBC driver jar is missing.
(3) `Invalid input 'y': expected 'r/R' or 'a/A' `
neo4j-sh (?)$ with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*);
5 ms
WARNING: Invalid input 'y': expected 'r/R' or 'a/A' (line 1, column 73 (offset: 72))
"with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*)"
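Cause:
The reported position (line 1, column 73) is the `y` of the stray word `cypher` between `as url` and `CALL`, so this is a syntax error in the statement itself; remove `cypher`.
If the driver then fails to load, the MySQL jar is missing: download the MySQL jar, put it in the lib or plugins directory, and restart the Neo4j database.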
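The position given in the error message can be checked against the statement itself. The short Python sketch below indexes the statement at the reported offset; the helper `word_at` is only illustrative.

```python
# Statement copied from the error output above.
statement = (
    'with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url '
    'cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*);'
)

offset = 72  # "offset: 72" in the message, i.e. column 73


def word_at(text, index):
    """Return the space-delimited word that contains text[index]."""
    start = text.rfind(" ", 0, index) + 1
    end = text.find(" ", index)
    return text[start:] if end == -1 else text[start:end]


print(statement[offset])           # y
print(word_at(statement, offset))  # cypher
```

The parser has read the `c` and expects `r` (as in CREATE) or `a` (as in CALL) next, which is why it complains about the `y`.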
(4) Neo.ClientError.Statement.SyntaxError: Unknown function 'apoc.version' (line 1, column 8 (offset: 7))
Neo.ClientError.Statement.SyntaxError: Unknown function 'apoc.version' (line 1, column 8 (offset: 7))
"return apoc.version();"
^
The configuration in neo4j.conf is wrong.
(5) Neo.ClientError.Procedure.ProcedureNotFound: There is no procedure with the name `apoc.help` registered for this database instance.
Neo.ClientError.Procedure.ProcedureNotFound: There is no procedure with the name `apoc.help` registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.
The configuration in neo4j.conf is wrong.
References
[1] neo4j-apoc-procedures GitHub repository
[2] neo4j-apoc official documentation
[3] cypher-dynamic-label-at-query (passing a label as a dynamic parameter in a Cypher statement)
[4] Cannot run a query looking for a dynamic label
[5] how-to-use-apoc-load-csv-in-conjunction-with-apoc-create-node
[6] Importing relationships with dynamic types into Neo4j
[7] how-do-i-fix-this-neo4j-apoc-query-that-creates-nodes-froma-csv-file