首頁  >  文章  >  後端開發  >  使用 MongoDB Change Streams 和 Python 進行即時資料處理

使用 MongoDB Change Streams 和 Python 進行即時資料處理

PHPz
PHPz原創
2024-09-12 16:15:12695瀏覽

Real-Time Data Processing with MongoDB Change Streams and Python

介紹

MongoDB 中的變更流可讓您的應用程式立即對即時資料變更做出反應。在這篇文章中,我將向您展示如何使用 Python 設定和使用變更流,而無需深入研究理論。我們將創建一個簡單的程式來監聽資料庫事件,首先專注於插入,然後將其擴展到其他事件類型。

變革流入門

更改流讓您的應用程式能夠偵聽特定的資料庫事件,例如插入或更新,並立即回應。想像用戶更新其個人資料的場景;透過更改流,您可以立即在應用程式中反映此更改,而無需用戶刷新頁面。在此功能之前,您必須不斷輪詢資料庫或使用複雜的方法,例如追蹤 MongoDB Oplog。變更流透過提供更用戶友好的 API 來簡化這一過程。

如果沒有變更流會發生什麼

假設我有一個上傳發票的 API。流程是客戶將發票圖像上傳到 MongoDB,然後我們使用 AI 提取資訊並更新發票。以下是上傳發票的程式碼範例:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")

首先,我將把 pymongo 包裝在一個類別中,以防萬一:))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

一個合理的問題可能是:為什麼不等到 AI 模型處理完影像後再更新?問題是處理時間大約需要 4-5 分鐘,我們不想影響使用者體驗。

卡夫卡怎麼樣?

另一個選擇是使用 Kafka。我們可以將影像發佈到 Kafka 主題,然後另一個服務將處理資料。

優點:

  • 解耦上傳和處理服務。
  • 高效進行大規模即時資料處理。
  • 改善使用者體驗:使用者上傳發票後立即得到回應。處理是異步處理的。

缺點:

  • 引入了額外的複雜性。
  • 需要設置和維護 Kafka 基礎設施。
  • 對於小型應用程式來說可能有點過分了。

這是一個基本實現,示範如何使用 Kafka 處理發票上傳過程。

使用者透過 API 端點上傳發票。發票影像保存在 MongoDB 中,並向 Kafka 主題發送訊息以進行進一步處理。

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Kafka消費者監聽invoice_topic。當它收到訊息時,它會處理發票(例如,從圖像中提取資訊)並更新 MongoDB 中的相應文件。

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")

流程摘要:

  1. 上傳發票:使用者透過API上傳發票。
  2. 儲存到 MongoDB:發票文件保存在 MongoDB 中。
  3. 發送訊息到 Kafka: 包含發票詳細資訊的訊息會傳送到 Kafka 主題 (invoice_topic)。
  4. Kafka 消費者處理發票: Kafka 消費者監聽發票主題,處理發票,並使用提取的資訊更新 MongoDB 中的對應文件。

哇,我不敢相信我自己寫了這個!它確實凸顯了所涉及的努力。這甚至還沒有考慮管理和配置三個服務的複雜性:MongoDB、Kafka 和 Invoice 服務。

使用 MongoDB Change Streams 進行發票處理

這是用 Markdown 重寫的完整程式碼,用於演示 MongoDB 變更流,包括用於處理由變更流觸發的發票處理的其他方法和函數。

我們將首先建立一個 MongoDB 包裝類別來處理資料庫操作,例如建立文件和監聽更改流。

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()

為了方便起見,我在 MongoDatabase 類別中新增了 process_invoice。但你應該把它留在其他地方

上傳API應該跟原來的一樣。

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before

流程摘要:

  1. 用戶上傳發票:用戶透過API上傳發票。
  2. 儲存到 MongoDB:發票文件保存在 MongoDB 中。
  3. 觸發 MongoDB 變更流: MongoDB 變更流偵測到新文件的插入。
  4. 發票處理: 變更流觸發 process_invoice 函數,該函數處理發票並使用提取的資訊更新 MongoDB 中的文件。

結論

借助 MongoDB 變更流,您可以有效率地處理資料庫中的即時變更。擴展此範例,您可以處理各種事件,例如更新和刪除,使您的應用程式更具反應性和響應性。

參考:

  • https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application

以上是使用 MongoDB Change Streams 和 Python 進行即時資料處理的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn