suchen
HeimBackend-EntwicklungPython-TutorialEchtzeit-Datenverarbeitung mit MongoDB Change Streams und Python

Real-Time Data Processing with MongoDB Change Streams and Python

pengenalan

Tukar strim dalam MongoDB membolehkan aplikasi anda bertindak balas terhadap perubahan data masa nyata serta-merta. Dalam catatan blog ini, saya akan menunjukkan kepada anda cara menyediakan dan menggunakan strim perubahan dengan Python, tanpa mendalami teori. Kami akan mencipta atur cara mudah yang mendengar acara pangkalan data, memfokuskan pada sisipan dahulu, kemudian memanjangkannya kepada jenis acara lain.

Bermula dengan Tukar Strim

Tukar strim membenarkan apl anda mendengar acara pangkalan data tertentu, seperti sisipan atau kemas kini dan bertindak balas dengan segera. Bayangkan senario di mana pengguna mengemas kini profil mereka; dengan strim perubahan, anda boleh mencerminkan serta-merta perubahan ini merentas apl anda tanpa memerlukan pengguna memuat semula halaman. Sebelum ciri ini, anda perlu sentiasa meninjau pangkalan data atau menggunakan kaedah yang kompleks seperti mengikuti Oplog MongoDB. Tukar strim memudahkan perkara ini dengan menyediakan API yang lebih mesra pengguna.

Apa yang Berlaku Tanpa Perubahan Strim

Katakan saya mempunyai API untuk memuat naik invois. Alirannya ialah pelanggan akan memuat naik imej invois ke MongoDB, kemudian kami mengekstrak maklumat dengan AI dan mengemas kini invois. Berikut ialah contoh kod untuk memuat naik invois:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")

Mula-mula saya akan membungkus pymongo di dalam kelas, untuk berjaga-jaga :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Persoalan munasabah mungkin: mengapa tidak menunggu sehingga model AI memproses imej sebelum mengemas kini? Masalahnya ialah proses itu mengambil masa sekitar 4-5 minit dan kami tidak mahu menjejaskan pengalaman pengguna.

Bagaimana dengan Kafka?

Pilihan lain boleh menggunakan Kafka. Kami boleh menerbitkan imej ke topik Kafka, dan perkhidmatan lain akan memproses data.

Kebaikan:

  • Mengasingkan perkhidmatan muat naik dan pemprosesan.
  • Cekap untuk pemprosesan data masa nyata berskala besar.
  • Pengalaman pengguna yang dipertingkatkan: Pengguna mendapat respons segera selepas memuat naik invois. Pemprosesan dikendalikan secara tidak segerak.

Keburukan:

  • Memperkenalkan kerumitan tambahan.
  • Memerlukan persediaan dan penyelenggaraan infrastruktur Kafka.
  • Mungkin berlebihan untuk aplikasi berskala kecil.

Berikut ialah pelaksanaan asas untuk menunjukkan penggunaan Kafka untuk mengendalikan proses muat naik invois.

Pengguna memuat naik invois melalui titik akhir API. Imej invois disimpan dalam MongoDB dan mesej dihantar ke topik Kafka untuk diproses selanjutnya.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Pengguna Kafka mendengar topik_invois. Apabila ia menerima mesej, ia memproses invois (cth., mengekstrak maklumat daripada imej) dan mengemas kini dokumen yang sepadan dalam MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")

Ringkasan Aliran:

  1. Muat Naik Invois: Pengguna memuat naik invois melalui API.
  2. Simpan ke MongoDB: Dokumen invois disimpan dalam MongoDB.
  3. Hantar Mesej kepada Kafka: Mesej yang mengandungi butiran invois dihantar ke topik Kafka (topik_invois).
  4. Invois Proses Pengguna Kafka: Pengguna Kafka mendengar invois_topic, memproses invois dan mengemas kini dokumen yang sepadan dalam MongoDB dengan maklumat yang diekstrak.

Wah, saya tidak percaya saya berjaya menulis ini sendiri! Ia benar-benar menyerlahkan usaha yang terlibat. Dan itu tidak mengambil kira kerumitan mengurus dan mengkonfigurasi tiga perkhidmatan: MongoDB, Kafka dan perkhidmatan Invois.

Pemprosesan Invois dengan MongoDB Change Streams

