Neo4j APOC 使用

(1) 下载配置

(1.1) 下载对应版本的apoc jar包

  1. github apoc各个版本下载地址下载对应版本的apoc jar包,并放到 $NEO4J_HOME/plugs 目录下

wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/3.3.0.2/apoc-3.3.0.2-all.jar

(1.2) 配置 neo4j.conf 配置文件

neo4j.conf 最后一行添加

dbms.security.procedures.unrestricted=apoc.*
apoc.import.file.enabled=true
neo4j-sh (?)$ return apoc.version();
+----------------+
| apoc.version() |
+----------------+
| "3.3.0.2"      |
+----------------+
1 row
29 ms

查看apoc版本

return apoc.version();

call apoc.help('apoc');

call dbms.procedures

call dbms.functions()

(2) 导入json apoc.load.json

(2.1) 官方文档

neo4j-sh (?)$ CALL apoc.help("apoc.load.json");
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type        | name                   | text
                                                   | signature                                                                                                                               | roles  | writes |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "procedure" | "apoc.load.json"       | "apoc.load.json('url',path, config) YIELD value -  import JSON as stream of values if the JSON was an array or a single value if it was a map"
                                                   | "apoc.load.json(url :: STRING?, path =  :: STRING?, config = {} :: MAP?) :: (value :: MAP?)"                                            | <null> | <null> |
| "procedure" | "apoc.load.jsonArray"  | "apoc.load.jsonArray('url') YIELD value - load array from JSON URL (e.g. web-api) to import JSON as stream of values"
                                                   | "apoc.load.jsonArray(url :: STRING?, path =  :: STRING?) :: (value :: ANY?)"                                                            | <null> | <null> |
| "procedure" | "apoc.load.jsonParams" | "apoc.load.jsonParams('url',{header:value},payload, config) YIELD value - load from JSON URL (e.g. web-api) while sending headers / payload to import JSON as stream of values if the JSON was an array or a single value if it was a map" | "apoc.load.jsonParams(url :: STRING?, headers :: MAP?, payload :: STRING?, path =  :: STRING?, config = {} :: MAP?) :: (value :: MAP?)" | <null> | <null> |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows
42 ms

(2.2) 官方示例

WITH 'https://raw.githubusercontent.com/neo4j-contrib/neo4j-apoc-procedures/3.3.0.2/src/test/resources/person.json' AS url
CALL apoc.load.json(url) YIELD value as person
MERGE (p:Person {name:person.name})
   ON CREATE SET p.age = person.age,  p.children = size(person.children);
{"name":"Michael",
 "age": 41,
 "children": ["Selina","Rana","Selma"]
}

(3) 导入csv文件 apoc.load.csv

(3.1) 官方文档

neo4j-sh (?)$ CALL apoc.help("apoc.load.csv");
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type        | name            | text
                                                                                             | signature
                               | roles  | writes |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "procedure" | "apoc.load.csv" | "apoc.load.csv('url',{config}) YIELD lineNo, list, map - load CSV fom URL as stream of values,
 config contains any of: {skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],nullValues:['na'],arraySep:';',mapping:{years:{type:'int',arraySep:'-',array:false,name:'age',ignore:false}}" | "apoc.load.csv(url :: STRING?, config = {} :: MAP?) :: (lineNo :: INTEGER?, list :: LIST? OF ANY?, strings :: LIST? OF STRING?, map :: MAP?, stringMap :: MAP?)" | <null> | <null> |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row
50 ms

(3.2) 用法

apoc.load.csv('url',{config}) YIELD lineNo, list, map - load CSV fom URL as stream of values, config contains any of: {skip:1,limit:5,header:false,sep:'TAB',ignore:['tmp'],arraySep:';',mapping:{years:{type:'int',arraySep:'-',array:false,name:'age',ignore:false}}

返回行号
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield lineNo return lineNo;

返回map,header必须是true
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map;

返回行号+map
call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {skip:0, limit:5, header:true, sep:','}) yield lineNo,map return lineNo,map;

call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map.uuid;

