Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Pemprosesan Data Masa Nyata dengan MongoDB Change Streams dan Python

Pemprosesan Data Masa Nyata dengan MongoDB Change Streams dan Python

PHPz
PHPzasal
2024-09-12 16:15:12477semak imbas

Real-Time Data Processing with MongoDB Change Streams and Python

Introduction

Les flux de modifications dans MongoDB permettent à votre application de réagir instantanément aux modifications de données en temps réel. Dans cet article de blog, je vais vous montrer comment configurer et utiliser des flux de modifications avec Python, sans trop plonger dans la théorie. Nous allons créer un programme simple qui écoute les événements de la base de données, en nous concentrant d'abord sur les insertions, puis en l'étendant à d'autres types d'événements.

Premiers pas avec les flux de modifications

Les flux de modifications permettent à votre application d'écouter des événements de base de données spécifiques, tels que des insertions ou des mises à jour, et de répondre immédiatement. Imaginez un scénario dans lequel un utilisateur met à jour son profil ; avec les flux de modifications, vous pouvez refléter instantanément ce changement dans votre application sans que l'utilisateur ait besoin d'actualiser la page. Avant cette fonctionnalité, vous deviez constamment interroger la base de données ou utiliser des méthodes complexes comme suivre MongoDB Oplog. Les flux de modifications simplifient cela en fournissant une API plus conviviale.

Que se passe-t-il sans flux de changement

Disons que j'ai une API pour télécharger des factures. Le flux est que les clients téléchargent une image de la facture sur MongoDB, puis nous extrayons les informations avec l'IA et mettons à jour la facture. Voici un exemple de code pour télécharger une facture :

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")

Je vais d'abord envelopper le pymongo dans une classe, juste au cas où :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Une question raisonnable pourrait être : pourquoi ne pas attendre que le modèle d'IA traite l'image avant de la mettre à jour ? Le problème est que le traitement prend environ 4 à 5 minutes et nous ne voulons pas affecter l'expérience utilisateur.

Et Kafka ?

Une autre option pourrait être d'utiliser Kafka. Nous pourrions publier l'image sur un sujet Kafka et un autre service traiterait les données.

Avantages :

  • Dissocie les services de téléchargement et de traitement.
  • Efficace pour le traitement de données à grande échelle en temps réel.
  • Expérience utilisateur améliorée : les utilisateurs reçoivent une réponse immédiate après avoir téléchargé la facture. Le traitement est géré de manière asynchrone.

Inconvénients :

  • Introduit une complexité supplémentaire.
  • Nécessite la configuration et la maintenance de l'infrastructure Kafka.
  • Peut être excessif pour les applications à petite échelle.

Voici une implémentation de base pour démontrer l'utilisation de Kafka pour gérer le processus de téléchargement des factures.

L'utilisateur télécharge une facture via un point de terminaison API. L'image de la facture est enregistrée dans MongoDB et un message est envoyé à un sujet Kafka pour un traitement ultérieur.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Le consommateur Kafka écoute le bill_topic. Lorsqu'il reçoit un message, il traite la facture (par exemple en extrayant les informations de l'image) et met à jour le document correspondant dans MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")

Résumé du flux :

  1. Télécharger la facture : L'utilisateur télécharge une facture via l'API.
  2. Enregistrer dans MongoDB : Le document de facturation est enregistré dans MongoDB.
  3. Envoyer un message à Kafka : Un message contenant les détails de la facture est envoyé à un sujet Kafka (invoice_topic).
  4. Kafka Consumer Processes Invoice : Un consommateur Kafka écoute bill_topic, traite la facture et met à jour le document correspondant dans MongoDB avec les informations extraites.

Wow, je n'arrive pas à croire que j'ai réussi à écrire ça tout seul ! Cela met vraiment en valeur l’effort impliqué. Et cela sans même tenir compte de la complexité de la gestion et de la configuration des trois services : MongoDB, Kafka et le service Invoice.

Traitement des factures avec MongoDB Change Streams

Voici le code complet réécrit dans Markdown pour démontrer les flux de modifications MongoDB, y compris des méthodes et fonctions supplémentaires pour gérer le traitement des factures déclenché par le flux de modifications.

Nous allons commencer par créer une classe wrapper MongoDB qui gère les opérations de base de données telles que la création de documents et l'écoute des flux de modifications.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()

Pour faciliter les choses, j'ajoute process_invoice dans la classe MongoDatabase. Mais tu devrais le laisser ailleurs

L'API de téléchargement doit être comme celle d'origine.

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before

Résumé du flux :

  1. Invois Muat Naik Pengguna: Pengguna memuat naik invois melalui API.
  2. Simpan ke MongoDB: Dokumen invois disimpan dalam MongoDB.
  3. Strim Perubahan MongoDB Dicetuskan: Strim perubahan MongoDB mengesan penyisipan dokumen baharu.
  4. Pemprosesan Invois: Strim perubahan mencetuskan fungsi process_invoice, yang memproses invois dan mengemas kini dokumen dalam MongoDB dengan maklumat yang diekstrak.

Kesimpulan

Dengan aliran perubahan MongoDB, anda boleh memproses perubahan masa nyata dengan cekap dalam pangkalan data anda. Memanjangkan contoh ini, anda boleh mengendalikan pelbagai acara seperti kemas kini dan pemadaman, menjadikan aplikasi anda lebih reaktif dan responsif.

Rujukan:

  • https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application

Atas ialah kandungan terperinci Pemprosesan Data Masa Nyata dengan MongoDB Change Streams dan Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn