


Introduction
Les flux de modifications dans MongoDB permettent à votre application de réagir instantanément aux modifications de données en temps réel. Dans cet article de blog, je vais vous montrer comment configurer et utiliser des flux de modifications avec Python, sans trop plonger dans la théorie. Nous allons créer un programme simple qui écoute les événements de la base de données, en nous concentrant d'abord sur les insertions, puis en l'étendant à d'autres types d'événements.
Premiers pas avec les flux de modifications
Les flux de modifications permettent à votre application d'écouter des événements de base de données spécifiques, tels que des insertions ou des mises à jour, et de répondre immédiatement. Imaginez un scénario dans lequel un utilisateur met à jour son profil ; avec les flux de modifications, vous pouvez refléter instantanément ce changement dans votre application sans que l'utilisateur ait besoin d'actualiser la page. Avant cette fonctionnalité, vous deviez constamment interroger la base de données ou utiliser des méthodes complexes comme suivre MongoDB Oplog. Les flux de modifications simplifient cela en fournissant une API plus conviviale.
Que se passe-t-il sans flux de changement
Disons que j'ai une API pour télécharger des factures. Le flux est que les clients téléchargent une image de la facture sur MongoDB, puis nous extrayons les informations avec l'IA et mettons à jour la facture. Voici un exemple de code pour télécharger une facture :
from pymongo import MongoClient class MongoDatabase: def __init__(self, config_path: str): # Load the YAML configuration file using the provided utility function self.config_path = config_path self.config = read_config(path=self.config_path) # Initialize MongoDB connection self.client = MongoClient(self.config['mongodb']['uri']) self.db = self.client[self.config['mongodb']['database']] self.collection = self.db[self.config['mongodb']['collection']] def create_document(self, data: Dict[str, Any]) -> str: # Insert a new document and return the automatically generated document ID result = self.collection.insert_one(data) return str(result.inserted_id) def update_document_by_id(self, document_id: str, data: Dict[str, Any]): try: self.collection.update_one({"_id": document_id}, {"$set": data}) except PyMongoError as e: print(f"Error updating document: {e}")
Je vais d'abord envelopper le pymongo dans une classe, juste au cas où :))
@app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) # Generate invoice UUID current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "last_modified_at": None, "last_modified_by": None, "status": "not extracted", "invoice_image_base64": img, "invoice_info": {} } invoice_uuid = mongo_db.create_document(invoice_document) print('Result saved to MongoDB:', invoice_uuid) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) return JSONResponse( status_code=status.HTTP_201_CREATED, content={"invoice_uuid": invoice_uuid, "message": "Upload successful"} ) except Exception as e: # Handle errors return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
Une question raisonnable pourrait être : pourquoi ne pas attendre que le modèle d'IA traite l'image avant de la mettre à jour ? Le problème est que le traitement prend environ 4 à 5 minutes et nous ne voulons pas affecter l'expérience utilisateur.
Et Kafka ?
Une autre option pourrait être d'utiliser Kafka. Nous pourrions publier l'image sur un sujet Kafka et un autre service traiterait les données.
Avantages :
- Dissocie les services de téléchargement et de traitement.
- Efficace pour le traitement de données à grande échelle en temps réel.
- Expérience utilisateur améliorée : les utilisateurs reçoivent une réponse immédiate après avoir téléchargé la facture. Le traitement est géré de manière asynchrone.
Inconvénients :
- Introduit une complexité supplémentaire.
- Nécessite la configuration et la maintenance de l'infrastructure Kafka.
- Peut être excessif pour les applications à petite échelle.
Voici une implémentation de base pour démontrer l'utilisation de Kafka pour gérer le processus de téléchargement des factures.
L'utilisateur télécharge une facture via un point de terminaison API. L'image de la facture est enregistrée dans MongoDB et un message est envoyé à un sujet Kafka pour un traitement ultérieur.
from kafka import KafkaProducer import json from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from datetime import datetime, timezone app = FastAPI() producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "status": "not extracted", "invoice_image_base64": img, } # Save the document to MongoDB invoice_uuid = mongo_db.create_document(invoice_document) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) # Send a message to Kafka topic producer.send('invoice_topic', invoice_document) producer.flush() return JSONResponse( status_code=status.HTTP_201_CREATED, content={"message": "Invoice upload received and will be processed"} ) except Exception as e: return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
Le consommateur Kafka écoute le bill_topic. Lorsqu'il reçoit un message, il traite la facture (par exemple en extrayant les informations de l'image) et met à jour le document correspondant dans MongoDB.
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'invoice_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) for message in consumer: invoice_document = message.value # Process the invoice, extract information, and update the document in MongoDB invoice_uuid = invoice_document["_id"] extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) mongo_db.update_document_by_id(invoice_uuid, { "invoice_info": extracted_data, "status": "extracted" }) print(f"Processed and updated invoice: {invoice_uuid}")
Résumé du flux :
- Télécharger la facture : L'utilisateur télécharge une facture via l'API.
- Enregistrer dans MongoDB : Le document de facturation est enregistré dans MongoDB.
- Envoyer un message à Kafka : Un message contenant les détails de la facture est envoyé à un sujet Kafka (invoice_topic).
- Kafka Consumer Processes Invoice : Un consommateur Kafka écoute bill_topic, traite la facture et met à jour le document correspondant dans MongoDB avec les informations extraites.
Wow, je n'arrive pas à croire que j'ai réussi à écrire ça tout seul ! Cela met vraiment en valeur l’effort impliqué. Et cela sans même tenir compte de la complexité de la gestion et de la configuration des trois services : MongoDB, Kafka et le service Invoice.
Traitement des factures avec MongoDB Change Streams
Voici le code complet réécrit dans Markdown pour démontrer les flux de modifications MongoDB, y compris des méthodes et fonctions supplémentaires pour gérer le traitement des factures déclenché par le flux de modifications.
Nous allons commencer par créer une classe wrapper MongoDB qui gère les opérations de base de données telles que la création de documents et l'écoute des flux de modifications.
from pymongo import MongoClient from pymongo.errors import PyMongoError from typing import Dict, Any import threading import yaml class MongoDatabase: # Same code as before # def process_invoice(self, invoice_document: Dict[str, Any]): """Process the invoice by extracting data and updating the document in MongoDB.""" try: # Simulate extracting information from the invoice image extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) invoice_uuid = invoice_document["_id"] # Update the invoice document with the extracted data self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"}) print(f"Processed and updated invoice: {invoice_uuid}") except Exception as e: print(f"Error processing invoice: {str(e)}") def start_change_stream_listener(self): """Start listening to the change stream for the collection.""" def listen(): try: with self.collection.watch() as stream: for change in stream: if change['operationType'] == 'insert': invoice_document = change['fullDocument'] print(f"New invoice detected: {invoice_document['_id']}") self.process_invoice(invoice_document) except PyMongoError as e: print(f"Change stream error: {str(e)}") # Start the change stream listener in a separate thread listener_thread = threading.Thread(target=listen, daemon=True) listener_thread.start()
Pour faciliter les choses, j'ajoute process_invoice dans la classe MongoDatabase. Mais tu devrais le laisser ailleurs
L'API de téléchargement doit être comme celle d'origine.
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml') mongo_db.start_change_stream_listener() @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() # same code as before
Résumé du flux :
- L'utilisateur télécharge une facture : L'utilisateur télécharge une facture via l'API.
- Enregistrer dans MongoDB : Le document de facturation est enregistré dans MongoDB.
- MongoDB Change Stream Déclenché : Le flux de modifications MongoDB détecte l'insertion du nouveau document.
- Traitement des factures : Le flux de modifications déclenche la fonction process_invoice, qui traite la facture et met à jour le document dans MongoDB avec les informations extraites.
Conclusion
Avec les flux de modifications MongoDB, vous pouvez traiter efficacement les modifications en temps réel dans votre base de données. En étendant cet exemple, vous pouvez gérer divers événements tels que les mises à jour et les suppressions, rendant votre application plus réactive et réactive.
Référence:
- https://www.mongodb.com/developer/linguals/python/python-change-streams/#listen-to-inserts-from-an-application
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Créer des tableaux multidimensionnels avec Numpy peut être réalisé via les étapes suivantes: 1) Utilisez la fonction numpy.array () pour créer un tableau, tel que np.array ([[1,2,3], [4,5,6]]) pour créer un tableau 2D; 2) utiliser np.zeros (), np.ones (), np.random.random () et d'autres fonctions pour créer un tableau rempli de valeurs spécifiques; 3) Comprendre les propriétés de forme et de taille du tableau pour vous assurer que la longueur du sous-réseau est cohérente et éviter les erreurs; 4) Utilisez la fonction NP.Reshape () pour modifier la forme du tableau; 5) Faites attention à l'utilisation de la mémoire pour vous assurer que le code est clair et efficace.