(4) 动态创建节点 apoc.create.node

CALL apoc.create.node(['Label'], {key:value,…​}) create node with dynamic labels

创建节点时动态传入参数

官方作者在stackoverflow说,cypther里不能动态传Label,这是由Cypther语言决定的,只能通过动态拼接字符串在程序里实现。

但是APOC里有apoc.create.node方法,可以动态传Label,应该是用的动态拼接字符串,源码还没看。

uuid,name,Label
2a3e275d9abc4c45913d8e7e619db87a,"张忆耕",Laebl1
db6ee76baff64db5956b6a5deb80acbf,"傅某评",Laebl2
8d2f4a74e7e7429390b3389d64d77637,"王苏维",Laebl3
4d0a5c3fa89a49e89f81a152f2aa259c,"蓝波",Laebl4
2f811c5341b84b70840acbdd451e2490,"范为华",Laebl5
311f7441fe4b4b7c8d5dc56d7590c1c3,"黄日波",Laebl1
b4d04f00887c49308e86afd0f0baf641,"徐国康",Laebl3
b717f94e2f3f4d25bb3dcd8a8efbc667,"王心迪",Laebl6

不加headers

load csv from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line[2]], {uuid:line[0], name:line[1]}) YIELD node return labels(node), node limit 5 ;

neo4j-sh (?)$ load csv  from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line  CALL apoc.create.node([line[2]], {uuid:line[0], name:line[1]}) YIELD node return labels(node), node limit 5 ;
+-----------------------------------------------------------------------------------+
| labels(node) | node                                                               |
+-----------------------------------------------------------------------------------+
| ["Label"]    | Node[18004017]{name:"name",uuid:"uuid"}                            |
| ["WB_AJ"]    | Node[18004018]{name:"张忆耕",uuid:"2a3e275d9abc4c45913d8e7e619db87a"} |
| ["WP_SJH"]   | Node[18004019]{name:"傅某评",uuid:"db6ee76baff64db5956b6a5deb80acbf"} |
| ["LG_JG"]    | Node[18004020]{name:"王苏维",uuid:"8d2f4a74e7e7429390b3389d64d77637"} |
| ["JTSGXX"]   | Node[18004021]{name:"蓝波",uuid:"4d0a5c3fa89a49e89f81a152f2aa259c"}  |
+-----------------------------------------------------------------------------------+
5 rows
84 ms

加headers

load csv with headers from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line CALL apoc.create.node([line.Label], {uuid:line.uuid, name:line.name}) YIELD node return labels(node), node limit 5 ;

neo4j-sh (?)$ load csv with headers from "file:/data/stale/data01/neo4j/node_uuid_10w.csv" as line with line  CALL apoc.create.node([line.Label], {uuid:line.uuid, name:line.name}) YIELD node return labels(node), node limit 5 ;
+-----------------------------------------------------------------------------------+
| labels(node) | node                                                               |
+-----------------------------------------------------------------------------------+
| ["WB_AJ"]    | Node[18004022]{name:"张忆耕",uuid:"2a3e275d9abc4c45913d8e7e619db87a"} |
| ["WP_SJH"]   | Node[18004023]{name:"傅某评",uuid:"db6ee76baff64db5956b6a5deb80acbf"} |
| ["LG_JG"]    | Node[18004024]{name:"王苏维",uuid:"8d2f4a74e7e7429390b3389d64d77637"} |
| ["JTSGXX"]   | Node[18004025]{name:"蓝波",uuid:"4d0a5c3fa89a49e89f81a152f2aa259c"}  |
| ["SWXX"]     | Node[18004026]{name:"范为华",uuid:"2f811c5341b84b70840acbdd451e2490"} |
+-----------------------------------------------------------------------------------+
5 rows
75 ms

(5) 动态创建关系 apoc.create.relationship

(5.1) 官方文档

