Rumah > Artikel > pembangunan bahagian belakang > Menguasai Operasi CRUD dengan OpenSearch dalam Python: Panduan Praktikal
OpenSearch, alternatif sumber terbuka kepada Elasticsearch, ialah enjin carian dan analitis berkuasa yang dibina untuk mengendalikan set data yang besar dengan mudah. Dalam blog ini, kami akan menunjukkan cara melaksanakan operasi asas CRUD (Buat, Baca, Kemas Kini, Padam) dalam OpenSearch menggunakan Python.
Untuk bermula, kami memerlukan contoh OpenSearch tempatan. Di bawah ialah fail docker-compose.yml ringkas yang memutarkan OpenSearch dan OpenSearch Papan Pemuka.
version: '3' services: opensearch-test-node-1: image: opensearchproject/opensearch:2.13.0 container_name: opensearch-test-node-1 environment: - cluster.name=opensearch-test-cluster - node.name=opensearch-test-node-1 - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2 - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - "DISABLE_INSTALL_DEMO_CONFIG=true" - "DISABLE_SECURITY_PLUGIN=true" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-test-data1:/usr/share/opensearch/data ports: - 9200:9200 - 9600:9600 networks: - opensearch-test-net opensearch-test-node-2: image: opensearchproject/opensearch:2.13.0 container_name: opensearch-test-node-2 environment: - cluster.name=opensearch-test-cluster - node.name=opensearch-test-node-2 - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2 - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - "DISABLE_INSTALL_DEMO_CONFIG=true" - "DISABLE_SECURITY_PLUGIN=true" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-test-data2:/usr/share/opensearch/data networks: - opensearch-test-net opensearch-test-dashboards: image: opensearchproject/opensearch-dashboards:2.13.0 container_name: opensearch-test-dashboards ports: - 5601:5601 expose: - "5601" environment: - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]' - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true" networks: - opensearch-test-net volumes: opensearch-test-data1: opensearch-test-data2: networks: opensearch-test-net:
Jalankan arahan berikut untuk memaparkan contoh OpenSearch anda:
docker-compose up
OpenSearch boleh diakses di http://localhost:9200.
python -m venv .venv source .venv/bin/activate pip install opensearch-py
Kami juga akan menstrukturkan projek kami seperti berikut:
├── interfaces.py ├── main.py ├── searchservice.py ├── docker-compose.yml
Dalam fail interfaces.py, kami mentakrifkan kelas Sumber dan Sumber kami. Ini akan membantu kami mengendalikan jenis sumber yang berbeza secara dinamik dalam OpenSearch (dalam kes ini, pengguna).
from dataclasses import dataclass, field @dataclass class Resource: name: str def __post_init__(self) -> None: self.name = self.name.lower() @dataclass class Resources: users: Resource = field(default_factory=lambda: Resource("Users"))
Dalam searchservice.py, kami mentakrifkan kelas abstrak SearchService untuk menggariskan operasi yang diperlukan. Kelas HTTPOpenSearchService kemudian melaksanakan kaedah CRUD ini, berinteraksi dengan klien OpenSearch.
# coding: utf-8 import abc import logging import typing as t from dataclasses import dataclass from uuid import UUID from interfaces import Resource, Resources from opensearchpy import NotFoundError, OpenSearch resources = Resources() class SearchService(abc.ABC): def search( self, kinds: t.List[Resource], tenants_id: UUID, companies_id: UUID, query: t.Dict[str, t.Any], ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]: raise NotImplementedError def delete_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> None: raise NotImplementedError def index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> t.Dict[str, t.Any]: raise NotImplementedError def delete_document( self, kind: Resource, tenants_id: UUID, companies_id: UUID, document_id: str, ) -> t.Optional[t.Dict[str, t.Any]]: raise NotImplementedError def create_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> None: raise NotImplementedError @dataclass(frozen=True) class HTTPOpenSearchService(SearchService): client: OpenSearch def _gen_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> str: return ( f"tenant_{str(UUID(str(tenants_id)))}" f"_company_{str(UUID(str(companies_id)))}" f"_kind_{kind.name}" ) def index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> t.Dict[str, t.Any]: self.client.index( index=self._gen_index(kind, tenants_id, companies_id), body=data, id=data.get("id"), ) return data def delete_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> None: try: index = self._gen_index(kind, tenants_id, companies_id) if self.client.indices.exists(index): self.client.indices.delete(index) except NotFoundError: pass def create_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> None: body: t.Dict[str, t.Any] = {} self.client.indices.create( index=self._gen_index(kind, tenants_id, companies_id), body=body, ) def search( self, kinds: t.List[Resource], tenants_id: UUID, companies_id: UUID, query: t.Dict[str, t.Any], ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]: return self.client.search( index=",".join( [self._gen_index(kind, tenants_id, companies_id) for kind in kinds] ), body={"query": query}, ) def delete_document( self, kind: Resource, tenants_id: UUID, companies_id: UUID, document_id: str, ) -> t.Optional[t.Dict[str, t.Any]]: try: response = self.client.delete( index=self._gen_index(kind, tenants_id, companies_id), id=document_id, ) return response except Exception as e: logging.error(f"Error deleting document: {e}") return None
Dalam main.py, kami menunjukkan cara untuk:
- Buat indeks dalam OpenSearch.
- Indeks dokumen dengan sampel data pengguna.
- Cari untuk dokumen berdasarkan pertanyaan.
- Padam dokumen menggunakan IDnya.
main.py
# coding=utf-8 import logging import os import typing as t from uuid import uuid4 import searchservice from interfaces import Resources from opensearchpy import OpenSearch resources = Resources() logging.basicConfig(level=logging.INFO) search_service = searchservice.HTTPOpenSearchService( client=OpenSearch( hosts=[ { "host": os.getenv("OPENSEARCH_HOST", "localhost"), "port": os.getenv("OPENSEARCH_PORT", "9200"), } ], http_auth=( os.getenv("OPENSEARCH_USERNAME", ""), os.getenv("OPENSEARCH_PASSWORD", ""), ), use_ssl=False, verify_certs=False, ), ) tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9" companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c" search_str: str = "frank" document_id_to_delete: str = str(uuid4()) fake_data: t.List[t.Dict[str, t.Any]] = [ {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"}, {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"}, {"id": str(uuid4()), "name": "Parry", "tech": "Golang"}, {"id": str(uuid4()), "name": "Steve", "tech": "iOS"}, {"id": str(uuid4()), "name": "Frank", "tech": "node"}, ] search_service.delete_index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id ) search_service.create_index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, ) for item in fake_data: search_service.index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, data=dict(tenants_id=tenants_id, companies_id=companies_id, **item), ) search_query: t.Dict[str, t.Any] = { "bool": { "must": [], "must_not": [], "should": [], "filter": [ {"term": {"tenants_id.keyword": tenants_id}}, {"term": {"companies_id.keyword": companies_id}}, ], } } search_query["bool"]["must"].append( { "multi_match": { "query": search_str, "type": "phrase_prefix", "fields": ["name", "tech"], } } ) search_results = search_service.search( kinds=[resources.users], tenants_id=tenants_id, companies_id=companies_id, query=search_query, ) final_result = search_results.get("hits", {}).get("hits", []) for item in final_result: logging.info(["Item -> ", item.get("_source", {})]) deleted_result = search_service.delete_document( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, document_id=document_id_to_delete, ) logging.info(["Deleted result -> ", deleted_result])
karang buruh pelabuhan
python main.py
Ia harus mencetak maklumat rekod yang ditemui & dipadamkan.
Dalam blog ini, kami telah menunjukkan cara menyediakan OpenSearch secara setempat menggunakan Docker dan melaksanakan operasi asas CRUD dengan Python. OpenSearch menyediakan penyelesaian yang berkuasa dan berskala untuk mengurus dan menanyakan set data yang besar. Walaupun panduan ini menumpukan pada penyepaduan OpenSearch dengan data dummy, dalam aplikasi dunia sebenar, OpenSearch sering digunakan sebagai kedai dioptimumkan baca untuk lebih pantas pengambilan data. Dalam kes sedemikian, adalah perkara biasa untuk melaksanakan strategi pengindeksan yang berbeza untuk memastikan ketekalan data dengan mengemas kini kedua-dua pangkalan data utama dan OpenSearch serentak.
Ini memastikan OpenSearch kekal selaras dengan sumber data utama anda, mengoptimumkan kedua-dua prestasi dan ketepatan dalam pengambilan data.
https://github.com/FranklinThaker/opensearch-integration-example
Atas ialah kandungan terperinci Menguasai Operasi CRUD dengan OpenSearch dalam Python: Panduan Praktikal. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!