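Change streams in MongoDB let your application react to real-time data changes as they happen. In this post, I'll show you how to set up and use change streams with Python without diving deep into theory. We'll build a simple program that listens for database events, focusing on inserts first and then extending it to other event types.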
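Change streams let your application listen for specific database events, such as inserts or updates, and respond immediately. Imagine a user updating their profile: with change streams, you can reflect that change in your application right away, without the user having to refresh the page. Before this feature, you had to poll the database constantly or use complicated approaches such as tailing the MongoDB oplog. Change streams simplify this by providing a much friendlier API.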
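Suppose I have an API for uploading invoices. The flow is: the client uploads an invoice image to MongoDB, then we use AI to extract the information and update the invoice. Here is sample code for the upload: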
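```python
from typing import Any, Dict

import yaml
from bson import ObjectId
from pymongo import MongoClient
from pymongo.errors import PyMongoError


def read_config(path: str) -> Dict[str, Any]:
    # Stand-in for the author's config utility (not shown in the post); assumes a YAML file
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)
        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID.
        # Note: insert_one also adds the generated "_id" (an ObjectId) to `data` in place.
        result = self.collection.insert_one(data)
        return str(result.inserted_id)

    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            # _id is stored as an ObjectId, so convert the string form before matching
            # (ObjectId() also accepts an existing ObjectId unchanged)
            self.collection.update_one({"_id": ObjectId(document_id)}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
```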
As a first step, I wrapped pymongo in the class above, just in case :)). The upload endpoint then uses it like this:
```python
from datetime import datetime, timezone

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse

app = FastAPI()
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
# is_base64 and valid_base64_image are the author's helpers (not shown in the post)


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Timestamp the upload and normalize the image
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)

        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        # Save the document; the generated _id doubles as the invoice UUID
        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )

    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
```
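The endpoint above calls `is_base64` and `valid_base64_image`, which the post doesn't show. Here is a minimal sketch of what they might look like, assuming the client sends either raw base64 or a data URL such as `data:image/png;base64,<payload>`. Both the names' behavior and the data-URL handling are hypothetical stand-ins, not the author's code.

```python
import base64
import binascii
import re

# Hypothetical: matches an optional data-URL prefix such as "data:image/png;base64,"
_DATA_URL_PREFIX = re.compile(r"^data:image/[a-zA-Z0-9.+-]+;base64,")


def valid_base64_image(value: str) -> str:
    """Strip surrounding whitespace and an optional data-URL prefix (hypothetical helper)."""
    return _DATA_URL_PREFIX.sub("", value.strip(), count=1)


def is_base64(value: str) -> bool:
    """Return True if value, minus any data-URL prefix, is strict standard base64."""
    payload = valid_base64_image(value)
    if not payload:
        return False
    try:
        # validate=True rejects characters outside the base64 alphabet
        base64.b64decode(payload, validate=True)
        return True
    except (binascii.Error, ValueError):
        # binascii.Error: bad alphabet or padding; ValueError: non-ASCII input
        return False
```

Stripping the prefix inside `is_base64` lets the handler's existing order (validate first, normalize second) accept data URLs as well as bare base64.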
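A fair question: why not wait until the AI model has processed the image and update the document then? The problem is that processing takes roughly 4 to 5 minutes, and we don't want to hurt the user experience.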
Another option is Kafka: we publish the image to a Kafka topic, and a separate service processes the data.
Pros:
Cons:
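Here is a basic implementation showing how to handle the invoice upload flow with Kafka.
The user uploads an invoice through the API endpoint. The invoice image is saved in MongoDB, and a message is sent to a Kafka topic for further processing.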
```python
import json
from datetime import datetime, timezone

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from kafka import KafkaProducer

app = FastAPI()
# mongo_db, is_base64 and valid_base64_image are the same instance/helpers used earlier

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # default=str converts values json can't encode natively (datetime, ObjectId) to strings
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
)


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)

        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB (insert_one also adds the generated "_id" to the dict)
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to the Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )

    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
```
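After `insert_one` runs, `invoice_document` holds a `datetime` (`created_at`) and an `ObjectId` (`_id`, added in place by pymongo). The standard `json` module can't encode either, so a value serializer built on a plain `json.dumps` raises `TypeError` before anything reaches Kafka. Passing `default=str` is one simple way around this. A stdlib-only check of the difference, using a made-up document with just a datetime field:

```python
import json
from datetime import datetime, timezone

# Made-up document with the same kind of value as invoice_document["created_at"]
invoice_document = {
    "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "status": "not extracted",
}


def serialize(value: dict) -> bytes:
    # default=str is called for any object json can't encode natively
    return json.dumps(value, default=str).encode("utf-8")


try:
    json.dumps(invoice_document)
    plain_dump_failed = False
except TypeError:
    # "Object of type datetime is not JSON serializable"
    plain_dump_failed = True

payload = serialize(invoice_document)
```

A function like `serialize` can be passed to `KafkaProducer` as `value_serializer`. With `default=str`, the consumer receives `_id` as the 24-character hex string of the ObjectId and `created_at` as text, so it has to convert them back if it needs the original types.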
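The Kafka consumer listens on `invoice_topic`. When it receives a message, it processes the invoice (for example, extracting information from the image) and updates the corresponding document in MongoDB.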
```python
import json

from kafka import KafkaConsumer

# Assumes the MongoDatabase class from earlier is importable here;
# extract_invoice_data is the author's AI extraction function (not shown)
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value
    # Process the invoice, extract information, and update the document in MongoDB.
    # After the JSON round trip, "_id" is a plain string, not an ObjectId.
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data,
        "status": "extracted"
    })
    print(f"Processed and updated invoice: {invoice_uuid}")
```
Flow summary:
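Wow, I can't believe I wrote all of that myself! It really shows how much effort is involved, and that's before even counting the complexity of managing and configuring three services: MongoDB, Kafka, and the invoice service.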
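Here is the complete code demonstrating MongoDB change streams, including the additional methods and functions that handle invoice processing triggered by the change stream.
We'll start by building a MongoDB wrapper class that handles database operations such as creating documents and listening to the change stream.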
```python
import threading
from typing import Any, Dict

import yaml
from pymongo import MongoClient
from pymongo.errors import PyMongoError


class MongoDatabase:
    # Same code as before (__init__, create_document, update_document_by_id)

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Extract information from the invoice image
            # (extract_invoice_data is the author's AI step, not shown)
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]
            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        # React only to new documents; the updates made by process_invoice are skipped
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
```
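For convenience, I added `process_invoice` to the `MongoDatabase` class, but in a real project you should put it somewhere else.
The upload API stays the same as the original.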
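In the listener, `process_invoice` runs inline in the stream loop, so with extraction taking 4 to 5 minutes, every new insert waits behind the previous one. One option, sketched here with the standard library's `ThreadPoolExecutor` (my substitution, not part of the original design), is to hand each inserted document to a worker pool. The `stream` argument is anything that yields change-event dicts, so inside `listen()` you could pass the stream from `self.collection.watch()`, `self.process_invoice`, and a pool created once, for example `ThreadPoolExecutor(max_workers=4)`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterable


def dispatch_inserts(
    stream: Iterable[Dict[str, Any]],
    process: Callable[[Dict[str, Any]], Any],
    executor: ThreadPoolExecutor,
) -> int:
    """Submit each inserted document to the executor; return how many were submitted."""
    submitted = 0
    for change in stream:
        if change.get("operationType") == "insert":
            # submit() returns immediately, so a slow process() never stalls the stream loop
            executor.submit(process, change["fullDocument"])
            submitted += 1
    return submitted
```

`executor.submit` stores any exception in the returned `Future` instead of raising it. That is acceptable here because `process_invoice` already catches and prints its own errors, but work still queued in the pool is lost if the process exits.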
```python
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
# Start the background change stream listener once, when the app module loads
mongo_db.start_change_stream_listener()


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        # same code as before
```
Flow summary:
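With MongoDB change streams, you can handle real-time changes in your database efficiently. By extending this example to other events, such as updates and deletes, you can make your application more reactive and responsive.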
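A minimal sketch of that extension, assuming you want one handler per event type plus the ability to pick up where the listener left off after a restart. It uses two real `Collection.watch()` features: a `pipeline` with a `$match` stage so the server only sends the operation types you handle, and `resume_after`, which takes the `_id` (resume token) of the last processed event. The handler functions are illustrative; how you persist the token is up to you. Keep in mind that `watch()` only works against a replica set or sharded cluster, not a standalone `mongod`.

```python
from typing import Any, Callable, Dict, Iterable, List

# A handler receives the raw change event document
Handler = Callable[[Dict[str, Any]], None]


def pipeline_for(handlers: Dict[str, Handler]) -> List[Dict[str, Any]]:
    """Server-side filter: only deliver the operation types we have handlers for."""
    return [{"$match": {"operationType": {"$in": sorted(handlers)}}}]


def consume_changes(
    stream: Iterable[Dict[str, Any]],
    handlers: Dict[str, Handler],
    save_token: Callable[[Any], None],
) -> None:
    """Route each change event to its handler, then persist the event's resume token."""
    for change in stream:
        handler = handlers.get(change["operationType"])
        if handler is not None:
            handler(change)
        # change["_id"] is the resume token; saving it only after the handler
        # returns means a restart re-delivers, rather than skips, an unfinished event
        save_token(change["_id"])


# Example handlers for the invoice collection (illustrative only)
def on_insert(change: Dict[str, Any]) -> None:
    print("inserted", change["fullDocument"]["_id"])


def on_update(change: Dict[str, Any]) -> None:
    # updateDescription lists only the fields that changed
    print("updated", change["documentKey"]["_id"], list(change["updateDescription"]["updatedFields"]))


def on_delete(change: Dict[str, Any]) -> None:
    # Delete events carry only the documentKey, not the deleted document
    print("deleted", change["documentKey"]["_id"])


HANDLERS: Dict[str, Handler] = {"insert": on_insert, "update": on_update, "delete": on_delete}
```

Inside `listen()`, this could be wired up as `with self.collection.watch(pipeline=pipeline_for(HANDLERS), resume_after=last_token) as stream: consume_changes(stream, HANDLERS, store_token)`, where `last_token` (None on the first run) and `store_token` are hypothetical names for wherever you keep the token, for example a small document in another collection.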