neo4j-sh (?)$ CALL apoc.help("apoc.create.relationship");
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type        | name                       | text                                                                                                           | signature
                                                                                                          | roles  | writes |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "procedure" | "apoc.create.relationship" | "apoc.create.relationship(person1,'KNOWS',{key:value,...}, person2) create relationship with dynamic rel-type" | "apoc.create.relationship(from :: NODE?, relType :: STRING?, props :: MAP?, to :: NODE?) :: (rel :: RELATIONSHIP?)" | <null> | <null> |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row
61 ms

CALL apoc.create.relationship(person1,'KNOWS',{key:value,…​}, person2) create relationship with dynamic rel-type

(5.2) 示例

using periodic commit 10000
//call apoc.load.csv('file:/data/stale/data01/neo4j/apoc_test_data.csv', {limit:5, header:true, sep:'|'}) yield map return map
load csv with headers from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
match (p1:Person {uuid: line[0]})
match (p2:Person {uuid: line[1]})
WITH p1, p2, line
CALL apoc.create.relationship(p1, line[2], {name: line[3]}, p2) YIELD rel
RETURN rel
merge (n1:Test {name:'zhangsan'}) merge (n2:Test {name:'lisi'})
with n1, n2
call apoc.create.relationship(n1, 'R1', {}, n2) YIELD rel
return id(rel), type(rel), rel ;
using periodic commit 10000
load csv from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
merge (n1:Test {uuid: line[0]})
merge (n2:Test {uuid: line[1]})
with n1, n2, line
CALL apoc.create.relationship(n1, line[2], {}, n2) YIELD rel
return id(rel), type(rel), rel ;
using periodic commit 10000
load csv with headers from 'file:/data/stale/data01/neo4j/relathionship_uuid_10w.csv' as line fieldterminator ','
merge (n1:Test {uuid: line.uuid1})
merge (n2:Test {uuid: line.uuid2})
with n1, n2, line
CALL apoc.create.relationship(n1, line.type, {}, n2) YIELD rel
return id(rel), type(rel), rel ;
create (n:Test {name:'zhangsan'}) return n;

create constraint on (n:Test) assert n.name is unique;

merge (n:Test {name:'zhangsan'})-[r:Friend]->(m:Test {name:'lisi'});

{
  "signature": 127,
  "fields": [
    {
      "code": "Neo.ClientError.Schema.ConstraintValidationFailed",
      "message": "Node 1000020 already exists with label Test and property \"name\"=[zhangsan]"
    }
  ],
  "timings": {
    "type": "client"
  }
}

(6) 热启动 apoc.warmup.run

neo4j-sh (?)$  CALL apoc.warmup.run();
+--------------------------------------------------------------------------------------------------------------------------+
| pageSize | nodesPerPage | nodesTotal | nodePages | nodesTime | relsPerPage | relsTotal | relPages | relsTime | totalTime |
+--------------------------------------------------------------------------------------------------------------------------+
| 8192     | 546          | 119510256  | 234717    | 9         | 240         | 131001022 | 587505   | 21       | 30        |
+--------------------------------------------------------------------------------------------------------------------------+
1 row
30337 ms

(7) 从RDBMS导入数据 apoc.load.jdbc

(7.1) 官方文档

neo4j-sh (?)$ CALL apoc.help("apoc.load.jdbc");
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| type        | name                   | text
                                              | signature                                                                                                | roles  | writes |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "procedure" | "apoc.load.jdbc"       | "apoc.load.jdbc('key or url','table or kernelTransaction') YIELD row - load from relational database, from a full table or a sql kernelTransaction"                          | "apoc.load.jdbc(jdbc :: STRING?, tableOrSql :: STRING?, params = [] :: LIST? OF ANY?) :: (row :: MAP?)"  | <null> | <null> |
| "procedure" | "apoc.load.jdbcParams" | "deprecated - please use: apoc.load.jdbc('key or url','kernelTransaction',[params]) YIELD row - load from relational database, from a sql kernelTransaction with parameters" | "apoc.load.jdbcParams(jdbc :: STRING?, sql :: STRING?, params :: LIST? OF ANY?) :: (row :: MAP?)"        | <null> | <null> |
| "procedure" | "apoc.load.jdbcUpdate" | "apoc.load.jdbcUpdate('key or url','kernelTransaction',[params]) YIELD row - update relational database, from a SQL kernelTransaction with optional parameters"              | "apoc.load.jdbcUpdate(jdbc :: STRING?, query :: STRING?, params = [] :: LIST? OF ANY?) :: (row :: MAP?)" | <null> | <null> |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows
180 ms

