您的位置: 首页 - 站长

php 网站缓存文件网站营销平台

当前位置: 首页 > news >正文

php 网站缓存文件,网站营销平台,东莞做微网站建设价格,多功能产品设计neo4j 图表数据导入到 TuGraph 代码文件说明后文 前言:近期在引入阿里的 TuGraph 图数据库#xff0c;需要将 原 neo4j 数据导入到新的 tugraph 数据库中。预期走csv文件导入导出#xff0c;但因为格式和数据库设计问题#xff0c;操作起来比较麻烦#xff08;可能是个人没… neo4j 图表数据导入到 TuGraph 代码文件说明后文 前言:近期在引入阿里的 TuGraph 图数据库需要将 原 neo4j 数据导入到新的 tugraph 数据库中。预期走csv文件导入导出但因为格式和数据库设计问题操作起来比较麻烦可能是个人没有发现其他比较方便的办法因此写了一个 python 脚本进行数据导入操作。 使用python3,TuGraph 4.5.1 遇到的问题tugraph 的节点需要一个主键这个只能自行指定。 支持指定节点指定边。自动创建不存在的节点/边数据导入批量导入节点单条导入边试过批量的tugraph好像不支持 官网的 CALL db.upsertEdge 我的版本也还没实现。 导入图示: 代码文件

import time

import json from typing import Dict, List, castclass GraphConnector():db_type: str tugraphdriver: str boltdialect: str cypherbatch_size: int 100# 指定节点的主键node_pro_key: dict dict({Ren:zjhm,Aj:ajbh,Che:rowkey,Hh:rowkey,Sj:dhhm})#指定需要导入的边specified_relation [ajgx,th,tfj,sysj,sycl]#指定需要导入的节点specified_node [Ren,Aj,Che,Sj,Hh]def init(self, driver, graph):Initialize the connector with a Neo4j driver.self._driver driverself._schema Noneself._graph graphself._session Noneclassmethoddef from_uri_db(cls, host: str, port: int, user: str, pwd: str, db_name: str, db_type: str) - GraphConnector:Create a new TuGraphConnector from host, port, user, pwd, db_name.try:from neo4j import GraphDatabasecls.db_type db_typedb_url f{cls.driver}://{host}:{str(port)}driver GraphDatabase.driver(db_url, auth(user, pwd))driver.verify_connectivity()return cast(GraphConnector, cls(driverdriver, graphdb_name))except ImportError as err:raise ImportError(neo4j package is not installed, please install it with pip install neo4j) from errdef create_graph_new(self, graph_name: str) - bool:Create a new graph in the database if it doesnt already exist.try:with self._driver.session() as session:graph_list session.run(CALL dbms.graph.listGraphs()).data()exists any(item[graph_name] graph_name for item in graph_list)if not exists:session.run(fCALL dbms.graph.createGraph({graph_name}, , 2048))except Exception as e:raise Exception(fFailed to create graph {graph_name}: {str(e)}) from ereturn not existsdef create_vertex_labels(self, json_data):try:with self._driver.session(databaseself._graph) as session:# graph_list session.run(fCALL db.createVertexLabelByJson({json_data})).data()session.run(CALL db.createVertexLabelByJson(\(json_data),json_datajson_data)except Exception as e:raise Exception(fFailed to create vertex_labels ) from e# 批量更新节点没有就新增有就更新def batch_update_node(self, json_data):try:with self._driver.session(databaseself._graph) as session:# graph_list session.run(fCALL db.createVertexLabelByJson({json_data})).data()session.upsertVertex(CALL db.upsertVertex(\)json_data),json_datajson_data)except Exception as e:raise Exception(fFailed to create vertex_labels ) from e# 批量更新关系没有就新增有就更新def batch_update_edge(self, json_data):try:with self._driver.session(databaseself._graph) as session:# graph_list session.run(fCALL db.createVertexLabelByJson({json_data})).data()session.upsertVertex(CALL db.upsertEdge(\(json_data),json_datajson_data)except Exception as e:raise Exception(fFailed to create vertex_labels ) from edef create_edge_labels(self, json_data):try:with self._driver.session(databaseself._graph) as session:# graph_list session.run(fCALL db.createVertexLabelByJson({json_data})).data()session.run(CALL db.createEdgeLabelByJson(\)json_data),json_datajson_data)except Exception as e:raise Exception(fFailed to create vertex_labels ) from edef run(self, query: str, fetch: str all) - List:Run query.with self._driver.session(databaseself._graph) as session:try:result session.run(query)return list(result)except Exception as e:raise Exception(fQuery execution failed: {e}\nQuery: {query}) from edef check_label_exists(self, label: str, label_type: str) - bool:with self._driver.session(databaseself._graph) as session:# Run the query to get vertex labelsif label_type node:raw_vertex_labels session.run(CALL db.vertexLabels()).data()vertex_labels [table_name[label] for table_name in raw_vertex_labels]if label in vertex_labels:return Trueelse:# Run the query to get edge labelsraw_edge_labels session.run(CALL db.edgeLabels()).data()edge_labels [table_name[label] for table_name in raw_edge_labels]if label in edge_labels:return Truereturn False# 获取节点或边的结构def get_columns(self, table_name: str, table_type: str vertex) - List[Dict]:Retrieve the column for a specified vertex or edge table in the graph db.with self._driver.session(databaseself._graph) as session:data []result Noneif table_type vertex:result session.run(fCALL db.getVertexSchema({table_name})).data()else:result session.run(fCALL db.getEdgeSchema({table_name})).data()schema_info json.loads(result[0][schema])for prop in schema_info.get(properties, []):prop_dict {name: prop[name],type: prop[type],default_expression: ,is_in_primary_key: bool(primary in schema_infoand prop[name] schema_info[primary]),comment: prop[name],}data.append(prop_dict)return datadef close(self):Close the Neo4j driver.self._driver.close()# {name: id, type: STRING, optional: False},

