首頁  >  文章  >  後端開發  >  在 Python 中使用 OpenSearch 掌握 CRUD 操作:實用指南

在 Python 中使用 OpenSearch 掌握 CRUD 操作:實用指南

Susan Sarandon
Susan Sarandon原創
2024-09-21 22:15:091002瀏覽

Mastering CRUD Operations with OpenSearch in Python: A Practical Guide

OpenSearch,是Elasticsearch 的開源替代品,是一個強大的搜尋和分析引擎,旨在處理大型數據集 輕鬆。在本部落格中,我們將示範如何使用Python在OpenSearch中執行基本的CRUD(建立、讀取、更新、刪除)操作。

先決條件

  • Python 3.7+
  • 使用 Docker 在本機安裝 OpenSearch
  • 熟悉 RESTful API

第 1 步:使用 Docker 在本機設定 OpenSearch

首先,我們需要一個本地 OpenSearch 實例。下面是一個簡單的 docker-compose.yml 文件,它啟動 OpenSearch 和 OpenSearch 儀表板

version: '3'
services:
  opensearch-test-node-1:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-1
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-1
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-test-net

  opensearch-test-node-2:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-2
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-2
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data2:/usr/share/opensearch/data
    networks:
      - opensearch-test-net

  opensearch-test-dashboards:
    image: opensearchproject/opensearch-dashboards:2.13.0
    container_name: opensearch-test-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]'
      - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
    networks:
      - opensearch-test-net

volumes:
  opensearch-test-data1:
  opensearch-test-data2:

networks:
  opensearch-test-net:

執行下列指令來啟動您的 OpenSearch 執行個體:
docker-compose up
OpenSearch 可透過 http://localhost:9200 存取。

第2步:設定Python環境

python -m venv .venv
source .venv/bin/activate
pip install opensearch-py

我們也會以以下方式建構我們的專案:

├── interfaces.py
├── main.py
├── searchservice.py
├── docker-compose.yml

第 3 步:定義介面與資源 (interfaces.py)

在interfaces.py 檔案中,我們定義了Resource 和Resources 類別。這些將有助於我們動態處理 OpenSearch 中的不同資源類型(在本例中為使用者)。

from dataclasses import dataclass, field

@dataclass
class Resource:
    name: str

    def __post_init__(self) -> None:
        self.name = self.name.lower()

@dataclass
class Resources:
    users: Resource = field(default_factory=lambda: Resource("Users"))

第 4 步:使用 OpenSearch 進行 CRUD 操作 (searchservice.py)

在searchservice.py中,我們定義了一個抽象類別SearchService來概述所需的操作。然後,HTTPOpenSearchService 類別實作這些 CRUD 方法,與 OpenSearch 用戶端互動。

# coding: utf-8

import abc
import logging
import typing as t
from dataclasses import dataclass
from uuid import UUID

from interfaces import Resource, Resources
from opensearchpy import NotFoundError, OpenSearch

resources = Resources()


class SearchService(abc.ABC):
    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        raise NotImplementedError

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        raise NotImplementedError

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        raise NotImplementedError

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError


@dataclass(frozen=True)
class HTTPOpenSearchService(SearchService):
    client: OpenSearch

    def _gen_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> str:
        return (
            f"tenant_{str(UUID(str(tenants_id)))}"
            f"_company_{str(UUID(str(companies_id)))}"
            f"_kind_{kind.name}"
        )

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        self.client.index(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=data,
            id=data.get("id"),
        )
        return data

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        try:
            index = self._gen_index(kind, tenants_id, companies_id)
            if self.client.indices.exists(index):
                self.client.indices.delete(index)
        except NotFoundError:
            pass

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        body: t.Dict[str, t.Any] = {}
        self.client.indices.create(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=body,
        )

    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        return self.client.search(
            index=",".join(
                [self._gen_index(kind, tenants_id, companies_id) for kind in kinds]
            ),
            body={"query": query},
        )

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        try:
            response = self.client.delete(
                index=self._gen_index(kind, tenants_id, companies_id),
                id=document_id,
            )
            return response
        except Exception as e:
            logging.error(f"Error deleting document: {e}")
            return None