BroadcastingInNumpyIsAmethodToperformOperations OnerwaysofdifferentShapesByAutomAticalAligningThem.itImplienScode, améliore la réadabilité et BoostsTerformance.He'showitwork

Forpythondatastorage, chooseListsforflexibilitywithMixedDatatyS, array.Arrayformmemory-efficienthomogeneousnumericalData, andNumpyArraysforaSvancedNumericalComputing.ListaSaRaySatilebutless

PythonlistsArebetterThanArraysformMagingDiversEDATATYPES.1) ListScan HoldingElementoSoFferentTypes, 2) Ils ont été aaredamique, permettant à la manière dont 4) ils ne sont pas entièrement efficaces et les opérations sont en train de les affirmer.

ToaccesElementsInapythonArray, useIndexing: my_array [2] AccessEstheThirdElement, returning3.pythonusZero-basedIndexing.

L'article discute de l'impossibilité de la compréhension des tuples dans Python en raison de l'ambiguïté de la syntaxe. Des alternatives comme l'utilisation de Tuple () avec des expressions de générateur sont suggérées pour créer efficacement les tuples. (159 caractères)

L'article explique les modules et les packages dans Python, leurs différences et leur utilisation. Les modules sont des fichiers uniques, tandis que les packages sont des répertoires avec un fichier __init__.py, organisant des modules connexes hiérarchiquement.

L'article traite des docstrings dans Python, de leur utilisation et des avantages. Problème principal: Importance des docstrings pour la documentation du code et l'accessibilité.


Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

Video Face Swap
Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 Linux nouvelle version
Dernière version de SublimeText3 Linux

Version Mac de WebStorm
Outils de développement JavaScript utiles

MinGW - GNU minimaliste pour Windows
Ce projet est en cours de migration vers osdn.net/projects/mingw, vous pouvez continuer à nous suivre là-bas. MinGW : un port Windows natif de GNU Compiler Collection (GCC), des bibliothèques d'importation et des fichiers d'en-tête librement distribuables pour la création d'applications Windows natives ; inclut des extensions du runtime MSVC pour prendre en charge la fonctionnalité C99. Tous les logiciels MinGW peuvent fonctionner sur les plates-formes Windows 64 bits.

PhpStorm version Mac
Le dernier (2018.2.1) outil de développement intégré PHP professionnel