{name: name, type: STRING, optional: False, index: True},

{name: num, type: STRING, optional: False, unique: True},

{name: desc, type: STRING, optional: True}

构建节点json语句用于tugraph创建节点

def bulid_node_json(node_name:str,pro_key:str ,node_properties):vertex_label_json {label: node_name,primary: pro_key,type: VERTEX,detach_property: True,properties: []}for node_property in node_properties:proper_info {name: node_property[0], type: STRING, optional: False}vertex_label_json[properties].append(proper_info)return json.dumps(vertex_label_json)def bulid_edge_json(edge_name:str,edge_properties,start_node_key,end_node_key):edge_label_json {label: edge_name,type: EDGE,detach_property: True,constraints: [],properties: []}edge_label_json[constraints].append([edge_properties[0][1][0],edge_properties[0][2][0]])# 这是在边属性中存储节点的主键不需要也可以# edge_label_json[properties].append({name: start_node_key if start_node_key ! end_node_key else start_node_key1, type: STRING, optional: False})# edge_label_json[properties].append({name: end_node_key if start_node_key ! end_node_key else start_node_key2, type: STRING, optional: False})for edge_property in edge_properties:proper_info {name: edge_property[0], type: STRING, optional: True}edge_label_json[properties].append(proper_info)return json.dumps(edge_label_json)def neo4jNode2Tugrapg(connector,tugraphConn):query CALL db.labels() YIELD labelRETURN label;print(Executing query:, query)results_nodes connector.run(query)print(f所有的节点:{results_nodes})print(指定的节点:,connector.specified_node)for node in results_nodes:#获取节点结构query fMATCH (n:{node[0]})UNWIND keys(n) AS keyRETURN DISTINCT keynode_properties connector.run(query)if node[0] not in connector.specified_node and len(connector.specified_node) ! 0:continueprint(f当前 neo4j 节点 {node[0]} , roperties : {node_properties}!!)if tugraphConn.check_label_exists(node[0],node):print(node[0],节点已经存在!)else:print(node[0],节点不存在,需要新建!)node_json bulid_node_json(node[0],connector.node_pro_key[node[0]],node_properties)# 新建不存在的节点tugraphConn.create_vertex_labels(node_json)# neo4j中查询出当前节点标签下所有节点queryNode fMATCH (n:{node[0]}) RETURN n# 构建插入语句同步节点synchronize_node(node[0],connector.run(queryNode),node_properties,tugraphConn)# node_name 当前节点标签名

node_result neo4j中查询出的节点结果

tugraphConn tugraph连接器

构建新增节点语句并tugraphConn 执行一次执行300条

CREATE (:node1 {id: 2, name: 李四, num: 001, desc: 李四的信息}),

