ホームページ  >  記事  >  バックエンド開発  >  MongoDB 変更ストリームと Python を使用したリアルタイム データ処理

MongoDB 変更ストリームと Python を使用したリアルタイム データ処理

PHPz
PHPzオリジナル
2024-09-12 16:15:12729ブラウズ

Real-Time Data Processing with MongoDB Change Streams and Python

導入

MongoDB の変更ストリームにより、アプリケーションはリアルタイムのデータ変更に即座に対応できます。このブログ投稿では、理論にはあまり立ち入りませんが、Python で変更ストリームを設定して使用する方法を説明します。最初に挿入に焦点を当て、それを他のイベント タイプに拡張して、データベース イベントをリッスンする単純なプログラムを作成します。

変更ストリームの使用を開始する

変更ストリームを使用すると、アプリは挿入や更新などの特定のデータベース イベントをリッスンし、すぐに応答できます。ユーザーが自分のプロフィールを更新するシナリオを想像してください。変更ストリームを使用すると、ユーザーがページを更新しなくても、この変更をアプリ全体に即座に反映できます。この機能が登場する前は、データベースを継続的にポーリングするか、MongoDB Oplog を追跡するなどの複雑な方法を使用する必要がありました。変更ストリームは、よりユーザーフレンドリーな API を提供することでこれを簡素化します。

変更ストリームがないとどうなるか

請求書をアップロードするための API があるとします。お客様が請求書の画像をMongoDBにアップロードしていただき、AIで情報を抽出して請求書を更新するという流れになります。請求書をアップロードするコードの例は次のとおりです:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")

念のため、まず pymongo をクラス内にラップします:))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

合理的な質問は次のとおりです。AI モデルが画像を処理するまで待ってから更新しないのはなぜですか?問題は、処理に約 4 ~ 5 分かかることですが、ユーザー エクスペリエンスに影響を与えたくないということです。

カフカはどうでしょうか?

もう 1 つのオプションは、Kafka を使用することです。イメージを Kafka トピックに公開すると、別のサービスがデータを処理できます。

長所:

  • アップロードと処理サービスを分離します。
  • 大規模なリアルタイム データ処理に効率的です。
  • ユーザー エクスペリエンスの向上: ユーザーは請求書をアップロードした後、すぐに応答を受け取ります。処理は非同期で処理されます。

短所:

  • さらに複雑になります。
  • Kafka インフラストラクチャのセットアップとメンテナンスが必要です。
  • 小規模なアプリケーションには過剰になる可能性があります。

ここでは、Kafka を使用して請求書のアップロード プロセスを処理する方法を示す基本的な実装を示します。

ユーザーは API エンドポイントを通じて請求書をアップロードします。請求書の画像は MongoDB に保存され、メッセージはさらなる処理のために Kafka トピックに送信されます。

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Kafka コンシューマーは、invoice_topic をリッスンします。メッセージを受信すると、請求書を処理し (画像から情報を抽出するなど)、MongoDB 内の対応するドキュメントを更新します。

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")

フローの概要:

  1. 請求書のアップロード: ユーザーは API を通じて請求書をアップロードします。
  2. MongoDB に保存: 請求書ドキュメントは MongoDB に保存されます。
  3. Kafka にメッセージを送信: 請求書の詳細を含むメッセージが Kafka トピック (invoice_topic) に送信されます。
  4. Kafka Consumer Processes Invoice: Kafka Consumer は、invoice_topic をリッスンし、請求書を処理し、抽出された情報で MongoDB 内の対応するドキュメントを更新します。

わあ、これを自分で書くことができたなんて信じられません!それは、それに伴う努力を本当に強調しています。それには、MongoDB、Kafka、Invoice サービスの 3 つのサービスの管理と構成の複雑さは考慮されていません。

MongoDB Change Streams を使用した請求書処理

ここでは、MongoDB 変更ストリームを示すために Markdown で書き直された完全なコードを示します。これには、変更ストリームによってトリガーされる請求書処理を処理する追加のメソッドと関数が含まれます。

ドキュメントの作成や変更ストリームの待機などのデータベース操作を処理する MongoDB ラッパー クラスを作成することから始めます。

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()

簡単にするために、MongoDatabase クラス内に process_invoice を追加します。でも、それは別の場所に置いておくべきです

アップロード API はオリジナルのものと同様である必要があります。

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before

フローの概要:

  1. ユーザーが請求書をアップロード: ユーザーは API を通じて請求書をアップロードします。
  2. MongoDB に保存: 請求書ドキュメントは MongoDB に保存されます。
  3. MongoDB 変更ストリームがトリガーされました: MongoDB 変更ストリームは、新しいドキュメントの挿入を検出します。
  4. 請求書の処理: 変更ストリームは process_invoice 関数をトリガーし、請求書を処理し、抽出された情報で MongoDB 内のドキュメントを更新します。

結論

MongoDB 変更ストリームを使用すると、データベース内のリアルタイムの変更を効率的に処理できます。この例を拡張すると、更新や削除などのさまざまなイベントを処理でき、アプリケーションの反応性と応答性が向上します。

参照:

  • https://www.mongodb.com/developer/langages/python/python-change-streams/#listen-to-inserts-from-an-application

以上がMongoDB 変更ストリームと Python を使用したリアルタイム データ処理の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。