Berikut ialah kod lengkap yang ditulis semula dalam Markdown untuk menunjukkan aliran perubahan MongoDB, termasuk kaedah dan fungsi tambahan untuk mengendalikan pemprosesan invois yang dicetuskan oleh aliran perubahan.

Kami akan mulakan dengan mencipta kelas pembalut MongoDB yang mengendalikan operasi pangkalan data seperti mencipta dokumen dan mendengar menukar strim.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()

Untuk memudahkan saya menambah process_invoice di dalam kelas MongoDatabase. Tetapi anda harus meninggalkannya di tempat lain

API muat naik hendaklah seperti yang asal.

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before

Ringkasan Aliran:

  1. Benutzer lädt Rechnung hoch: Der Benutzer lädt eine Rechnung über die API hoch.
  2. In MongoDB speichern: Das Rechnungsdokument wird in MongoDB gespeichert.
  3. MongoDB-Änderungsstream ausgelöst: Der MongoDB-Änderungsstream erkennt das Einfügen des neuen Dokuments.
  4. Rechnungsverarbeitung: Der Änderungsstrom löst die Funktion „process_invoice“ aus, die die Rechnung verarbeitet und das Dokument in MongoDB mit den extrahierten Informationen aktualisiert.

Abschluss

Mit MongoDB-Änderungsströmen können Sie Echtzeitänderungen in Ihrer Datenbank effizient verarbeiten. Wenn Sie dieses Beispiel erweitern, können Sie verschiedene Ereignisse wie Aktualisierungen und Löschungen verarbeiten, wodurch Ihre Anwendung reaktiver und reaktionsfähiger wird.

Referenz:

  • https://www.mongodb.com/developer/linguals/python/python-change-streams/#listen-to-inserts-from-an-application

Das obige ist der detaillierte Inhalt vonEchtzeit-Datenverarbeitung mit MongoDB Change Streams und Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Der 2-stündige Python-Plan: ein realistischer AnsatzDer 2-stündige Python-Plan: ein realistischer AnsatzApr 11, 2025 am 12:04 AM

Sie können grundlegende Programmierkonzepte und Fähigkeiten von Python innerhalb von 2 Stunden lernen. 1. Lernen Sie Variablen und Datentypen, 2. Master Control Flow (bedingte Anweisungen und Schleifen), 3.. Verstehen Sie die Definition und Verwendung von Funktionen, 4. Beginnen Sie schnell mit der Python -Programmierung durch einfache Beispiele und Code -Snippets.

Python: Erforschen der primären AnwendungenPython: Erforschen der primären AnwendungenApr 10, 2025 am 09:41 AM

Python wird in den Bereichen Webentwicklung, Datenwissenschaft, maschinelles Lernen, Automatisierung und Skripten häufig verwendet. 1) In der Webentwicklung vereinfachen Django und Flask Frameworks den Entwicklungsprozess. 2) In den Bereichen Datenwissenschaft und maschinelles Lernen bieten Numpy-, Pandas-, Scikit-Learn- und TensorFlow-Bibliotheken eine starke Unterstützung. 3) In Bezug auf Automatisierung und Skript ist Python für Aufgaben wie automatisiertes Test und Systemmanagement geeignet.

Wie viel Python können Sie in 2 Stunden lernen?Wie viel Python können Sie in 2 Stunden lernen?Apr 09, 2025 pm 04:33 PM

Sie können die Grundlagen von Python innerhalb von zwei Stunden lernen. 1. Lernen Sie Variablen und Datentypen, 2. Master -Steuerungsstrukturen wie wenn Aussagen und Schleifen, 3. Verstehen Sie die Definition und Verwendung von Funktionen. Diese werden Ihnen helfen, einfache Python -Programme zu schreiben.

Wie lehre ich innerhalb von 10 Stunden die Grundlagen für Computer-Anfänger-Programmierbasis in Projekt- und problemorientierten Methoden?Wie lehre ich innerhalb von 10 Stunden die Grundlagen für Computer-Anfänger-Programmierbasis in Projekt- und problemorientierten Methoden?Apr 02, 2025 am 07:18 AM

Wie lehre ich innerhalb von 10 Stunden die Grundlagen für Computer -Anfänger für Programmierungen? Wenn Sie nur 10 Stunden Zeit haben, um Computer -Anfänger zu unterrichten, was Sie mit Programmierkenntnissen unterrichten möchten, was würden Sie dann beibringen ...

