OpenSearch, an open-source alternative to Elasticsearch, is a powerful search and analytics engine built to handle large datasets with ease. In this blog, we’ll demonstrate how to perform basic CRUD (Create, Read, Update, Delete) operations in OpenSearch using Python.
To get started, we need a local OpenSearch instance. Below is a simple docker-compose.yml file that spins up OpenSearch and OpenSearch Dashboards.
version: '3' services: opensearch-test-node-1: image: opensearchproject/opensearch:2.13.0 container_name: opensearch-test-node-1 environment: - cluster.name=opensearch-test-cluster - node.name=opensearch-test-node-1 - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2 - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - "DISABLE_INSTALL_DEMO_CONFIG=true" - "DISABLE_SECURITY_PLUGIN=true" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-test-data1:/usr/share/opensearch/data ports: - 9200:9200 - 9600:9600 networks: - opensearch-test-net opensearch-test-node-2: image: opensearchproject/opensearch:2.13.0 container_name: opensearch-test-node-2 environment: - cluster.name=opensearch-test-cluster - node.name=opensearch-test-node-2 - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2 - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - "DISABLE_INSTALL_DEMO_CONFIG=true" - "DISABLE_SECURITY_PLUGIN=true" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-test-data2:/usr/share/opensearch/data networks: - opensearch-test-net opensearch-test-dashboards: image: opensearchproject/opensearch-dashboards:2.13.0 container_name: opensearch-test-dashboards ports: - 5601:5601 expose: - "5601" environment: - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]' - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true" networks: - opensearch-test-net volumes: opensearch-test-data1: opensearch-test-data2: networks: opensearch-test-net:
Run the following command to bring up your OpenSearch instance:
docker-compose up
OpenSearch will be accessible at http://localhost:9200.
python -m venv .venv source .venv/bin/activate pip install opensearch-py
We'll also structure our project as follows:
├── interfaces.py ├── main.py ├── searchservice.py ├── docker-compose.yml
In the interfaces.py file, we define our Resource and Resources classes. These will help us dynamically handle different resource types in OpenSearch (in this case, users).
from dataclasses import dataclass, field @dataclass class Resource: name: str def __post_init__(self) -> None: self.name = self.name.lower() @dataclass class Resources: users: Resource = field(default_factory=lambda: Resource("Users"))
In searchservice.py, we define an abstract class SearchService to outline the required operations. The HTTPOpenSearchService class then implements these CRUD methods, interacting with the OpenSearch client.
# coding: utf-8 import abc import logging import typing as t from dataclasses import dataclass from uuid import UUID from interfaces import Resource, Resources from opensearchpy import NotFoundError, OpenSearch resources = Resources() class SearchService(abc.ABC): def search( self, kinds: t.List[Resource], tenants_id: UUID, companies_id: UUID, query: t.Dict[str, t.Any], ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]: raise NotImplementedError def delete_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> None: raise NotImplementedError def index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> t.Dict[str, t.Any]: raise NotImplementedError def delete_document( self, kind: Resource, tenants_id: UUID, companies_id: UUID, document_id: str, ) -> t.Optional[t.Dict[str, t.Any]]: raise NotImplementedError def create_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> None: raise NotImplementedError @dataclass(frozen=True) class HTTPOpenSearchService(SearchService): client: OpenSearch def _gen_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> str: return ( f"tenant_{str(UUID(str(tenants_id)))}" f"_company_{str(UUID(str(companies_id)))}" f"_kind_{kind.name}" ) def index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> t.Dict[str, t.Any]: self.client.index( index=self._gen_index(kind, tenants_id, companies_id), body=data, id=data.get("id"), ) return data def delete_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> None: try: index = self._gen_index(kind, tenants_id, companies_id) if self.client.indices.exists(index): self.client.indices.delete(index) except NotFoundError: pass def create_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> None: body: t.Dict[str, t.Any] = {} self.client.indices.create( index=self._gen_index(kind, tenants_id, companies_id), body=body, ) def search( self, kinds: t.List[Resource], tenants_id: UUID, companies_id: UUID, query: t.Dict[str, t.Any], ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]: return self.client.search( index=",".join( [self._gen_index(kind, tenants_id, companies_id) for kind in kinds] ), body={"query": query}, ) def delete_document( self, kind: Resource, tenants_id: UUID, companies_id: UUID, document_id: str, ) -> t.Optional[t.Dict[str, t.Any]]: try: response = self.client.delete( index=self._gen_index(kind, tenants_id, companies_id), id=document_id, ) return response except Exception as e: logging.error(f"Error deleting document: {e}") return None
In main.py, we demonstrate how to:
- Create an index in OpenSearch.
- Index documents with sample user data.
- Search for documents based on a query.
- Delete a document using its ID.
main.py
# coding=utf-8 import logging import os import typing as t from uuid import uuid4 import searchservice from interfaces import Resources from opensearchpy import OpenSearch resources = Resources() logging.basicConfig(level=logging.INFO) search_service = searchservice.HTTPOpenSearchService( client=OpenSearch( hosts=[ { "host": os.getenv("OPENSEARCH_HOST", "localhost"), "port": os.getenv("OPENSEARCH_PORT", "9200"), } ], http_auth=( os.getenv("OPENSEARCH_USERNAME", ""), os.getenv("OPENSEARCH_PASSWORD", ""), ), use_ssl=False, verify_certs=False, ), ) tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9" companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c" search_str: str = "frank" document_id_to_delete: str = str(uuid4()) fake_data: t.List[t.Dict[str, t.Any]] = [ {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"}, {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"}, {"id": str(uuid4()), "name": "Parry", "tech": "Golang"}, {"id": str(uuid4()), "name": "Steve", "tech": "iOS"}, {"id": str(uuid4()), "name": "Frank", "tech": "node"}, ] search_service.delete_index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id ) search_service.create_index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, ) for item in fake_data: search_service.index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, data=dict(tenants_id=tenants_id, companies_id=companies_id, **item), ) search_query: t.Dict[str, t.Any] = { "bool": { "must": [], "must_not": [], "should": [], "filter": [ {"term": {"tenants_id.keyword": tenants_id}}, {"term": {"companies_id.keyword": companies_id}}, ], } } search_query["bool"]["must"].append( { "multi_match": { "query": search_str, "type": "phrase_prefix", "fields": ["name", "tech"], } } ) search_results = search_service.search( kinds=[resources.users], tenants_id=tenants_id, companies_id=companies_id, query=search_query, ) final_result = search_results.get("hits", {}).get("hits", []) for item in final_result: logging.info(["Item -> ", item.get("_source", {})]) deleted_result = search_service.delete_document( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, document_id=document_id_to_delete, ) logging.info(["Deleted result -> ", deleted_result])
docker compose up
python main.py
It should print found & deleted records information.
In this blog, we’ve demonstrated how to set up OpenSearch locally using Docker and perform basic CRUD operations with Python. OpenSearch provides a powerful and scalable solution for managing and querying large datasets. While this guide focuses on integrating OpenSearch with dummy data, in real-world applications, OpenSearch is often used as a read-optimized store for faster data retrieval. In such cases, it is common to implement different indexing strategies to ensure data consistency by updating both the primary database and OpenSearch concurrently.
This ensures that OpenSearch remains in sync with your primary data source, optimizing both performance and accuracy in data retrieval.
https://github.com/FranklinThaker/opensearch-integration-example
以上是Mastering CRUD Operations with OpenSearch in Python: A Practical Guide的详细内容。更多信息请关注PHP中文网其他相关文章!