(7.2) 使用

CALL apoc.load.driver("com.mysql.jdbc.Driver");

(7.3.1) 查看数据个数

with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url
CALL apoc.load.jdbc(url, "city") YIELD row
RETURN count(*);

(7.3.2) 查看数据样例

// 固定参数
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url,
     "select id, name from city where level = 1" as sql
CALL apoc.load.jdbc(url, sql, []) YIELD row
RETURN row LIMIT 10;

(7.3.3) 动态传入参数

// 动态传参数
with "jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin" as url,
     "select id, name from city where level = ? and is_deleted = ? " as sql
CALL apoc.load.jdbcParams(url, sql, [1, 'N']) YIELD row
RETURN row LIMIT 10;

(7.3.4) 分批处理

CALL apoc.periodic.iterate(
  'call apoc.load.jdbc("jdbc:mysql://localhost:3306/dataserver?user=admin&password=admin", " select id, name, level, pid  from city where level = 6 and is_deleted = \'N\' ", [])',
  'MATCH (s:Test {id: toInteger(row.pid)}) MERGE (s)-[r:Connect]->(n:Group {id: toInteger(row.id)}) set n += row ',
  {batchSize:10000, parallel:true}
)

(8) 遇到的错误

(1) There is no procedure with the name apoc.version registered

There is no procedure with the nameapoc.versionregistered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.

  1. 检查是否下载对应的apoc jar包,并放到plugs目录下
  2. 在neo4j.conf里添加 dbms.security.procedures.unrestricted=apoc.* apoc.import.file.enabled=true 的配置
  3. 重启Neo4j数据库

(2) Failed to invoke procedure apoc.load.driver: Caused by: java.lang.RuntimeException: Could not load driver class com.mysql.jdbc.Driver com.mysql.jdbc.Driver

neo4j-sh (?)$ CALL apoc.load.driver("com.mysql.jdbc.Driver");
1 ms

WARNING: Failed to invoke procedure `apoc.load.driver`: Caused by: java.lang.RuntimeException: Could not load driver class com.mysql.jdbc.Driver com.mysql.jdbc.Driver

少 mysql jar包

(3) Invalid input 'y': expected 'r/R' or 'a/A'

neo4j-sh (?)$ with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*);
5 ms

WARNING: Invalid input 'y': expected 'r/R' or 'a/A' (line 1, column 73 (offset: 72))
"with "jdbc:mysql://localhost:3306/test?user=root&password=root" as url cypher CALL apoc.load.jdbc(url,"test_table") YIELD row RETURN count(*)"

原因:
缺少MySQL jar包
下载MySQL jar包放到lib或plugs目录下,重启Neo4j数据库就好了。

(4) Neo.ClientError.Statement.SyntaxError: Unknown function ‘apoc.version’ (line 1, column 8 (offset: 7))

Neo.ClientError.Statement.SyntaxError: Unknown function 'apoc.version' (line 1, column 8 (offset: 7))
 "return apoc.version();"
 ^

neo4j.conf 里配置有问题

(5) Neo.ClientError.Procedure.ProcedureNotFound: There is no procedure with the name apoc.help registered for this database instance.

Neo.ClientError.Procedure.ProcedureNotFound: There is no procedure with the name `apoc.help` registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.

neo4j.conf 里配置有问题

References

[1] neo4j-apoc-procedures github地址
[2] neo4j-apoc 官方文档
[3] cypher-dynamic-label-at-query cypher语句中动态传Label参数
[4] Cannot run a query looking for a dynamic label
[5] how-to-use-apoc-load-csv-in-conjunction-with-apoc-create-node
[6] Neo4j 导入动态类型关系
[7] how-do-i-fix-this-neo4j-apoc-query-that-creates-nodes-froma-csv-file