Wie kann man vom Browser vermeiden, wenn man überall Fiddler für das Lesen des Menschen in der Mitte verwendet?Wie kann man vom Browser vermeiden, wenn man überall Fiddler für das Lesen des Menschen in der Mitte verwendet?Apr 02, 2025 am 07:15 AM

Wie kann man nicht erkannt werden, wenn Sie Fiddlereverywhere für Man-in-the-Middle-Lesungen verwenden, wenn Sie FiddLereverywhere verwenden ...

Was soll ich tun, wenn das Modul '__builtin__' beim Laden der Gurkendatei in Python 3.6 nicht gefunden wird?Was soll ich tun, wenn das Modul '__builtin__' beim Laden der Gurkendatei in Python 3.6 nicht gefunden wird?Apr 02, 2025 am 07:12 AM

Laden Sie Gurkendateien in Python 3.6 Umgebungsbericht Fehler: ModulenotFoundError: Nomodulennamen ...

Wie verbessert man die Genauigkeit der Jiebeba -Wortsegmentierung in der malerischen Spot -Kommentaranalyse?Wie verbessert man die Genauigkeit der Jiebeba -Wortsegmentierung in der malerischen Spot -Kommentaranalyse?Apr 02, 2025 am 07:09 AM

Wie löste ich das Problem der Jiebeba -Wortsegmentierung in der malerischen Spot -Kommentaranalyse? Wenn wir malerische Spot -Kommentare und -analysen durchführen, verwenden wir häufig das Jieba -Word -Segmentierungstool, um den Text zu verarbeiten ...

Wie benutze ich den regulären Ausdruck, um das erste geschlossene Tag zu entsprechen und anzuhalten?Wie benutze ich den regulären Ausdruck, um das erste geschlossene Tag zu entsprechen und anzuhalten?Apr 02, 2025 am 07:06 AM

Wie benutze ich den regulären Ausdruck, um das erste geschlossene Tag zu entsprechen und anzuhalten? Im Umgang mit HTML oder anderen Markup -Sprachen sind häufig regelmäßige Ausdrücke erforderlich, um ...

See all articles

Heiße KI -Werkzeuge

Undresser.AI Undress

Undresser.AI Undress

KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover

AI Clothes Remover

Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool

Undress AI Tool

Ausziehbilder kostenlos

Clothoff.io

Clothoff.io

KI-Kleiderentferner

AI Hentai Generator

AI Hentai Generator

Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

R.E.P.O. Energiekristalle erklärten und was sie tun (gelber Kristall)
3 Wochen vorBy尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Beste grafische Einstellungen
3 Wochen vorBy尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. So reparieren Sie Audio, wenn Sie niemanden hören können
3 Wochen vorBy尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Wie man alles in Myrise freischaltet
3 Wochen vorBy尊渡假赌尊渡假赌尊渡假赌

Heiße Werkzeuge

VSCode Windows 64-Bit-Download

VSCode Windows 64-Bit-Download

Ein kostenloser und leistungsstarker IDE-Editor von Microsoft

SublimeText3 Englische Version

SublimeText3 Englische Version

Empfohlen: Win-Version, unterstützt Code-Eingabeaufforderungen!

Senden Sie Studio 13.0.1

Senden Sie Studio 13.0.1

Leistungsstarke integrierte PHP-Entwicklungsumgebung

mPDF

mPDF

mPDF ist eine PHP-Bibliothek, die PDF-Dateien aus UTF-8-codiertem HTML generieren kann. Der ursprüngliche Autor, Ian Back, hat mPDF geschrieben, um PDF-Dateien „on the fly“ von seiner Website auszugeben und verschiedene Sprachen zu verarbeiten. Es ist langsamer und erzeugt bei der Verwendung von Unicode-Schriftarten größere Dateien als Originalskripte wie HTML2FPDF, unterstützt aber CSS-Stile usw. und verfügt über viele Verbesserungen. Unterstützt fast alle Sprachen, einschließlich RTL (Arabisch und Hebräisch) und CJK (Chinesisch, Japanisch und Koreanisch). Unterstützt verschachtelte Elemente auf Blockebene (wie P, DIV),

SublimeText3 Mac-Version

SublimeText3 Mac-Version

Codebearbeitungssoftware auf Gottesniveau (SublimeText3)