Maison > Article > développement back-end > Traitement des données en temps réel avec MongoDB Change Streams et Python
Les flux de modifications dans MongoDB permettent à votre application de réagir instantanément aux modifications de données en temps réel. Dans cet article de blog, je vais vous montrer comment configurer et utiliser des flux de modifications avec Python, sans trop plonger dans la théorie. Nous allons créer un programme simple qui écoute les événements de la base de données, en nous concentrant d'abord sur les insertions, puis en l'étendant à d'autres types d'événements.
Les flux de modifications permettent à votre application d'écouter des événements de base de données spécifiques, tels que des insertions ou des mises à jour, et de répondre immédiatement. Imaginez un scénario dans lequel un utilisateur met à jour son profil ; avec les flux de modifications, vous pouvez refléter instantanément ce changement dans votre application sans que l'utilisateur ait besoin d'actualiser la page. Avant cette fonctionnalité, vous deviez constamment interroger la base de données ou utiliser des méthodes complexes comme suivre MongoDB Oplog. Les flux de modifications simplifient cela en fournissant une API plus conviviale.
Disons que j'ai une API pour télécharger des factures. Le flux est que les clients téléchargent une image de la facture sur MongoDB, puis nous extrayons les informations avec l'IA et mettons à jour la facture. Voici un exemple de code pour télécharger une facture :
from pymongo import MongoClient class MongoDatabase: def __init__(self, config_path: str): # Load the YAML configuration file using the provided utility function self.config_path = config_path self.config = read_config(path=self.config_path) # Initialize MongoDB connection self.client = MongoClient(self.config['mongodb']['uri']) self.db = self.client[self.config['mongodb']['database']] self.collection = self.db[self.config['mongodb']['collection']] def create_document(self, data: Dict[str, Any]) -> str: # Insert a new document and return the automatically generated document ID result = self.collection.insert_one(data) return str(result.inserted_id) def update_document_by_id(self, document_id: str, data: Dict[str, Any]): try: self.collection.update_one({"_id": document_id}, {"$set": data}) except PyMongoError as e: print(f"Error updating document: {e}")
Je vais d'abord envelopper le pymongo dans une classe, juste au cas où :))
@app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) # Generate invoice UUID current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "last_modified_at": None, "last_modified_by": None, "status": "not extracted", "invoice_image_base64": img, "invoice_info": {} } invoice_uuid = mongo_db.create_document(invoice_document) print('Result saved to MongoDB:', invoice_uuid) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) return JSONResponse( status_code=status.HTTP_201_CREATED, content={"invoice_uuid": invoice_uuid, "message": "Upload successful"} ) except Exception as e: # Handle errors return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
Une question raisonnable pourrait être : pourquoi ne pas attendre que le modèle d'IA traite l'image avant de la mettre à jour ? Le problème est que le traitement prend environ 4 à 5 minutes et nous ne voulons pas affecter l'expérience utilisateur.
Une autre option pourrait être d'utiliser Kafka. Nous pourrions publier l'image sur un sujet Kafka et un autre service traiterait les données.
Avantages :
Inconvénients :
Voici une implémentation de base pour démontrer l'utilisation de Kafka pour gérer le processus de téléchargement des factures.
L'utilisateur télécharge une facture via un point de terminaison API. L'image de la facture est enregistrée dans MongoDB et un message est envoyé à un sujet Kafka pour un traitement ultérieur.
from kafka import KafkaProducer import json from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from datetime import datetime, timezone app = FastAPI() producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "status": "not extracted", "invoice_image_base64": img, } # Save the document to MongoDB invoice_uuid = mongo_db.create_document(invoice_document) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) # Send a message to Kafka topic producer.send('invoice_topic', invoice_document) producer.flush() return JSONResponse( status_code=status.HTTP_201_CREATED, content={"message": "Invoice upload received and will be processed"} ) except Exception as e: return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
Le consommateur Kafka écoute le bill_topic. Lorsqu'il reçoit un message, il traite la facture (par exemple en extrayant les informations de l'image) et met à jour le document correspondant dans MongoDB.
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'invoice_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) for message in consumer: invoice_document = message.value # Process the invoice, extract information, and update the document in MongoDB invoice_uuid = invoice_document["_id"] extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) mongo_db.update_document_by_id(invoice_uuid, { "invoice_info": extracted_data, "status": "extracted" }) print(f"Processed and updated invoice: {invoice_uuid}")
Résumé du flux :
Wow, je n'arrive pas à croire que j'ai réussi à écrire ça tout seul ! Cela met vraiment en valeur l’effort impliqué. Et cela sans même tenir compte de la complexité de la gestion et de la configuration des trois services : MongoDB, Kafka et le service Invoice.
Voici le code complet réécrit dans Markdown pour démontrer les flux de modifications MongoDB, y compris des méthodes et fonctions supplémentaires pour gérer le traitement des factures déclenché par le flux de modifications.
Nous allons commencer par créer une classe wrapper MongoDB qui gère les opérations de base de données telles que la création de documents et l'écoute des flux de modifications.
from pymongo import MongoClient from pymongo.errors import PyMongoError from typing import Dict, Any import threading import yaml class MongoDatabase: # Same code as before # def process_invoice(self, invoice_document: Dict[str, Any]): """Process the invoice by extracting data and updating the document in MongoDB.""" try: # Simulate extracting information from the invoice image extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) invoice_uuid = invoice_document["_id"] # Update the invoice document with the extracted data self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"}) print(f"Processed and updated invoice: {invoice_uuid}") except Exception as e: print(f"Error processing invoice: {str(e)}") def start_change_stream_listener(self): """Start listening to the change stream for the collection.""" def listen(): try: with self.collection.watch() as stream: for change in stream: if change['operationType'] == 'insert': invoice_document = change['fullDocument'] print(f"New invoice detected: {invoice_document['_id']}") self.process_invoice(invoice_document) except PyMongoError as e: print(f"Change stream error: {str(e)}") # Start the change stream listener in a separate thread listener_thread = threading.Thread(target=listen, daemon=True) listener_thread.start()
Pour faciliter les choses, j'ajoute process_invoice dans la classe MongoDatabase. Mais tu devrais le laisser ailleurs
L'API de téléchargement doit être comme celle d'origine.
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml') mongo_db.start_change_stream_listener() @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() # same code as before
Résumé du flux :
Avec les flux de modifications MongoDB, vous pouvez traiter efficacement les modifications en temps réel dans votre base de données. En étendant cet exemple, vous pouvez gérer divers événements tels que les mises à jour et les suppressions, rendant votre application plus réactive et réactive.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!