Using Neo4j APOC

Author: wkq2786130
Published: July 22, 2020

Original post: http://weikeqin.com/2018/04/17/neo4j-apoc-use/

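(1) Download and configuration


(1.1) Download the APOC jar that matches your Neo4j version

  1. Download the APOC jar that matches your Neo4j version from the APOC releases page on GitHub and put it in the $NEO4J_HOME/plugins directory.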


wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/3.3.0.2/apoc-3.3.0.2-all.jar


(1.2) Configure neo4j.conf


Append the following lines to the end of neo4j.conf:

dbms.security.procedures.unrestricted=apoc.*
apoc.import.file.enabled=true


neo4j-sh (?)$ return apoc.version();
+----------------+
| apoc.version() |
+----------------+
| "3.3.0.2"      |
+----------------+
1 row
29 ms




Check the APOC version and list the available procedures and functions


return apoc.version();


call apoc.help('apoc');


call dbms.procedures


call dbms.functions()
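
The full procedure listing is long. A minimal sketch for narrowing it down, assuming a Neo4j 3.x server where dbms.procedures() yields `name` and `signature` columns (the 'apoc.load' prefix is only an example filter):

```cypher
// List only the APOC load procedures and their signatures.
// Assumes Neo4j 3.x, where dbms.procedures() exposes `name` and `signature`.
CALL dbms.procedures() YIELD name, signature
WHERE name STARTS WITH 'apoc.load'
RETURN name, signature
ORDER BY name;
```

An empty result usually means the APOC jar was not picked up at startup.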


(2) Importing JSON: apoc.load.json


(2.1) Official documentation


neo4j-sh (?)$ CALL apoc.help("apoc.load.json");
| type | name | text | signature | roles | writes |
| "procedure" | "apoc.load.json" | "apoc.load.json('url',path, config) YIELD value -  import JSON as stream of values if the JSON was an array or a single value if it was a map" | "apoc.load.json(url :: STRING?, path =  :: STRING?, config = {} :: MAP?) :: (value :: MAP?)" | <null> | <null> |
| "procedure" | "apoc.load.jsonArray" | "apoc.load.jsonArray('url') YIELD value - load array from JSON URL (e.g. web-api) to import JSON as stream of values" | "apoc.load.jsonArray(url :: STRING?, path =  :: STRING?) :: (value :: ANY?)" | <null> | <null> |
| "procedure" | "apoc.load.jsonParams" | "apoc.load.jsonParams('url',{header:value},payload, config) YIELD value - load from JSON URL (e.g. web-api) while sending headers / payload to import JSON as stream of values if the JSON was an array or a single value if it was a map" | "apoc.load.jsonParams(url :: STRING?, headers :: MAP?, payload :: STRING?, path =  :: STRING?, config = {} :: MAP?) :: (value :: MAP?)" | <null> | <null> |
3 rows
42 ms


(2.2) Official example


WITH 'https://raw.githubusercontent.com/neo4j-contrib/neo4j-apoc-procedures/3.3.0.2/src/test/resources/person.json' AS url
CALL apoc.load.json(url) YIELD value as person
MERGE (p:Person {name:person.name})
  ON CREATE SET p.age = person.age, p.children = size(person.children);


{"name":"Michael", "age": 41, "children": ["Selina","Rana","Selma"]}
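
The official example only stores the number of children. As a sketch of how the same JSON could be expanded further, the query below creates one node per child and links it to the parent. The CHILD_OF relationship type is an illustrative assumption, not part of the original example.

```cypher
// Sketch: also create one node per child and link it to the parent.
// The CHILD_OF relationship type is a hypothetical name chosen for illustration.
WITH 'https://raw.githubusercontent.com/neo4j-contrib/neo4j-apoc-procedures/3.3.0.2/src/test/resources/person.json' AS url
CALL apoc.load.json(url) YIELD value AS person
MERGE (p:Person {name: person.name})
  ON CREATE SET p.age = person.age
WITH p, person
UNWIND person.children AS childName
MERGE (c:Person {name: childName})
MERGE (c)-[:CHILD_OF]->(p);
```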


(3) Importing CSV files: apoc.load.csv


(3.1) Official documentation


neo4j-sh (?)$ CALL apoc.help("apoc.load.csv");
| type | name | text | signature | roles | writes |
| "procedure" | "apoc.load.csv" | "apoc.load.csv('url',{config}) YIELD lineNo, list, map - load CSV fom URL as stream of values, config contains any of: {skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],nullValues:['na'],arraySep:';',mapping:{years:{type:'int',arraySep:'-',array:false,name:'age',ignore:false}}" | "apoc.load.csv(url :: STRING?, config = {} :: MAP?) :: (lineNo :: INTEGER?, list :: LIST? OF ANY?, strings :: LIST? OF STRING?, map :: MAP?, stringMap :: MAP?)" | <null> | <null> |
1 row
50 ms


(3.2) Usage


apoc.load.csv('url',{config}) YIELD lineNo, list, map - load CSV from URL as stream of values, config contains any of: {skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],arraySep:';',mapping:{years:{type:'int',arraySep:'-',array:false,name:'age',ignore:false}}}


Return the line number:

call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield lineNo return lineNo;


Return each row as a map (header must be true):

call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map;


Return the line number together with the map:

call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {skip:0, limit:5, header:true, sep:','}) yield lineNo,map return lineNo,map;


call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map.uuid;
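
The queries above only read the file. A minimal sketch of writing the rows into the graph, assuming the file has a `uuid` header column (as the map.uuid query suggests) and that every row has a non-null uuid; the Test label and the lineNo property are illustrative choices:

```cypher
// Sketch: turn the first five rows of the pipe-separated file into nodes.
// Assumes a `uuid` header column; the Test label and lineNo property are hypothetical.
CALL apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv',
                   {skip:0, limit:5, header:true, sep:'|'}) YIELD lineNo, map
MERGE (n:Test {uuid: map.uuid})
SET n.lineNo = lineNo
RETURN n.uuid, n.lineNo;
```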


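(4) Creating nodes dynamically: apoc.create.node


CALL apoc.create.node(['Label'], {key:value,...}) create node with dynamic labels


The label is passed in as a value when the node is created.


According to a Stack Overflow answer from one of the Neo4j authors, Cypher cannot take a label as a parameter. This is a limitation of the Cypher language itself, so the only workaround in plain Cypher is to build the query string dynamically in application code.


APOC, however, provides apoc.create.node, which takes the labels as an ordinary list argument. I have not read the source yet; since it is a procedure, it can create the node through Neo4j's Java API, so no query-string concatenation should be needed.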


uuid,name,Label
2a3e275d9abc4c45913d8e7e619db87a,"张忆耕",Laebl1
db6ee76baff64db5956b6a5deb80acbf,"傅某评",Laebl2
8d2f4a74e7e7429390b3389d64d77637,"王苏维",Laebl3
4d0a5c3fa89a49e89f81a152f2aa259c,"蓝波",Laebl4
2f811c5341b84b70840acbdd451e2490,"范为华",Laebl5
311f7441fe4b4b7c8d5dc56d7590c1c3,"黄日波",Laebl1
b4d04f00887c49308e86afd0f0baf641,"徐国康",Laebl3
b717f94e2f3f4d25bb3dcd8a8efbc667,"王心迪",Laebl6


Without headers (the header row is then imported as data, as the first result row in the output shows):

load csv from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line[2]], {uuid:line[0], name:line[1]}) YIELD node return labels(node), node limit 5 ;


neo4j-sh (?)$ load csv  from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line  CALL apoc.create.node([line[2]], {uuid:line[0], name:line[1]}) YIELD node return labels(node), node limit 5 ;
+-----------------------------------------------------------------------------------+
| labels(node) | node                                                               |
+-----------------------------------------------------------------------------------+
| ["Label"]    | Node[18004017]{name:"name",uuid:"uuid"}                            |
| ["WB_AJ"]    | Node[18004018]{name:"张忆耕",uuid:"2a3e275d9abc4c45913d8e7e619db87a"} |
| ["WP_SJH"]   | Node[18004019]{name:"傅某评",uuid:"db6ee76baff64db5956b6a5deb80acbf"} |
| ["LG_JG"]    | Node[18004020]{name:"王苏维",uuid:"8d2f4a74e7e7429390b3389d64d77637"} |
| ["JTSGXX"]   | Node[18004021]{name:"蓝波",uuid:"4d0a5c3fa89a49e89f81a152f2aa259c"}  |
+-----------------------------------------------------------------------------------+
5 rows
84 ms


With headers:


load csv with headers from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line.Label], {uuid:line.uuid, name:line.name}) YIELD node return labels(node), node limit 5 ;


neo4j-sh (?)$ load csv with headers from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line  CALL apoc.create.node([line.Label], {uuid:line.uuid, name:line.name}) YIELD node return labels(node), node limit 5 ;
+-----------------------------------------------------------------------------------+
| labels(node) | node                                                               |
+-----------------------------------------------------------------------------------+
| ["WB_AJ"]    | Node[18004022]{name:"张忆耕",uuid:"2a3e275d9abc4c45913d8e7e619db87a"} |
| ["WP_SJH"]   | Node[18004023]{name:"傅某评",uuid:"db6ee76baff64db5956b6a5deb80acbf"} |
| ["LG_JG"]    | Node[18004024]{name:"王苏维",uuid:"8d2f4a74e7e7429390b3389d64d77637"} |
| ["JTSGXX"]   | Node[18004025]{name:"蓝波",uuid:"4d0a5c3fa89a49e89f81a152f2aa259c"}  |
| ["SWXX"]     | Node[18004026]{name:"范为华",uuid:"2f811c5341b84b70840acbdd451e2490"} |
+-----------------------------------------------------------------------------------+
5 rows
75 ms
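
The same import can also be written with apoc.load.csv in place of LOAD CSV. This is only a sketch under the assumption that the file is comma-separated and has the uuid, name and Label header columns shown in the sample data:

```cypher
// Sketch: read rows with apoc.load.csv and create nodes with a per-row label.
// Assumes a comma-separated file with uuid, name and Label header columns.
CALL apoc.load.csv('file:/data/stale/data01/neo4j/node_uuid_10w.csv',
                   {header:true, limit:5}) YIELD map
CALL apoc.create.node([map.Label], {uuid: map.uuid, name: map.name}) YIELD node
RETURN labels(node), node;
```

Here the limit in the config stops reading after five rows, whereas the `limit 5` in the LOAD CSV queries above only trims the returned result.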


(5) Creating relationships dynamically: apoc.create.relationship


(5.1) Official documentation


neo4j-sh (?)$ CALL apoc.help("apoc.create.relationship");
| type | name | text | signature | roles | writes |
| "procedure" | "apoc.create.relationship" | "apoc.create.relationship(person1,'KNOWS',{key:value,...}, person2) create relationship with dynamic rel-type" | "apoc.create.relationship(from :: NODE?, relType :: STRING?, props :: MAP?, to :: NODE?) :: (rel :: RELATIONSHIP?)" | <null> | <null> |
1 row
61 ms


CALL apoc.create.relationship(person1,'KNOWS',{key:value,...}, person2) create relationship with dynamic rel-type


(5.2) Examples


using periodic commit 10000
// call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map
// "with headers" is left out here: index access such as line[0] needs each row as a list, not a map
load csv from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
match (p1:Person {uuid: line[0]})
match (p2:Person {uuid: line[1]})
WITH p1, p2, line
CALL apoc.create.relationship(p1, line[2], {name: line[3]}, p2) YIELD rel
RETURN rel


merge (n1:Test {name:'zhangsan'})
merge (n2:Test {name:'lisi'})
with n1, n2
call apoc.create.relationship(n1, 'R1', {}, n2) YIELD rel
return id(rel), type(rel), rel ;


using periodic commit 10000
load csv from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
merge (n1:Test {uuid: line[0]})
merge (n2:Test {uuid: line[1]})
with n1, n2, line
CALL apoc.create.relationship(n1, line[2], {}, n2) YIELD rel
return id(rel), type(rel), rel ;


using periodic commit 10000
load csv with headers from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
merge (n1:Test {uuid: line.uuid1})
merge (n2:Test {uuid: line.uuid2})
with n1, n2, line
CALL apoc.create.relationship(n1, line.type, {}, n2) YIELD rel
return id(rel), type(rel), rel ;


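create (n:Test {name:'zhangsan'}) return n;
create constraint on (n:Test) assert n.name is unique;
merge (n:Test {name:'zhangsan'})-[r:Friend]->(m:Test {name:'lisi'});
{ "signature": 127, "fields": [ { "code": "Neo.ClientError.Schema.ConstraintValidationFailed", "message": "Node 1000020 already exists with label Test and property \"name\"=[zhangsan]" } ], "timings": { "type": "client" }}

The last MERGE fails with ConstraintValidationFailed. MERGE matches or creates the pattern as a whole, and because the full zhangsan-Friend-lisi pattern does not exist yet, it tries to create a second zhangsan node, which the uniqueness constraint rejects.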
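
A common way to avoid the ConstraintValidationFailed error from a MERGE over a whole pattern is to merge each node on its own and only then merge the relationship between the bound nodes. A minimal sketch using the same Test label and Friend type as the failing statement:

```cypher
// Merge each node separately so an existing node is reused rather than re-created.
MERGE (n:Test {name:'zhangsan'})
MERGE (m:Test {name:'lisi'})
// Both ends are now bound, so this MERGE can only create the relationship.
MERGE (n)-[r:Friend]->(m)
RETURN n, r, m;
```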


(6) Warming up the page cache: apoc.warmup.run


neo4j-sh (?)$  CALL apoc.warmup.run();
+--------------------------------------------------------------------------------------------------------------------------+
| pageSize | nodesPerPage | nodesTotal | nodePages | nodesTime | relsPerPage | relsTotal | relPages | relsTime | totalTime |
+--------------------------------------------------------------------------------------------------------------------------+
| 8192     | 546          | 119510256  | 234717    | 9         | 240         | 131001022 | 587505   | 21       | 30        |
+--------------------------------------------------------------------------------------------------------------------------+
1 row
30337 ms


(7) Importing data from an RDBMS: apoc.load.jdbc


(7.1) Official documentation


neo4j-sh (?)$ CALL apoc.help("apoc.load.jdbc");
| type | name | text | signature | roles | writes |
| "procedure" | "apoc.load.jdbc" | "apoc.load.jdbc('key or url','table or kernelTransaction') YIELD row - load from relational database, from a full table or a sql kernelTransaction" | "apoc.load.jdbc(jdbc :: STRING?, tableOrSql :: STRING?, params = [] :: LIST? OF ANY?) :: (row :: MAP?)" | <null> | <null> |
| "procedure" | "apoc.load.jdbcParams" | "deprecated - please use: apoc.load.jdbc('key or url','kernelTransaction',[params]) YIELD row - load from relational database, from a sql kernelTransaction with parameters" | "apoc.load.jdbcParams(jdbc :: STRING?, sql :: STRING?, params :: LIST? OF ANY?) :: (row :: MAP?)" | <null> | <null> |
| "procedure" | "apoc.load.jdbcUpdate" | "apoc.load.jdbcUpdate('key or url','kernelTransaction',[params]) YIELD row - update relational database, from a SQL kernelTransaction with optional parameters" | "apoc.load.jdbcUpdate(jdbc :: STRING?, query :: STRING?, params = [] :: LIST? OF ANY?) :: (row :: MAP?)" | <null> | <null> |
3 rows
180 ms


(7.2) Usage


CALL apoc.load.driver("com.mysql.jdbc.Driver");


(7.3.1) Count the rows


with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url
CALL apoc.load.jdbc(url, "city") YIELD row
RETURN count(*);


(7.3.2) Preview sample rows


// fixed parameters
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url,
     "select id, name from city where level = 1" as sql
CALL apoc.load.jdbc(url, sql, []) YIELD row
RETURN row LIMIT 10;


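(7.3.3) Passing parameters dynamically


// dynamic parameters
// note: the help output in (7.1) marks apoc.load.jdbcParams as deprecated in favour of apoc.load.jdbc(url, sql, [params])
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url,
     "select id, name from city where level = ? and is_deleted = ? " as sql
CALL apoc.load.jdbcParams(url, sql, [1, 'N']) YIELD row
RETURN row LIMIT 10;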
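
The help output in (7.1) lists apoc.load.jdbcParams as deprecated and gives apoc.load.jdbc a third `params` argument. Based on that signature, the same parameterised query could be sketched as follows, with the connection URL, table and columns taken unchanged from the example above:

```cypher
// Sketch: same parameterised query through apoc.load.jdbc (signature from the help output).
// Each ? placeholder in the SQL is bound to the corresponding list element, in order.
WITH "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" AS url,
     "select id, name from city where level = ? and is_deleted = ?" AS sql
CALL apoc.load.jdbc(url, sql, [1, 'N']) YIELD row
RETURN row LIMIT 10;
```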


(7.3.4) Batch processing


CALL apoc.periodic.iterate(
  'call apoc.load.jdbc("jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin", " select id, name, level, pid  from city where level = 6 and is_deleted = \'N\' ", [])',
  'MATCH (s:Test {id: toInteger(row.pid)}) MERGE (s)-[r:Connect]->(n:Group {id: toInteger(row.id)}) set n += row ',
  {batchSize:10000, parallel:true})
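
With parallel:true, batches that write relationships onto the same nodes can compete for locks, and a MERGE over the whole (s)-[:Connect]->(n:Group) pattern can create a new Group node for each distinct parent. A more conservative sketch, offered as an assumption about what is intended and not as the author's method, runs the batches sequentially and merges the Group node before the relationship:

```cypher
// Sketch: sequential batches; merge the Group node first, then the relationship.
// Labels, properties and connection details are copied from the example above.
CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc("jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin", "select id, name, level, pid from city where level = 6 and is_deleted = \'N\'", []) YIELD row RETURN row',
  'MATCH (s:Test {id: toInteger(row.pid)})
   MERGE (n:Group {id: toInteger(row.id)})
   SET n += row
   MERGE (s)-[:Connect]->(n)',
  {batchSize: 10000, parallel: false});
```

Whether the extra safety is worth the slower import depends on how many rows share the same parent node.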


(8) Errors encountered


(1) There is no procedure with the name `apoc.version` registered


There is no procedure with the name apoc.version registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.


1. Check that the matching APOC jar has been downloaded and placed in the plugins directory.

2. Add dbms.security.procedures.unrestricted=apoc.* and apoc.import.file.enabled=true to neo4j.conf, each on its own line.

3. Restart the Neo4j database.


(2) Failed to invoke procedure `apoc.load.driver`: Caused by: java.lang.RuntimeException: Could not load driver class com.mysql.jdbc.Driver com.mysql.jdbc.Driver


neo4j-sh (?)$ CALL apoc.load.driver("com.mysql.jdbc.Driver");
1 ms
WARNING: Failed to invoke procedure `apoc.load.driver`: Caused by: java.lang.RuntimeException: Could not load driver class com.mysql.jdbc.Driver com.mysql.jdbc.Driver


Cause: the MySQL JDBC driver jar is missing.


(3) `Invalid input 'y': expected 'r/R' or 'a/A' `

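neo4j-sh (?)$ with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*);
5 ms
WARNING: Invalid input 'y': expected 'r/R' or 'a/A' (line 1, column 73 (offset: 72))
"with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*)"


Cause:

The syntax error itself comes from the stray `cypher` token after `as url`. The parser reads the `c` at column 72 and expects `r` (CREATE) or `a` (CALL) next, but finds `y`. Removing `cypher` fixes it.

If the query then fails because the MySQL jar is missing, as in error (2), download the MySQL jar, put it in the lib or plugins directory, and restart the Neo4j database.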
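
For reference, the statement from the error message parses once the stray `cypher` token after `as url` is dropped. The URL and table name below are the ones from the failing command:

```cypher
// Same query as in the error message, without the stray `cypher` keyword.
WITH "jdbc:mysql://localhost:3306/test?user=root&password=root" AS url
CALL apoc.load.jdbc(url, "test_table") YIELD row
RETURN count(*);
```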


(4) Neo.ClientError.Statement.SyntaxError: Unknown function 'apoc.version' (line 1, column 8 (offset: 7))


Neo.ClientError.Statement.SyntaxError: Unknown function 'apoc.version' (line 1, column 8 (offset: 7))
"return apoc.version();"
        ^


The configuration in neo4j.conf is wrong; check the settings from (1.2).


(5) Neo.ClientError.Procedure.ProcedureNotFound: There is no procedure with the name `apoc.help` registered for this database instance.


Neo.ClientError.Procedure.ProcedureNotFound: There is no procedure with the name `apoc.help` registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.


The configuration in neo4j.conf is wrong; check the settings from (1.2).


References

[1] neo4j-apoc-procedures GitHub repository

[2] neo4j-apoc official documentation

[3] cypher-dynamic-label-at-query: passing a label dynamically as a parameter in a Cypher statement

[4] Cannot run a query looking for a dynamic label

[5] how-to-use-apoc-load-csv-in-conjunction-with-apoc-create-node

[6] Importing relationships with dynamic types into Neo4j

[7] how-do-i-fix-this-neo4j-apoc-query-that-creates-nodes-froma-csv-file

