suchen
HeimBackend-EntwicklungPython-TutorialEchtzeit-Datenverarbeitung mit MongoDB Change Streams und Python

Real-Time Data Processing with MongoDB Change Streams and Python

pengenalan

Tukar strim dalam MongoDB membolehkan aplikasi anda bertindak balas terhadap perubahan data masa nyata serta-merta. Dalam catatan blog ini, saya akan menunjukkan kepada anda cara menyediakan dan menggunakan strim perubahan dengan Python, tanpa mendalami teori. Kami akan mencipta atur cara mudah yang mendengar acara pangkalan data, memfokuskan pada sisipan dahulu, kemudian memanjangkannya kepada jenis acara lain.

Bermula dengan Tukar Strim

Tukar strim membenarkan apl anda mendengar acara pangkalan data tertentu, seperti sisipan atau kemas kini dan bertindak balas dengan segera. Bayangkan senario di mana pengguna mengemas kini profil mereka; dengan strim perubahan, anda boleh mencerminkan serta-merta perubahan ini merentas apl anda tanpa memerlukan pengguna memuat semula halaman. Sebelum ciri ini, anda perlu sentiasa meninjau pangkalan data atau menggunakan kaedah yang kompleks seperti mengikuti Oplog MongoDB. Tukar strim memudahkan perkara ini dengan menyediakan API yang lebih mesra pengguna.

Apa yang Berlaku Tanpa Perubahan Strim

Katakan saya mempunyai API untuk memuat naik invois. Alirannya ialah pelanggan akan memuat naik imej invois ke MongoDB, kemudian kami mengekstrak maklumat dengan AI dan mengemas kini invois. Berikut ialah contoh kod untuk memuat naik invois:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")

Mula-mula saya akan membungkus pymongo di dalam kelas, untuk berjaga-jaga :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Persoalan munasabah mungkin: mengapa tidak menunggu sehingga model AI memproses imej sebelum mengemas kini? Masalahnya ialah proses itu mengambil masa sekitar 4-5 minit dan kami tidak mahu menjejaskan pengalaman pengguna.

Bagaimana dengan Kafka?

Pilihan lain boleh menggunakan Kafka. Kami boleh menerbitkan imej ke topik Kafka, dan perkhidmatan lain akan memproses data.

Kebaikan:

  • Mengasingkan perkhidmatan muat naik dan pemprosesan.
  • Cekap untuk pemprosesan data masa nyata berskala besar.
  • Pengalaman pengguna yang dipertingkatkan: Pengguna mendapat respons segera selepas memuat naik invois. Pemprosesan dikendalikan secara tidak segerak.

Keburukan:

  • Memperkenalkan kerumitan tambahan.
  • Memerlukan persediaan dan penyelenggaraan infrastruktur Kafka.
  • Mungkin berlebihan untuk aplikasi berskala kecil.

Berikut ialah pelaksanaan asas untuk menunjukkan penggunaan Kafka untuk mengendalikan proses muat naik invois.

Pengguna memuat naik invois melalui titik akhir API. Imej invois disimpan dalam MongoDB dan mesej dihantar ke topik Kafka untuk diproses selanjutnya.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Pengguna Kafka mendengar topik_invois. Apabila ia menerima mesej, ia memproses invois (cth., mengekstrak maklumat daripada imej) dan mengemas kini dokumen yang sepadan dalam MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")

Ringkasan Aliran:

  1. Muat Naik Invois: Pengguna memuat naik invois melalui API.
  2. Simpan ke MongoDB: Dokumen invois disimpan dalam MongoDB.
  3. Hantar Mesej kepada Kafka: Mesej yang mengandungi butiran invois dihantar ke topik Kafka (topik_invois).
  4. Invois Proses Pengguna Kafka: Pengguna Kafka mendengar invois_topic, memproses invois dan mengemas kini dokumen yang sepadan dalam MongoDB dengan maklumat yang diekstrak.

