>백엔드 개발 >파이썬 튜토리얼 >Python에서 OpenSearch를 사용하여 CRUD 작업 마스터하기: 실용 가이드

Python에서 OpenSearch를 사용하여 CRUD 작업 마스터하기: 실용 가이드

Susan Sarandon
Susan Sarandon원래의
2024-09-21 22:15:091219검색

Mastering CRUD Operations with OpenSearch in Python: A Practical Guide

OpenSearchElasticsearch의 오픈 소스 대안으로, 대규모 데이터 세트 쉽게. 이 블로그에서는 Python을 사용하여 OpenSearch에서 기본 CRUD(생성, 읽기, 업데이트, 삭제) 작업을 수행하는 방법을 시연하겠습니다.

전제 조건:

  • 파이썬 3.7+
  • Docker를 사용하여 로컬에 설치된 OpenSearch
  • RESTful API에 대한 지식

1단계: Docker를 사용하여 로컬로 OpenSearch 설정

시작하려면 로컬 OpenSearch 인스턴스가 필요합니다. 다음은 OpenSearch 및 OpenSearch 대시보드를 실행하는 간단한 docker-compose.yml 파일입니다.

version: '3'
services:
  opensearch-test-node-1:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-1
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-1
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-test-net

  opensearch-test-node-2:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-2
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-2
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data2:/usr/share/opensearch/data
    networks:
      - opensearch-test-net

  opensearch-test-dashboards:
    image: opensearchproject/opensearch-dashboards:2.13.0
    container_name: opensearch-test-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]'
      - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
    networks:
      - opensearch-test-net

volumes:
  opensearch-test-data1:
  opensearch-test-data2:

networks:
  opensearch-test-net:

OpenSearch 인스턴스를 불러오려면 다음 명령을 실행하세요.
도커-작성
OpenSearch는 http://localhost:9200에서 액세스할 수 있습니다.

2단계: Python 환경 설정

python -m venv .venv
source .venv/bin/activate
pip install opensearch-py

또한 프로젝트를 다음과 같이 구성할 예정입니다.

├── interfaces.py
├── main.py
├── searchservice.py
├── docker-compose.yml

3단계: 인터페이스 및 리소스 정의(interfaces.py)

interfaces.py 파일에서 Resource 및 Resources 클래스를 정의합니다. 이는 OpenSearch에서 다양한 리소스 유형(이 경우 사용자)을 동적으로 처리하는 데 도움이 됩니다.

from dataclasses import dataclass, field

@dataclass
class Resource:
    name: str

    def __post_init__(self) -> None:
        self.name = self.name.lower()

@dataclass
class Resources:
    users: Resource = field(default_factory=lambda: Resource("Users"))

4단계: OpenSearch(searchservice.py)를 사용한 CRUD 작업

searchservice.py에서는 필요한 작업의 개요를 설명하기 위해 SearchService 추상 클래스를 정의합니다. 그런 다음 HTTPOpenSearchService 클래스는 OpenSearch 클라이언트와 상호 작용하면서 이러한 CRUD 메서드를 구현합니다.

# coding: utf-8

import abc
import logging
import typing as t
from dataclasses import dataclass
from uuid import UUID

from interfaces import Resource, Resources
from opensearchpy import NotFoundError, OpenSearch

resources = Resources()


class SearchService(abc.ABC):
    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        raise NotImplementedError

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        raise NotImplementedError

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        raise NotImplementedError

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError


@dataclass(frozen=True)
class HTTPOpenSearchService(SearchService):
    client: OpenSearch

    def _gen_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> str:
        return (
            f"tenant_{str(UUID(str(tenants_id)))}"
            f"_company_{str(UUID(str(companies_id)))}"
            f"_kind_{kind.name}"
        )

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        self.client.index(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=data,
            id=data.get("id"),
        )
        return data

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        try:
            index = self._gen_index(kind, tenants_id, companies_id)
            if self.client.indices.exists(index):
                self.client.indices.delete(index)
        except NotFoundError:
            pass

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        body: t.Dict[str, t.Any] = {}
        self.client.indices.create(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=body,
        )

    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        return self.client.search(
            index=",".join(
                [self._gen_index(kind, tenants_id, companies_id) for kind in kinds]
            ),
            body={"query": query},
        )

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        try:
            response = self.client.delete(
                index=self._gen_index(kind, tenants_id, companies_id),
                id=document_id,
            )
            return response
        except Exception as e:
            logging.error(f"Error deleting document: {e}")
            return None