(:node1 {id: 3, name: 李四, num: 001, desc: 李四的信息});

def synchronize_node(node_name:str,node_result,node_properties,tugraphConn):# 构建Cypher查询语句print(f同步 {node_name} 节点共 {len(node_result)} 记录请等待执行完成…)create_node_cypher_parts []count 0skip_num 0for node in node_result:# print(aa,aa)item node[0]._propertiesproperties_list []is_skip Falsefor key in node_properties:# 如果节点结构与当前节点属性结构不一致则跳过当前节点if key[0] not in item.keys():skip_num 1is_skip Truebreakif is_skip:continuefor key, value in item.items():properties_list.append(f{key}: {value})# if isinstance(value, str):# # 如果是字符串则添加引号# properties_list.append(f{key}: {value})# else:# # 否则直接添加# properties_list.append(f{key}: {value})cypher_query f(:{node_name} {{{, .join(properties_list)}}})create_node_cypher_parts.append(cypher_query)count 1# 每300个节点执行一次TuGraph数据库操作if count % 300 0:create_node_cypher fCREATE {, .join(create_node_cypher_parts)}# print(create_node_cypher) # 打印生成的Cypher查询语句以便调试tugraphConn.run(create_node_cypher)create_node_cypher_parts [] # 清空列表以准备下一批节点# 处理剩余的节点if create_node_cypher_parts:create_node_cypher fCREATE {, .join(create_node_cypher_parts)}# print(create_node_cypher) # 打印生成的Cypher查询语句以便调试tugraphConn.run(create_node_cypher)print(f所有 {node_name} 节点同步完成,共 {len(node_result)} 条记录,不符合要求 {skip_num} 条;成功导入 {count} 条!)# 导入边 def neo4jEdge2Tugrapg(connector,tugraphConn):query CALL db.relationshipTypes() YIELD relationshipTypeRETURN relationshipType;print(Executing query:, query)results_dege connector.run(query)print(f所有的关系:{results_dege})print(f指定的关系:{connector.specified_relation})for edge in results_dege:if edge[0] not in connector.specified_relation and len(connector.specified_relation) ! 0:continue# 获取关系结构query fMATCH (n1)-[r:{edge[0]}]-(n2) UNWIND keys® AS key RETURN DISTINCT key, labels(n1) AS start_node_labels, labels(n2) AS end_node_labelsedge_properties connector.run(query)start_node edge_properties[0][1][0]end_node edge_properties[0][2][0]if start_node not in connector.specified_node or end_node not in connector.specified_node:print(f{edge[0]}关系中存在不符合要求的节点跳过!)continueif tugraphConn.check_label_exists(edge[0],edge):print(edge[0],关系已经存在!)else:print(edge[0],关系不存在,需要新建!)#获取节点结构node_json bulid_edge_json(edge[0],edge_properties, connector.node_pro_key[start_node], connector.node_pro_key[end_node])# 新建不存在的节点tugraphConn.create_edge_labels(node_json)# neo4j中查询出当前节点标签下所有节点queryNode fMATCH (n1)-[r:{edge[0]}]-(n2) RETURN n1,r,n2;results connector.run(queryNode)# 构建插入语句同步节点synchronize_edge(edge[0],results,start_node,end_node,tugraphConn)def synchronize_edge(edge_name:str,edge_results,start_node_name,end_node_name,tugraphConn):# 构建Cypher查询语句print(f同步 {edge_name} 关系共 {len(edge_results)} 记录请等待执行完成…)create_node_cypher_parts []count 0skip_num 0for edge in edge_results:properties_list []for gx in edge:if hasattr(gx, type):if list(gx.start_node.labels)[0] start_node_name and list(gx.end_node.labels)[0] end_node_name:start_node gx.start_nodeend_node gx.end_nodestart_pro_key tugraphConn.node_pro_key[start_node_name]end_pro_key tugraphConn.node_pro_key[end_node_name]start_pro_val start_node[start_pro_key]end_pro_val end_node[end_pro_key]# 创建一个字典来存储所有属性csv_map {# start_pro_key if start_node_name ! end_node_name else start_pro_key1: start_pro_val,# end_pro_key if start_node_name ! end_node_name else end_pro_key2: end_pro_val}csv_map.update(gx)# 将属性字典转换为 JSON 风格的字符串# 构造关系属性字符串rel_props_list [f{key}: {value} for key, value in csv_map.items()]rel_props_str { , .join(rel_props_list) }# todo 批量操作存储属性的# str1 f{{startId:{start_pro_val}, endId:{end_pro_val}, relProps:{rel_props_str}}}# properties_list.append(str1)# create_node_cypher_parts.append(str1)create_edge_cypher fMATCH (n1:{start_node_name} {{{start_pro_key}: {start_pro_val}}}),(n2:{end_node_name} {{{end_pro_key}: {end_pro_val}}})CREATE (n1)-[:{edge_name} {rel_props_str}]-(n2);# print(f执行新增关系[{edge_name}]的cypher:{create_edge_cypher})tugraphConn.run(create_edge_cypher)count 1else:break# 批量操作 (tugraph不支持)# if count % 3 0 and create_node_cypher_parts:# map {## }# queue_cypher f# UNWIND [{, .join(create_node_cypher_parts)}] AS relData# MATCH (a:{start_node_name} {{{tugraphConn.node_pro_key[start_node_name]}: relData.startId}}), (b:{end_node_name} {{{tugraphConn.node_pro_key[end_node_name]}: relData.endId}})# MERGE (a)-[r:{edge_name}]-(b)# SET r relData.relProps# RETURN r;# # print(f执行新增关系[{edge_name}]的cypher:{queue_cypher})# # tugraphConn.run(queue_cypher)# create_node_cypher_parts []# if create_node_cypher_parts:# queue_cypher f# UNWIND [{, .join(create_node_cypher_parts)}] AS relData# MATCH (a:{start_node_name} {{{tugraphConn.node_pro_key[start_node_name]}: relData.startId}}), (b:{end_node_name} {{{tugraphConn.node_pro_key[end_node_name]}: relData.endId}})# MERGE (a)-[r:{edge_name}]-(b)# SET r relData.relProps# RETURN r;# # print(f执行新增关系[{edge_name}]的cypher:{queue_cypher})# tugraphConn.run(queue_cypher)print(f所有 {edge_name} 节点同步完成,共 {len(edge_results)} 条记录,不符合要求 {skip_num} 条;成功导入 {count} 条!)# 创建连接器 def conn_tugraph():# 配置连接信息host 1111port 111user 111password 111db_name test121db_type tugraphconnector GraphConnector.from_uri_db(host, port, user, password, db_name, db_type)return connector def conn_neo4j():# 配置连接信息host 11111port 111user 111password 111111db_name 111db_type neo4jconnector GraphConnector.from_uri_db(host, port, user, password, db_name, db_type)return connectordef main():neo4jConn conn_neo4j()tugraphConn conn_tugraph()print(Successfully connected to Graph!)# 创建TuGraph新图库 - 连接时选中可以手动创建或者在初始化方法中创建tugraphConn.create_graph_new(test121)# 导入节点neo4jNode2Tugrapg(neo4jConn,tugraphConn)# 导入边neo4jEdge2Tugrapg(neo4jConn,tugraphConn)# get_relation_tocsv(connector)# 关闭连接neo4jConn.close()tugraphConn.close()print(Connection closed.)if name main:main() 说明 只是用Python简单写了一个可以执行导入操作的脚本欢迎指正和优化。边的导入比较慢单条导入。 有两种优化思路 一、Cypher 语句: UNWIND [{startId:‘11’, endId:‘21’, relProps:{ hphm: ‘33’, sj: ‘44’ }}, {startId:‘22’, endId:‘23’, relProps:{ hphm: ‘44’, sj: ‘20080102’ }}, {startId:‘33’, endId:‘24’, relProps:{ hphm: 55, sj: ‘20120110’ }}] AS relData MATCH (a:Ren {zjhm: relData.startId}), (b:Che {rowkey: relData.endId}) MERGE (a)-[r:sycl]-(b) SET r relData.relProps RETURN r; 二、 https://tugraph-db.readthedocs.io/zh-cn/latest/development_guide.html Tugraph 官网的批量操作 CALL db.upsertEdge(‘edge1’,{type:‘node1’,key:‘node1_id’}, {type:‘node2’,key:‘node2_id’}, [{node1_id:1,node2_id:2,score:10},{node1_id:3,node2_id:4,score:20}]) 代码里面留了 但 我的版本好像都不支持 后文 欢迎讨论