Wah, saya tidak percaya saya berjaya menulis ini sendiri! Ia benar-benar menyerlahkan usaha yang terlibat. Dan itu tidak mengambil kira kerumitan mengurus dan mengkonfigurasi tiga perkhidmatan: MongoDB, Kafka dan perkhidmatan Invois.

Pemprosesan Invois dengan MongoDB Change Streams

Berikut ialah kod lengkap yang ditulis semula dalam Markdown untuk menunjukkan aliran perubahan MongoDB, termasuk kaedah dan fungsi tambahan untuk mengendalikan pemprosesan invois yang dicetuskan oleh aliran perubahan.

Kami akan mulakan dengan mencipta kelas pembalut MongoDB yang mengendalikan operasi pangkalan data seperti mencipta dokumen dan mendengar menukar strim.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()

Untuk memudahkan saya menambah process_invoice di dalam kelas MongoDatabase. Tetapi anda harus meninggalkannya di tempat lain

API muat naik hendaklah seperti yang asal.

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before

Ringkasan Aliran:

  1. Benutzer lädt Rechnung hoch: Der Benutzer lädt eine Rechnung über die API hoch.
  2. In MongoDB speichern: Das Rechnungsdokument wird in MongoDB gespeichert.
  3. MongoDB-Änderungsstream ausgelöst: Der MongoDB-Änderungsstream erkennt das Einfügen des neuen Dokuments.
  4. Rechnungsverarbeitung: Der Änderungsstrom löst die Funktion „process_invoice“ aus, die die Rechnung verarbeitet und das Dokument in MongoDB mit den extrahierten Informationen aktualisiert.

Abschluss

Mit MongoDB-Änderungsströmen können Sie Echtzeitänderungen in Ihrer Datenbank effizient verarbeiten. Wenn Sie dieses Beispiel erweitern, können Sie verschiedene Ereignisse wie Aktualisierungen und Löschungen verarbeiten, wodurch Ihre Anwendung reaktiver und reaktionsfähiger wird.

Referenz:

  • https://www.mongodb.com/developer/linguals/python/python-change-streams/#listen-to-inserts-from-an-application

Das obige ist der detaillierte Inhalt vonEchtzeit-Datenverarbeitung mit MongoDB Change Streams und Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Wie wirkt sich die Auswahl zwischen Listen und Arrays auf die Gesamtleistung einer Python -Anwendung aus, die sich mit großen Datensätzen befasst?Wie wirkt sich die Auswahl zwischen Listen und Arrays auf die Gesamtleistung einer Python -Anwendung aus, die sich mit großen Datensätzen befasst?May 03, 2025 am 12:11 AM

ForHandlinglargedatasetsinpython, Usenumpyarraysforbetterperformance.1) Numpyarraysarememory-Effiction und FasterFornumericaloperations.2) meidenunnötiger Anbieter.3) HebelVectorisationFecedTimeComplexity.4) ManagemememoryusageSageWithEffizienceDeffictureWitheseffizienz

Erklären Sie, wie das Speicher für Listen gegenüber Arrays in Python zugewiesen wird.Erklären Sie, wie das Speicher für Listen gegenüber Arrays in Python zugewiesen wird.May 03, 2025 am 12:10 AM

Inpython, listEUSUutsynamicMemoryAllocationWithover-Accocation, whilenumpyarraysalcodeFixedMemory.1) ListSallocatemoremoryThanneded intellig, vereitelte, dass die sterbliche Größe von Zeitpunkte, OfferingPredictableSageStoageStloseflexeflexibilität.

Wie geben Sie den Datentyp der Elemente in einem Python -Array an?Wie geben Sie den Datentyp der Elemente in einem Python -Array an?May 03, 2025 am 12:06 AM

Inpython, youcansspecthedatatypeyFelemeremodelerernspant.1) Usenpynernrump.1) Usenpynerp.dloatp.Ploatm64, Formor -Präzise -Preciscontrolatatypen.