5단계: Main(main.py)에서 CRUD 구현

main.py에서는 다음 방법을 보여줍니다.

  • OpenSearch에서 색인을 생성하세요.
  • 색인 문서 샘플 사용자 데이터
  • 쿼리를 기반으로 문서를 검색
  • 해당 ID를 사용하는 문서를 삭제

main.py

# coding=utf-8

import logging
import os
import typing as t
from uuid import uuid4

import searchservice
from interfaces import Resources
from opensearchpy import OpenSearch

resources = Resources()

logging.basicConfig(level=logging.INFO)

search_service = searchservice.HTTPOpenSearchService(
    client=OpenSearch(
        hosts=[
            {
                "host": os.getenv("OPENSEARCH_HOST", "localhost"),
                "port": os.getenv("OPENSEARCH_PORT", "9200"),
            }
        ],
        http_auth=(
            os.getenv("OPENSEARCH_USERNAME", ""),
            os.getenv("OPENSEARCH_PASSWORD", ""),
        ),
        use_ssl=False,
        verify_certs=False,
    ),
)

tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9"
companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c"
search_str: str = "frank"
document_id_to_delete: str = str(uuid4())

fake_data: t.List[t.Dict[str, t.Any]] = [
    {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"},
    {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"},
    {"id": str(uuid4()), "name": "Parry", "tech": "Golang"},
    {"id": str(uuid4()), "name": "Steve", "tech": "iOS"},
    {"id": str(uuid4()), "name": "Frank", "tech": "node"},
]

search_service.delete_index(
    kind=resources.users, tenants_id=tenants_id, companies_id=companies_id
)

search_service.create_index(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
)

for item in fake_data:
    search_service.index(
        kind=resources.users,
        tenants_id=tenants_id,
        companies_id=companies_id,
        data=dict(tenants_id=tenants_id, companies_id=companies_id, **item),
    )

search_query: t.Dict[str, t.Any] = {
    "bool": {
        "must": [],
        "must_not": [],
        "should": [],
        "filter": [
            {"term": {"tenants_id.keyword": tenants_id}},
            {"term": {"companies_id.keyword": companies_id}},
        ],
    }
}
search_query["bool"]["must"].append(
    {
        "multi_match": {
            "query": search_str,
            "type": "phrase_prefix",
            "fields": ["name", "tech"],
        }
    }
)

search_results = search_service.search(
    kinds=[resources.users],
    tenants_id=tenants_id,
    companies_id=companies_id,
    query=search_query,
)

final_result = search_results.get("hits", {}).get("hits", [])
for item in final_result:
    logging.info(["Item -> ", item.get("_source", {})])

deleted_result = search_service.delete_document(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
    document_id=document_id_to_delete,
)
logging.info(["Deleted result -> ", deleted_result])

6단계: 프로젝트 실행

도커 컴포지트업
파이썬 메인.py

결과:

발견 및 삭제된 기록 정보를 출력해야 합니다.

7단계: 결론

이 블로그에서는 Docker를 사용하여 로컬에서 OpenSearch를 설정하고 CRUD 작업을 로 수행하는 방법을 시연했습니다. 🎜>파이썬. OpenSearch는 대규모 데이터 세트를 관리하고 쿼리하기 위한 강력하고 확장 가능한 솔루션을 제공합니다. 이 가이드는 OpenSearch를 더미 데이터와 통합하는 데 중점을 두고 있지만 실제 애플리케이션에서는 OpenSearch가 더 빠른 읽기를 위해 읽기에 최적화된 저장소로 자주 사용됩니다. 데이터 검색. 이러한 경우 기본 데이터베이스와 OpenSearch를 동시에 업데이트하여 데이터 일관성을 보장하기 위해 다양한 인덱싱 전략을 구현하는 것이 일반적입니다.

이를 통해 OpenSearch기본 데이터 소스와 동기화되어 성능정확성최적화할 수 있습니다. > 데이터 검색 중

참고자료:

https://github.com/FranklinThaker/opensearch-integration-example

위 내용은 Python에서 OpenSearch를 사용하여 CRUD 작업 마스터하기: 실용 가이드의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.