首頁  >  文章  >  後端開發  >  將資料載入到 Neo4j 中

將資料載入到 Neo4j 中

王林
王林原創
2024-08-19 16:40:031146瀏覽

在上一篇部落格中,我們了解如何使用 2 個插件 APOC 和圖形資料科學庫 - GDS 在本地安裝和設定 neo4j。在這篇部落格中,我將取得一個玩具資料集(電子商務網站中的產品)並將其儲存在 Neo4j 中。

 

為 Neo4j 分配足夠的內存

在開始載入資料之前,如果您的用例中有大量數據,請確保為 Neo4j 分配了足夠的記憶體。為此:

  • 點選開啟右側的三個點

Load Data Into Neo4j

  • 點選開啟資料夾 -> 設定

Load Data Into Neo4j

  • 點選neo4j.conf

Load Data Into Neo4j

  • 在neo4j.conf 中搜尋heap,取消註釋第77、78 行,並將256m 更改為2048m,這樣可以確保Neo4j 中分配2048mb 用於資料儲存.

Load Data Into Neo4j

 
 

建立節點

  • 圖有兩個主要組成部分:節點和關係,我們先建立節點,然後再建立關係。

  • 我正在使用的資料在這裡 - data

  • 使用此處提供的requirements.txt來建立一個python虛擬環境-requirements.txt

  • 讓我們定義各種函數來推送資料。

  • 導入必要的庫

import pandas as pd
from neo4j import GraphDatabase
from openai import OpenAI
  • 我們將使用 openai 生成嵌入
client = OpenAI(api_key="")
product_data_df = pd.read_csv('../data/product_data.csv')
  • 產生嵌入
def get_embedding(text):
    """
    Used to generate embeddings using OpenAI embeddings model
    :param text: str - text that needs to be converted to embeddings
    :return: embedding
    """
    model = "text-embedding-3-small"
    text = text.replace("\n", " ")
    return client.embeddings.create(input=[text], model=model).data[0].embedding
  • 根據我們的資料集,我們可以有兩個唯一的節點標籤,類別:產品類別,產品:產品名稱。讓我們建立類別標籤,neo4j 提供了一種稱為屬性的東西,您可以將它們想像為特定節點的元資料。這裡 nameembedding 是屬性。因此,我們將類別名稱及其相應的嵌入儲存在資料庫中。
def create_category(product_data_df):
    """
    Used to generate queries for creating category nodes in neo4j
    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for category
    """
    cat_query = """CREATE (a:Category {name: '%s', embedding: %s})"""
    distinct_category = product_data_df['Category'].unique()
    query_list = []
    for category in distinct_category:
        embedding = get_embedding(category)
        query_list.append(cat_query % (category, embedding))
    return query_list
  • 類似地,我們可以建立產品節點,這裡的屬性將是name, description, price, warranty_period, 🎜>可用庫存評論評級產品發布日期嵌入
def create_product(product_data_df):
    """
    Used to generate queries for creating product nodes in neo4j
    :param product_data_df: pandas dataframe - data 
    :return: query_list: list - list containing all create node queries for product 
    """
    product_query = """CREATE (a:Product {name: '%s', description: '%s', price: %d, warranty_period: %d, 
    available_stock: %d, review_rating: %f, product_release_date: date('%s'), embedding: %s})"""
    query_list = []
    for idx, row in product_data_df.iterrows():
        embedding = get_embedding(row['Product Name'] + " - " + row['Description'])
        query_list.append(product_query % (row['Product Name'], row['Description'], int(row['Price (INR)']),
                                           int(row['Warranty Period (Years)']), int(row['Stock']),
                                           float(row['Review Rating']), str(row['Product Release Date']), embedding))
    return query_list
    現在讓我們建立另一個函數來執行上述兩個函數產生的查詢。適當更新您的使用者名稱和密碼。
def execute_bulk_query(query_list):
    """
    Executes queries is a list one by one
    :param query_list: list - list of cypher queries
    :return: None
    """
    url = "bolt://localhost:7687"
    auth = ("neo4j", "neo4j@123")

    with GraphDatabase.driver(url, auth=auth) as driver:
        with driver.session() as session:
            for query in query_list:
                try:
                    session.run(query)
                except Exception as error:
                    print(f"Error in executing query - {query}, Error - {error}")
    完整程式碼
import pandas as pd
from neo4j import GraphDatabase
from openai import OpenAI

client = OpenAI(api_key="")
product_data_df = pd.read_csv('../data/product_data.csv')


def preprocessing(df, columns_to_replace):
    """
    Used to preprocess certain column in dataframe
    :param df: pandas dataframe - data
    :param columns_to_replace: list - column name list
    :return: df: pandas dataframe - processed data
    """
    df[columns_to_replace] = df[columns_to_replace].apply(lambda col: col.str.replace("'s", "s"))
    df[columns_to_replace] = df[columns_to_replace].apply(lambda col: col.str.replace("'", ""))
    return df