Was ist Numpy und warum ist es wichtig für das numerische Computing in Python?Was ist Numpy und warum ist es wichtig für das numerische Computing in Python?May 03, 2025 am 12:03 AM

NumpyisessentialfornumericalComputingInpythonduetoitsSpeed, GedächtnisEffizienz und kompetentiertemaMatematical-Funktionen.1) ITSFACTBECAUSPERFORMATIONSOPERATIONS.2) NumpyarraysSaremoremory-Effecthonpythonlists.3) iTofferSAgyarraysAremoremory-Effizieren

Diskutieren Sie das Konzept der 'zusammenhängenden Speicherzuweisung' und seine Bedeutung für Arrays.Diskutieren Sie das Konzept der 'zusammenhängenden Speicherzuweisung' und seine Bedeutung für Arrays.May 03, 2025 am 12:01 AM

ContInuuousMemoryAllocationScrucialforAraysBecauseAltoLowsFofficy und Fastelement Access.1) iTenablesconstantTimeAccess, O (1), Duetodirectaddresscalculation.2) itimProvesefficienceByallowing -MultipleTeLementFetchesperCacheline.3) Es wird gestellt

Wie schneiden Sie eine Python -Liste?Wie schneiden Sie eine Python -Liste?May 02, 2025 am 12:14 AM

SlicingPapythonListisDoneUsingthesyntaxlist [Start: Stop: Stufe] .here'Showitworks: 1) StartIndexoFtheFirstelementtoinclude.2) stopiStheIndexoFtheFirstelementtoexclude.3) StepisTheincrementBetweenelesfulFulForForforexcractioningPorporionsporporionsPorporionsporporesporsporsporsporsporsporsporsporsporionsporsPorsPorsPorsPorsporsporsporsporsporsporsAntionsporsporesporesporesporsPorsPorsporsPorsPorsporsporspors,

Was sind einige gängige Operationen, die an Numpy -Arrays ausgeführt werden können?Was sind einige gängige Operationen, die an Numpy -Arrays ausgeführt werden können?May 02, 2025 am 12:09 AM

Numpyallowsforvariousoperationssonarrays: 1) BasicarithmeticliKeaddition, Subtraktion, Multiplikation und Division; 2) AdvancedoperationssuchasmatrixMultiplication;

Wie werden Arrays in der Datenanalyse mit Python verwendet?Wie werden Arrays in der Datenanalyse mit Python verwendet?May 02, 2025 am 12:09 AM

Arraysinpython, insbesondere ThroughNumpyandpandas, areessentialfordataanalyse, öfterspeedandeffizienz.1) numpyarraysenableAnalysHandlingoflargedatasets und CompompexoperationslikemovingAverages.2) Pandasextendsnumpy'ScapaBilitiesWithDaTataforsForstruc

See all articles

Heiße KI -Werkzeuge

Undresser.AI Undress

Undresser.AI Undress

KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover

AI Clothes Remover

Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool

Undress AI Tool

Ausziehbilder kostenlos

Clothoff.io

Clothoff.io

KI-Kleiderentferner

Video Face Swap

Video Face Swap

Tauschen Sie Gesichter in jedem Video mühelos mit unserem völlig kostenlosen KI-Gesichtstausch-Tool aus!

Heiße Werkzeuge

Dreamweaver CS6

Dreamweaver CS6

Visuelle Webentwicklungstools

SublimeText3 Englische Version

SublimeText3 Englische Version

Empfohlen: Win-Version, unterstützt Code-Eingabeaufforderungen!

SublimeText3 chinesische Version

SublimeText3 chinesische Version

Chinesische Version, sehr einfach zu bedienen

WebStorm-Mac-Version

WebStorm-Mac-Version

Nützliche JavaScript-Entwicklungstools

VSCode Windows 64-Bit-Download

VSCode Windows 64-Bit-Download

Ein kostenloser und leistungsstarker IDE-Editor von Microsoft