第5步:在Main中實作CRUD(main.py)

在 main.py 中,我們示範如何:

  • 在 OpenSearch 中建立索引
  • 索引文件以及範例使用者資料。
  • 根據查詢搜尋文件。
  • 使用文件 ID 刪除文件。

main.py

# coding=utf-8

import logging
import os
import typing as t
from uuid import uuid4

import searchservice
from interfaces import Resources
from opensearchpy import OpenSearch

resources = Resources()

logging.basicConfig(level=logging.INFO)

search_service = searchservice.HTTPOpenSearchService(
    client=OpenSearch(
        hosts=[
            {
                "host": os.getenv("OPENSEARCH_HOST", "localhost"),
                "port": os.getenv("OPENSEARCH_PORT", "9200"),
            }
        ],
        http_auth=(
            os.getenv("OPENSEARCH_USERNAME", ""),
            os.getenv("OPENSEARCH_PASSWORD", ""),
        ),
        use_ssl=False,
        verify_certs=False,
    ),
)

tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9"
companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c"
search_str: str = "frank"
document_id_to_delete: str = str(uuid4())

fake_data: t.List[t.Dict[str, t.Any]] = [
    {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"},
    {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"},
    {"id": str(uuid4()), "name": "Parry", "tech": "Golang"},
    {"id": str(uuid4()), "name": "Steve", "tech": "iOS"},
    {"id": str(uuid4()), "name": "Frank", "tech": "node"},
]

search_service.delete_index(
    kind=resources.users, tenants_id=tenants_id, companies_id=companies_id
)

search_service.create_index(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
)

for item in fake_data:
    search_service.index(
        kind=resources.users,
        tenants_id=tenants_id,
        companies_id=companies_id,
        data=dict(tenants_id=tenants_id, companies_id=companies_id, **item),
    )

search_query: t.Dict[str, t.Any] = {
    "bool": {
        "must": [],
        "must_not": [],
        "should": [],
        "filter": [
            {"term": {"tenants_id.keyword": tenants_id}},
            {"term": {"companies_id.keyword": companies_id}},
        ],
    }
}
search_query["bool"]["must"].append(
    {
        "multi_match": {
            "query": search_str,
            "type": "phrase_prefix",
            "fields": ["name", "tech"],
        }
    }
)

search_results = search_service.search(
    kinds=[resources.users],
    tenants_id=tenants_id,
    companies_id=companies_id,
    query=search_query,
)

final_result = search_results.get("hits", {}).get("hits", [])
for item in final_result:
    logging.info(["Item -> ", item.get("_source", {})])

deleted_result = search_service.delete_document(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
    document_id=document_id_to_delete,
)
logging.info(["Deleted result -> ", deleted_result])

第 6 步:運行項目

docker 組成
python main.py

結果

它應該會列印找到和刪除的記錄資訊。

第 7 步:結論

在本部落格中,我們示範了如何使用Docker 在本機設定OpenSearch 並使用CRUD 執行基本操作🎜> PythonOpenSearch 為管理和查詢大型資料集提供了強大且可擴展的解決方案。雖然本指南重點介紹OpenSearch 與虛擬資料的集成,但在實際應用程式中,OpenSearch 通常用作讀取最佳化儲存更快 資料檢索。在這種情況下,通常會實作不同的索引策略,透過同時更新主資料庫和OpenSearch來確保資料一致性。

這可確保

OpenSearch與您的主要數據來源保持同步,優化性能準確性 在資料檢索中。

參考文獻

https://github.com/FranklinThaker/opensearch-integration-example

以上是在 Python 中使用 OpenSearch 掌握 CRUD 操作:實用指南的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn