使用apoc将数据从数据库导入neo4j

1、创建实体

CREATE CONSTRAINT uniq_law_id  ON (p:Law) ASSERT p.id IS UNIQUE;

CALL apoc.periodic.iterate(
      'call apoc.load.jdbc("jdbc:clickhouse://192.xxx.x.xxx:8123/xxx?user=xxx&password=xxx", " select * from xxx.xxx", []) ',
      'CALL apoc.merge.node([row.ent_label], 
      {id: row.id},
      {name:row.name,level:row.level,office:row.office,publish:row.publish,expiry:row.expiry,law_type:row.law_type,status:row.status},
      {name:row.name,level:row.level,office:row.office,publish:row.publish,expiry:row.expiry,law_type:row.law_type,status:row.status}
    ) yield node RETURN count(*)',
      {batchSize:1000, parallel:false}
    )
;

这段代码的目的是从 ClickHouse 数据库中加载数据到 Neo4j 图数据库,并在加载过程中使用 APOC(Awesome Procedures on Cypher)库提供的 apoc.merge.node 过程来合并数据,确保在图数据库中的节点具有唯一性。

逐行解释这段代码:

  1. CREATE CONSTRAINT uniq_law_id ON (p:Law) ASSERT p.id IS UNIQUE;: 这一行创建了一个唯一约束,确保 "Law" 类型的节点中的 id 属性是唯一的。这是为了防止在后续的数据加载过程中出现重复的节点。

  2. CALL apoc.periodic.iterate('call apoc.load.jdbc("jdbc:clickhouse://192.xxx.x.xxx:8123/xxx?user=xxx&password=xxx", "select * from xxx.xxx", [])', ...: 这一行使用 APOC 提供的 apoc.periodic.iterate 过程,该过程允许对数据进行迭代处理。

  3. 'call apoc.load.jdbc("jdbc:clickhouse://192.xxx.x.xxx:8123/xxx?user=xxx&password=xxx", " select * from xxx.xxx", [])': 在迭代中,首先调用 apoc.load.jdbc 过程,从 ClickHouse 数据库中加载数据。这里使用的 JDBC 连接字符串指向 ClickHouse 数据库,提供用户名和密码用于连接

  4. 'CALL apoc.merge.node([row.ent_label], ...': 在每次迭代中,对于从 ClickHouse 加载的每一行数据,调用 apoc.merge.node` 过程

    • [row.ent_label]: 这是一个用于标记节点标签的列表。在这里,使用了 row.ent_label 作为节点的标签,可能是从 ClickHouse 数据库中的某个列获取的。

    • {id: row.id}, {name:row.name,level:row.level,...}: 这里定义了节点的属性。{id: row.id} 表示节点的 id 属性,其值来自加载的行数据中的 id 列。同样的逻辑适用于其他属性。

    • yield node RETURN count(*): 返回每次迭代中处理的节点数量。这可以帮助你了解迭代的进展。

    • {batchSize:1000, parallel:false}: 定义了迭代的参数。batchSize 表示每次迭代处理的行数,parallel 表示是否并行处理。在这里,设置为串行(false)。

总的来说,这段代码的目的是从 ClickHouse 数据库中加载数据到 Neo4j 图数据库,确保在图数据库中的 "Law" 节点具有唯一的 id 属性。在加载的过程中,使用了 APOC 库的 apoc.merge.node 过程,它可以合并节点,确保数据的唯一性。

2、创建关系

CALL apoc.periodic.iterate(
  'call apoc.load.jdbc("jdbc:clickhouse://192.xxx.x.xxx:8123/xxx?user=xxx&password=xxx", " select * from xxx.xxx", [])yield row',
  'merge (n1:row.from_label {id: row.from_id})
  merge (n2:row.to_label {id: row.to_id})
  with n1, n2, row
  CALL apoc.merge.relationship(
  n1, 
  row.rel_type,
  row.name,
  {},
  {}, 
  n2, 
  {}
  ) YIELD rel
  return id(rel), type(rel), rel',
  {batchSize:2000, parallel:false}
  )
;

这段代码的目的是从 ClickHouse 数据库中加载关系数据到 Neo4j 图数据库,并在加载的过程中使用 APOC 库提供的 apoc.merge.relationship 过程来合并关系,确保在图数据库中的关系具有唯一性。

逐行解释这段代码:

  1. CALL apoc.periodic.iterate(...: 这是一个调用 APOC 提供的 apoc.periodic.iterate 过程的 Cypher 查询。该过程允许对数据进行迭代处理

  2. 'call apoc.load.jdbc("jdbc:clickhouse://192.168.1.168:8123/law?user=default&password=QuBmUhBv", " select * from law.rel_law_bzjtkx_include", []) yield row': 在迭代中,首先调用 apoc.load.jdbc 过程,从 ClickHouse 数据库中加载关系数据。这里使用的是 ClickHouse 数据库的 JDBC 连接字符串,提供用户名和密码用于连接。

  3. 'merge (n1:row.from_label {id: row.from_id}) merge (n2:row.to_label {id: row.to_id}) with n1, n2, row': 对于从 ClickHouse 加载的每一行数据,使merge 关键字创建起始节点 n1 和目标节点 n2。这里使用了 row.from_labelrow.to_label 作为节点标签,并使用 row.from_idrow.to_id 作为节点的 id 属性值

  4. CALL apoc.merge.relationship(n1, row.rel_type, row.name, {}, {}, n2, {}) YIELD rel: 使用 APOC 提供的 apoc.merge.relationship 过程,该过程可以合并关系。具体参数包括:

    • n1: 起始节点。
    • row.rel_type: 关系类型,来自于加载的数据的 rel_type 列。
    • row.name: 关系的名称,来自于加载的数据的 name 列。
    • {}: 关系的属性,这里为空对象。
    • {}: 关系的属性更新规则,这里为空对象。
    • n2: 目标节点。
  5. YIELD rel: 返回被合并的关系对象。

  6. return id(rel), type(rel), rel': 返回合并关系的 ID、关系类型和关系对象。

  7. {batchSize:2000, parallel:false}: 定义了迭代的参数。batchSize 表示每次迭代处理的行数parallel 表示是否并行处理。在这里,设置为串行(false)。

总体来说,这段代码的目的是从 ClickHouse 数据库中加载关系数据到 Neo4j 图数据库,确保在图数据库中的关系具有唯一性。在加载的过程中,使用了 APOC 库的 apoc.merge.relationship 过程,它可以合并关系,确保数据的唯一性。

相关推荐

  1. 使用apoc数据数据库导入neo4j

    2024-02-21 07:26:03       32 阅读
  2. pythoncsv数据导入neo4j

    2024-02-21 07:26:03       31 阅读
  3. neo4j导出导入数据库

    2024-02-21 07:26:03       30 阅读
  4. 使用python向neo4j中批量导入txt和csv三元组数据

    2024-02-21 07:26:03       47 阅读
  5. neo4j查询语言Cypher详解(五)--apoc

    2024-02-21 07:26:03       40 阅读
  6. 数据库Neo4j集成springboot及导入数据踩坑问题

    2024-02-21 07:26:03       33 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-02-21 07:26:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-21 07:26:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-21 07:26:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-21 07:26:03       20 阅读

热门阅读

  1. 小脑萎缩患者平时生活中应该注意哪些?

    2024-02-21 07:26:03       24 阅读
  2. 【clickhouse笔记】 查询表或列的磁盘占用大小

    2024-02-21 07:26:03       30 阅读
  3. 高速自动驾驶智慧匝道(HIC)系统功能规范

    2024-02-21 07:26:03       32 阅读
  4. 小程序怎么开发?怎么开发自己的小程序

    2024-02-21 07:26:03       34 阅读
  5. 使用 openssl 进行哈希计算

    2024-02-21 07:26:03       33 阅读