Original article: http://weikeqin.com/2018/04/17/neo4j-apoc-use/
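(1) Download and configuration
(1.1) Download the APOC jar that matches your Neo4j version
Download the matching APOC jar from the releases page of the APOC GitHub repository and put it in the $NEO4J_HOME/plugins directory.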
wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/3.3.0.2/apoc-3.3.0.2-all.jar
(1.2) Configure neo4j.conf
Append the following two lines to the end of neo4j.conf:
dbms.security.procedures.unrestricted=apoc.*
apoc.import.file.enabled=true
neo4j-sh (?)$ return apoc.version();
+----------------+
| apoc.version() |
+----------------+
| "3.3.0.2"      |
+----------------+
1 row
29 ms
Check the APOC version and list the available procedures and functions
return apoc.version();
call apoc.help('apoc');
call dbms.procedures();
call dbms.functions();
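The full output of dbms.procedures() is long. As a small sketch, the listing can be narrowed to a name prefix with YIELD and WHERE. The prefix 'apoc.load' is only an example, and the query assumes APOC is already installed.

```cypher
// List only the procedures whose name starts with 'apoc.load'
// (the prefix is an illustrative choice, not part of the original article).
CALL dbms.procedures() YIELD name, signature
WHERE name STARTS WITH 'apoc.load'
RETURN name, signature
ORDER BY name;
```

If this returns no rows, APOC is probably not loaded; see the errors section at the end.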
(2) Importing JSON: apoc.load.json
(2.1) Official documentation
neo4j-sh (?)$ CALL apoc.help("apoc.load.json");

type: "procedure"
name: "apoc.load.json"
text: "apoc.load.json('url',path, config) YIELD value - import JSON as stream of values if the JSON was an array or a single value if it was a map"
signature: "apoc.load.json(url :: STRING?, path = :: STRING?, config = {} :: MAP?) :: (value :: MAP?)"
roles: <null>, writes: <null>

type: "procedure"
name: "apoc.load.jsonArray"
text: "apoc.load.jsonArray('url') YIELD value - load array from JSON URL (e.g. web-api) to import JSON as stream of values"
signature: "apoc.load.jsonArray(url :: STRING?, path = :: STRING?) :: (value :: ANY?)"
roles: <null>, writes: <null>

type: "procedure"
name: "apoc.load.jsonParams"
text: "apoc.load.jsonParams('url',{header:value},payload, config) YIELD value - load from JSON URL (e.g. web-api) while sending headers / payload to import JSON as stream of values if the JSON was an array or a single value if it was a map"
signature: "apoc.load.jsonParams(url :: STRING?, headers :: MAP?, payload :: STRING?, path = :: STRING?, config = {} :: MAP?) :: (value :: MAP?)"
roles: <null>, writes: <null>

3 rows
42 ms
(2.2) Official example
WITH 'https://raw.githubusercontent.com/neo4j-contrib/neo4j-apoc-procedures/3.3.0.2/src/test/resources/person.json' AS url
CALL apoc.load.json(url) YIELD value as person
MERGE (p:Person {name:person.name})
ON CREATE SET p.age = person.age, p.children = size(person.children);
Contents of person.json:
{"name":"Michael", "age": 41, "children": ["Selina","Rana","Selma"]}
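The official example stores only the number of children. A possible extension, sketched here, turns each entry of the children array into its own node. The CHILD_OF relationship type is a hypothetical name chosen for illustration and does not come from the original example.

```cypher
WITH 'https://raw.githubusercontent.com/neo4j-contrib/neo4j-apoc-procedures/3.3.0.2/src/test/resources/person.json' AS url
CALL apoc.load.json(url) YIELD value AS person
MERGE (p:Person {name: person.name})
WITH p, person
// One row per element of the "children" array
UNWIND person.children AS childName
MERGE (c:Person {name: childName})
// CHILD_OF is an assumed relationship type, not part of the source example
MERGE (c)-[:CHILD_OF]->(p);
```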
(3) Importing CSV files: apoc.load.csv
(3.1) Official documentation
neo4j-sh (?)$ CALL apoc.help("apoc.load.csv");

type: "procedure"
name: "apoc.load.csv"
text: "apoc.load.csv('url',{config}) YIELD lineNo, list, map - load CSV fom URL as stream of values, config contains any of: {skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],nullValues:['na'],arraySep:';',mapping:{years:{type:'int',arraySep:'-',array:false,name:'age',ignore:false}}"
signature: "apoc.load.csv(url :: STRING?, config = {} :: MAP?) :: (lineNo :: INTEGER?, list :: LIST? OF ANY?, strings :: LIST? OF STRING?, map :: MAP?, stringMap :: MAP?)"
roles: <null>, writes: <null>

1 row
50 ms
(3.2) Usage
apoc.load.csv('url',{config}) YIELD lineNo, list, map - load CSV from URL as stream of values, config contains any of: {skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],arraySep:';',mapping:{years:{type:'int',arraySep:'-',array:false,name:'age',ignore:false}}}
Return the line number
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield lineNo return lineNo;
Return each row as a map (header must be true)
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map;
Return the line number and the map
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {skip:0, limit:5, header:true, sep:','}) yield lineNo,map return lineNo,map;
Return a single field (uuid) from the map
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map.uuid;
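The calls above only use limit, header and sep. The help text also lists nullValues and mapping; the following is a sketch of how they might be combined. It assumes the file has a numeric column called age, which is a hypothetical column name, since the article shows only the uuid column of apoc_test_data.csv.

```cypher
CALL apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {
  header: true,
  sep: '|',
  limit: 5,
  nullValues: ['na'],            // treat the string 'na' as null
  mapping: {age: {type: 'int'}}  // 'age' is an assumed column; convert it to an integer
})
YIELD lineNo, map
RETURN lineNo, map.uuid AS uuid, map.age AS age;
```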
(4) Creating nodes dynamically: apoc.create.node
CALL apoc.create.node(['Label'], {key:value,…}) create node with dynamic labels
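Passing the label in dynamically when creating a node
According to an answer on Stack Overflow by one of the official authors, a label cannot be passed dynamically in Cypher. This is a property of the Cypher language itself, so in plain Cypher the only option is to build the query string in the application.
APOC, however, provides apoc.create.node, which does accept labels dynamically. My guess is that it concatenates strings internally, but I have not read the source yet.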
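A minimal sketch of the difference, assuming the Neo4j 3.3 / APOC 3.3.0.2 setup used in this article. The label value 'Person' and the property are placeholders.

```cypher
// Plain Cypher: a label must be written literally in the statement.
// A statement such as  CREATE (n:$label {name: 'zhangsan'})  is rejected by the parser.

// With APOC the label is ordinary data and can come from a variable:
WITH 'Person' AS label
CALL apoc.create.node([label], {name: 'zhangsan'}) YIELD node
RETURN labels(node), node;
```

The same pattern works when the label comes from a CSV column, which is what the examples in this section do.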
Sample CSV data:
uuid,name,Label
2a3e275d9abc4c45913d8e7e619db87a,"张忆耕",Label1
db6ee76baff64db5956b6a5deb80acbf,"傅某评",Label2
8d2f4a74e7e7429390b3389d64d77637,"王苏维",Label3
4d0a5c3fa89a49e89f81a152f2aa259c,"蓝波",Label4
2f811c5341b84b70840acbdd451e2490,"范为华",Label5
311f7441fe4b4b7c8d5dc56d7590c1c3,"黄日波",Label1
b4d04f00887c49308e86afd0f0baf641,"徐国康",Label3
b717f94e2f3f4d25bb3dcd8a8efbc667,"王心迪",Label6
Without headers (the header row is then imported as an ordinary node, as the first result row shows)
load csv from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line[2]], {uuid:line[0], name:line[1]}) YIELD node return labels(node), node limit 5 ;
neo4j-sh (?)$ load csv from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line[2]], {uuid:line[0], name:line[1]}) YIELD node return labels(node), node limit 5 ;
+-----------------------------------------------------------------------------------+
| labels(node) | node                                                               |
+-----------------------------------------------------------------------------------+
| ["Label"]    | Node[18004017]{name:"name",uuid:"uuid"}                            |
| ["WB_AJ"]    | Node[18004018]{name:"张忆耕",uuid:"2a3e275d9abc4c45913d8e7e619db87a"} |
| ["WP_SJH"]   | Node[18004019]{name:"傅某评",uuid:"db6ee76baff64db5956b6a5deb80acbf"} |
| ["LG_JG"]    | Node[18004020]{name:"王苏维",uuid:"8d2f4a74e7e7429390b3389d64d77637"} |
| ["JTSGXX"]   | Node[18004021]{name:"蓝波",uuid:"4d0a5c3fa89a49e89f81a152f2aa259c"}  |
+-----------------------------------------------------------------------------------+
5 rows
84 ms
With headers
load csv with headers from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line.Label], {uuid:line.uuid, name:line.name}) YIELD node return labels(node), node limit 5 ;
neo4j-sh (?)$ load csv with headers from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line.Label], {uuid:line.uuid, name:line.name}) YIELD node return labels(node), node limit 5 ;
+-----------------------------------------------------------------------------------+
| labels(node) | node                                                               |
+-----------------------------------------------------------------------------------+
| ["WB_AJ"]    | Node[18004022]{name:"张忆耕",uuid:"2a3e275d9abc4c45913d8e7e619db87a"} |
| ["WP_SJH"]   | Node[18004023]{name:"傅某评",uuid:"db6ee76baff64db5956b6a5deb80acbf"} |
| ["LG_JG"]    | Node[18004024]{name:"王苏维",uuid:"8d2f4a74e7e7429390b3389d64d77637"} |
| ["JTSGXX"]   | Node[18004025]{name:"蓝波",uuid:"4d0a5c3fa89a49e89f81a152f2aa259c"}  |
| ["SWXX"]     | Node[18004026]{name:"范为华",uuid:"2f811c5341b84b70840acbdd451e2490"} |
+-----------------------------------------------------------------------------------+
5 rows
75 ms
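Both variants above read the file with LOAD CSV. As a sketch, the same import can also be written with apoc.load.csv from section (3), chaining the two procedure calls. It assumes the file has the uuid, name and Label header columns used in the "with headers" variant and that apoc.import.file.enabled=true is set. Note that it creates five nodes.

```cypher
CALL apoc.load.csv('file:/data/stale/data01/neo4j/node_uuid_10w.csv',
                   {header: true, sep: ',', limit: 5})
YIELD map
// The label is taken from the Label column of each row
CALL apoc.create.node([map.Label], {uuid: map.uuid, name: map.name})
YIELD node
RETURN labels(node), node;
```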
(5) Creating relationships dynamically: apoc.create.relationship
(5.1) Official documentation
neo4j-sh (?)$ CALL apoc.help("apoc.create.relationship");

type: "procedure"
name: "apoc.create.relationship"
text: "apoc.create.relationship(person1,'KNOWS',{key:value,...}, person2) create relationship with dynamic rel-type"
signature: "apoc.create.relationship(from :: NODE?, relType :: STRING?, props :: MAP?, to :: NODE?) :: (rel :: RELATIONSHIP?)"
roles: <null>, writes: <null>

1 row
61 ms
CALL apoc.create.relationship(person1,'KNOWS',{key:value,…}, person2) create relationship with dynamic rel-type
(5.2) Examples
Create relationships between existing Person nodes, taking the relationship type from the third CSV column. The columns are addressed by index, so the file is read without WITH HEADERS:
using periodic commit 10000
// call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map
load csv from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
match (p1:Person {uuid: line[0]})
match (p2:Person {uuid: line[1]})
WITH p1, p2, line
CALL apoc.create.relationship(p1, line[2], {name: line[3]}, p2) YIELD rel
RETURN rel
Create a relationship with a fixed type between two merged nodes:
merge (n1:Test {name:'zhangsan'})
merge (n2:Test {name:'lisi'})
with n1, n2
call apoc.create.relationship(n1, 'R1', {}, n2) YIELD rel
return id(rel), type(rel), rel ;
Merge the nodes from a CSV file without headers and take the relationship type from the third column:
using periodic commit 10000
load csv from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
merge (n1:Test {uuid: line[0]})
merge (n2:Test {uuid: line[1]})
with n1, n2, line
CALL apoc.create.relationship(n1, line[2], {}, n2) YIELD rel
return id(rel), type(rel), rel ;
The same import with headers, addressing the columns by name:
using periodic commit 10000
load csv with headers from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
merge (n1:Test {uuid: line.uuid1})
merge (n2:Test {uuid: line.uuid2})
with n1, n2, line
CALL apoc.create.relationship(n1, line.type, {}, n2) YIELD rel
return id(rel), type(rel), rel ;
A MERGE on a whole pattern can fail against a unique constraint:
create (n:Test {name:'zhangsan'}) return n;
create constraint on (n:Test) assert n.name is unique;
merge (n:Test {name:'zhangsan'})-[r:Friend]->(m:Test {name:'lisi'});
{ "signature": 127, "fields": [ { "code": "Neo.ClientError.Schema.ConstraintValidationFailed", "message": "Node 1000020 already exists with label Test and property \"name\"=[zhangsan]" } ], "timings": { "type": "client" }}
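MERGE matches or creates the pattern as a whole. Because the full zhangsan-Friend-lisi pattern does not exist yet, MERGE tries to create every part of it, including a second zhangsan node, and the unique constraint rejects that. A common way around this, sketched here with the same labels and names, is to merge each node on its own and then merge the relationship between the bound nodes.

```cypher
// Each node is matched or created individually, so the existing
// zhangsan node is reused instead of being created again.
MERGE (n:Test {name: 'zhangsan'})
MERGE (m:Test {name: 'lisi'})
// Only the relationship is created if it is missing.
MERGE (n)-[r:Friend]->(m)
RETURN n, r, m;
```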
(6) Warm-up: apoc.warmup.run
neo4j-sh (?)$ CALL apoc.warmup.run();
+--------------------------------------------------------------------------------------------------------------------------+
| pageSize | nodesPerPage | nodesTotal | nodePages | nodesTime | relsPerPage | relsTotal | relPages | relsTime | totalTime |
+--------------------------------------------------------------------------------------------------------------------------+
| 8192     | 546          | 119510256  | 234717    | 9         | 240         | 131001022 | 587505   | 21       | 30        |
+--------------------------------------------------------------------------------------------------------------------------+
1 row
30337 ms
The time columns are in seconds: a totalTime of 30 corresponds to the 30337 ms reported by the shell.
(7) Importing data from an RDBMS: apoc.load.jdbc
(7.1) Official documentation
neo4j-sh (?)$ CALL apoc.help("apoc.load.jdbc");

type: "procedure"
name: "apoc.load.jdbc"
text: "apoc.load.jdbc('key or url','table or kernelTransaction') YIELD row - load from relational database, from a full table or a sql kernelTransaction"
signature: "apoc.load.jdbc(jdbc :: STRING?, tableOrSql :: STRING?, params = [] :: LIST? OF ANY?) :: (row :: MAP?)"
roles: <null>, writes: <null>

type: "procedure"
name: "apoc.load.jdbcParams"
text: "deprecated - please use: apoc.load.jdbc('key or url','kernelTransaction',[params]) YIELD row - load from relational database, from a sql kernelTransaction with parameters"
signature: "apoc.load.jdbcParams(jdbc :: STRING?, sql :: STRING?, params :: LIST? OF ANY?) :: (row :: MAP?)"
roles: <null>, writes: <null>

type: "procedure"
name: "apoc.load.jdbcUpdate"
text: "apoc.load.jdbcUpdate('key or url','kernelTransaction',[params]) YIELD row - update relational database, from a SQL kernelTransaction with optional parameters"
signature: "apoc.load.jdbcUpdate(jdbc :: STRING?, query :: STRING?, params = [] :: LIST? OF ANY?) :: (row :: MAP?)"
roles: <null>, writes: <null>

3 rows
180 ms
(7.2) Usage
Load the MySQL JDBC driver first:
CALL apoc.load.driver("com.mysql.jdbc.Driver");
(7.3.1) Count the rows
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url
CALL apoc.load.jdbc(url, "city") YIELD row
RETURN count(*);
(7.3.2) View sample rows
// Fixed parameters
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url, "select id, name from city where level = 1" as sql
CALL apoc.load.jdbc(url, sql, []) YIELD row
RETURN row LIMIT 10;
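(7.3.3) Passing parameters dynamically
// Dynamic parameters. The help output in (7.1) marks apoc.load.jdbcParams as deprecated, so apoc.load.jdbc with a parameter list is used here.
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url, "select id, name from city where level = ? and is_deleted = ? " as sql
CALL apoc.load.jdbc(url, sql, [1, 'N']) YIELD row
RETURN row LIMIT 10;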
(7.3.4) Batch processing
CALL apoc.periodic.iterate(
  'call apoc.load.jdbc("jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin", " select id, name, level, pid from city where level = 6 and is_deleted = \'N\' ", [])',
  'MATCH (s:Test {id: toInteger(row.pid)}) MERGE (s)-[r:Connect]->(n:Group {id: toInteger(row.id)}) set n += row ',
  {batchSize:10000, parallel:true})
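With parallel:true, batches that attach relationships to the same parent node may compete for locks. A more cautious variant, sketched below under the same connection string and schema as above, runs the batches sequentially and returns the batch and row counts so the result can be checked.

```cypher
CALL apoc.periodic.iterate(
  'call apoc.load.jdbc("jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin", " select id, name, level, pid from city where level = 6 and is_deleted = \'N\' ", []) yield row return row',
  'MATCH (s:Test {id: toInteger(row.pid)}) MERGE (s)-[r:Connect]->(n:Group {id: toInteger(row.id)}) set n += row ',
  {batchSize: 10000, parallel: false})  // sequential batches: slower, but no lock contention between batches
YIELD batches, total
RETURN batches, total;
```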
(8) Errors encountered
(1) There is no procedure with the name `apoc.version` registered
There is no procedure with the name apoc.version registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.
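1. Check that the APOC jar matching your Neo4j version has been downloaded and placed in the plugins directory.
2. Add dbms.security.procedures.unrestricted=apoc.* and apoc.import.file.enabled=true to neo4j.conf, each on its own line.
3. Restart the Neo4j database.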
(2) Failed to invoke procedure `apoc.load.driver`: Caused by: java.lang.RuntimeException: Could not load driver class com.mysql.jdbc.Driver com.mysql.jdbc.Driver
neo4j-sh (?)$ CALL apoc.load.driver("com.mysql.jdbc.Driver");
1 ms
WARNING: Failed to invoke procedure `apoc.load.driver`: Caused by: java.lang.RuntimeException: Could not load driver class com.mysql.jdbc.Driver com.mysql.jdbc.Driver
Cause: the MySQL JDBC driver jar is missing.
(3) `Invalid input 'y': expected 'r/R' or 'a/A' `
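neo4j-sh (?)$ with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*);
5 ms
WARNING: Invalid input 'y': expected 'r/R' or 'a/A' (line 1, column 73 (offset: 72))
"with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*)"
Cause:
The statement contains a stray cypher keyword between url and CALL. Column 73 is the y of cypher: after reading the c, the parser expects an r (CREATE) or an a (CALL). Removing the keyword fixes the syntax error.
apoc.load.jdbc still needs the MySQL driver: download the MySQL jar, put it in the lib or plugins directory, and restart the Neo4j database.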
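For reference, a sketch of the same statement without the extra cypher keyword that the error position (column 73) points at. It assumes the MySQL driver jar is installed and that the test database and test_table table from the original statement exist.

```cypher
WITH "jdbc:mysql://localhost:3306/test?user=root&password=root" AS url
CALL apoc.load.jdbc(url, "test_table") YIELD row
RETURN count(*);
```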
(4) Neo.ClientError.Statement.SyntaxError: Unknown function 'apoc.version' (line 1, column 8 (offset: 7))
Neo.ClientError.Statement.SyntaxError: Unknown function 'apoc.version' (line 1, column 8 (offset: 7))
"return apoc.version();"
        ^
Cause: a problem with the configuration in neo4j.conf. Go through the same checklist as for error (1).
(5) Neo.ClientError.Procedure.ProcedureNotFound: There is no procedure with the name `apoc.help` registered for this database instance.
Neo.ClientError.Procedure.ProcedureNotFound: There is no procedure with the name `apoc.help` registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.
Cause: a problem with the configuration in neo4j.conf. Go through the same checklist as for error (1).
References
[1] neo4j-apoc-procedures GitHub repository
[2] neo4j-apoc official documentation
[3] cypher-dynamic-label-at-query: passing a label dynamically in a Cypher statement
[4] Cannot run a query looking for a dynamic label
[5] how-to-use-apoc-load-csv-in-conjunction-with-apoc-create-node
[6] Importing relationships with dynamic types into Neo4j
[7] how-do-i-fix-this-neo4j-apoc-query-that-creates-nodes-froma-csv-file