def get_embedding(text):
    """
    Used to generate embeddings using OpenAI embeddings model
    :param text: str - text that needs to be converted to embeddings
    :return: embedding
    """
    model = "text-embedding-3-small"
    text = text.replace("\n", " ")
    return client.embeddings.create(input=[text], model=model).data[0].embedding


def create_category(product_data_df):
    """
    Used to generate queries for creating category nodes in neo4j
    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for category
    """
    cat_query = """CREATE (a:Category {name: '%s', embedding: %s})"""
    distinct_category = product_data_df['Category'].unique()
    query_list = []
    for category in distinct_category:
        embedding = get_embedding(category)
        query_list.append(cat_query % (category, embedding))
    return query_list


def create_product(product_data_df):
    """
    Used to generate queries for creating product nodes in neo4j
    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for product
    """
    product_query = """CREATE (a:Product {name: '%s', description: '%s', price: %d, warranty_period: %d, 
    available_stock: %d, review_rating: %f, product_release_date: date('%s'), embedding: %s})"""
    query_list = []
    for idx, row in product_data_df.iterrows():
        embedding = get_embedding(row['Product Name'] + " - " + row['Description'])
        query_list.append(product_query % (row['Product Name'], row['Description'], int(row['Price (INR)']),
                                           int(row['Warranty Period (Years)']), int(row['Stock']),
                                           float(row['Review Rating']), str(row['Product Release Date']), embedding))
    return query_list


def execute_bulk_query(query_list):
    """
    Executes queries is a list one by one
    :param query_list: list - list of cypher queries
    :return: None
    """
    url = "bolt://localhost:7687"
    auth = ("neo4j", "neo4j@123")

    with GraphDatabase.driver(url, auth=auth) as driver:
        with driver.session() as session:
            for query in query_list:
                try:
                    session.run(query)
                except Exception as error:
                    print(f"Error in executing query - {query}, Error - {error}")

# PREPROCESSING
product_data_df = preprocessing(product_data_df, ['Product Name', 'Description'])

# CREATE CATEGORY
query_list = create_category(product_data_df)
execute_bulk_query(query_list)

# CREATE PRODUCT
query_list = create_product(product_data_df)
execute_bulk_query(query_list)

 

 

建立關係

    我們將在
  • 類別產品 之間建立關係,關係的名稱為 CATEGORY_CONTAINS_PRODUCT
from neo4j import GraphDatabase
import pandas as pd

product_data_df = pd.read_csv('../data/product_data.csv')


def preprocessing(df, columns_to_replace):
    """
    Used to preprocess certain column in dataframe
    :param df: pandas dataframe - data
    :param columns_to_replace: list - column name list
    :return: df: pandas dataframe - processed data
    """
    df[columns_to_replace] = df[columns_to_replace].apply(lambda col: col.str.replace("'s", "s"))
    df[columns_to_replace] = df[columns_to_replace].apply(lambda col: col.str.replace("'", ""))
    return df


def create_category_food_relationship_query(product_data_df):
    """
    Used to create relationship between category and products
    :param product_data_df: dataframe - data
    :return: query_list: list - cypher queries
    """
    query = """MATCH (c:Category {name: '%s'}), (p:Product {name: '%s'}) CREATE (c)-[:CATEGORY_CONTAINS_PRODUCT]->(p)"""
    query_list = []
    for idx, row in product_data_df.iterrows():
        query_list.append(query % (row['Category'], row['Product Name']))
    return query_list


def execute_bulk_query(query_list):
    """
    Executes queries is a list one by one
    :param query_list: list - list of cypher queries
    :return: None
    """
    url = "bolt://localhost:7687"
    auth = ("neo4j", "neo4j@123")

    with GraphDatabase.driver(url, auth=auth) as driver:
        with driver.session() as session:
            for query in query_list:
                try:
                    session.run(query)
                except Exception as error:
                    print(f"Error in executing query - {query}, Error - {error}")


# PREPROCESSING
product_data_df = preprocessing(product_data_df, ['Product Name', 'Description'])

# CATEGORY - FOOD RELATIONSHIP
query_list = create_category_food_relationship_query(product_data_df)
execute_bulk_query(query_list)

    透過使用 MATCH 查詢來匹配已建立的節點,我們在它們之間建立關係。
 

 

可視化創建的節點

將滑鼠停留在

開啟 圖示上,然後按一下 neo4j 瀏覽器 以視覺化我們建立的節點。
Load Data Into Neo4j

Load Data Into Neo4j

Load Data Into Neo4j

我們的資料連同它們的嵌入一起載入到 Neo4j 中。

 

在接下來的部落格中,我們將看到如何使用 python 建立圖形查詢引擎並使用獲取的資料進行增強生成。

希望這有幫助...再見! ! !

領英 - https://www.linkedin.com/in/praveenr2998/

Github - https://github.com/praveenr2998/Creating-Lightweight-RAG-Systems-With-Graphs/tree/main/push_data_to_db

以上是將資料載入到 Neo4j 中的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn