Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Pemprosesan Data Masa Nyata dengan MongoDB Change Streams dan Python

Pemprosesan Data Masa Nyata dengan MongoDB Change Streams dan Python

PHPz
PHPzasal
2024-09-12 16:15:12719semak imbas

Real-Time Data Processing with MongoDB Change Streams and Python

pengenalan

Tukar strim dalam MongoDB membolehkan aplikasi anda bertindak balas terhadap perubahan data masa nyata serta-merta. Dalam catatan blog ini, saya akan menunjukkan kepada anda cara menyediakan dan menggunakan strim perubahan dengan Python, tanpa mendalami teori. Kami akan mencipta atur cara mudah yang mendengar acara pangkalan data, memfokuskan pada sisipan dahulu, kemudian memanjangkannya kepada jenis acara lain.

Bermula dengan Tukar Strim

Tukar strim membenarkan apl anda mendengar acara pangkalan data tertentu, seperti sisipan atau kemas kini dan bertindak balas dengan segera. Bayangkan senario di mana pengguna mengemas kini profil mereka; dengan strim perubahan, anda boleh mencerminkan serta-merta perubahan ini merentas apl anda tanpa memerlukan pengguna memuat semula halaman. Sebelum ciri ini, anda perlu sentiasa meninjau pangkalan data atau menggunakan kaedah yang kompleks seperti mengikuti Oplog MongoDB. Tukar strim memudahkan perkara ini dengan menyediakan API yang lebih mesra pengguna.

Apa yang Berlaku Tanpa Perubahan Strim

Katakan saya mempunyai API untuk memuat naik invois. Alirannya ialah pelanggan akan memuat naik imej invois ke MongoDB, kemudian kami mengekstrak maklumat dengan AI dan mengemas kini invois. Berikut ialah contoh kod untuk memuat naik invois:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")

Mula-mula saya akan membungkus pymongo di dalam kelas, untuk berjaga-jaga :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Persoalan munasabah mungkin: mengapa tidak menunggu sehingga model AI memproses imej sebelum mengemas kini? Masalahnya ialah proses itu mengambil masa sekitar 4-5 minit dan kami tidak mahu menjejaskan pengalaman pengguna.

Bagaimana dengan Kafka?

Pilihan lain boleh menggunakan Kafka. Kami boleh menerbitkan imej ke topik Kafka, dan perkhidmatan lain akan memproses data.

Kebaikan:

  • Mengasingkan perkhidmatan muat naik dan pemprosesan.
  • Cekap untuk pemprosesan data masa nyata berskala besar.
  • Pengalaman pengguna yang dipertingkatkan: Pengguna mendapat respons segera selepas memuat naik invois. Pemprosesan dikendalikan secara tidak segerak.

Keburukan:

  • Memperkenalkan kerumitan tambahan.
  • Memerlukan persediaan dan penyelenggaraan infrastruktur Kafka.
  • Mungkin berlebihan untuk aplikasi berskala kecil.

Berikut ialah pelaksanaan asas untuk menunjukkan penggunaan Kafka untuk mengendalikan proses muat naik invois.

Pengguna memuat naik invois melalui titik akhir API. Imej invois disimpan dalam MongoDB dan mesej dihantar ke topik Kafka untuk diproses selanjutnya.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Pengguna Kafka mendengar topik_invois. Apabila ia menerima mesej, ia memproses invois (cth., mengekstrak maklumat daripada imej) dan mengemas kini dokumen yang sepadan dalam MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")

Ringkasan Aliran:

  1. Muat Naik Invois: Pengguna memuat naik invois melalui API.
  2. Simpan ke MongoDB: Dokumen invois disimpan dalam MongoDB.
  3. Hantar Mesej kepada Kafka: Mesej yang mengandungi butiran invois dihantar ke topik Kafka (topik_invois).
  4. Invois Proses Pengguna Kafka: Pengguna Kafka mendengar invois_topic, memproses invois dan mengemas kini dokumen yang sepadan dalam MongoDB dengan maklumat yang diekstrak.

Wah, saya tidak percaya saya berjaya menulis ini sendiri! Ia benar-benar menyerlahkan usaha yang terlibat. Dan itu tidak mengambil kira kerumitan mengurus dan mengkonfigurasi tiga perkhidmatan: MongoDB, Kafka dan perkhidmatan Invois.

Pemprosesan Invois dengan MongoDB Change Streams

Berikut ialah kod lengkap yang ditulis semula dalam Markdown untuk menunjukkan aliran perubahan MongoDB, termasuk kaedah dan fungsi tambahan untuk mengendalikan pemprosesan invois yang dicetuskan oleh aliran perubahan.

Kami akan mulakan dengan mencipta kelas pembalut MongoDB yang mengendalikan operasi pangkalan data seperti mencipta dokumen dan mendengar menukar strim.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()

Untuk memudahkan saya menambah process_invoice di dalam kelas MongoDatabase. Tetapi anda harus meninggalkannya di tempat lain

API muat naik hendaklah seperti yang asal.

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before

Ringkasan Aliran:

  1. Invois Muat Naik Pengguna: Pengguna memuat naik invois melalui API.
  2. Simpan ke MongoDB: Dokumen invois disimpan dalam MongoDB.
  3. Strim Perubahan MongoDB Dicetuskan: Strim perubahan MongoDB mengesan penyisipan dokumen baharu.
  4. Pemprosesan Invois: Strim perubahan mencetuskan fungsi process_invois, yang memproses invois dan mengemas kini dokumen dalam MongoDB dengan maklumat yang diekstrak.

Kesimpulan

Dengan aliran perubahan MongoDB, anda boleh memproses perubahan masa nyata dengan cekap dalam pangkalan data anda. Memanjangkan contoh ini, anda boleh mengendalikan pelbagai acara seperti kemas kini dan pemadaman, menjadikan aplikasi anda lebih reaktif dan responsif.

Rujukan:

  • https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application

Atas ialah kandungan terperinci Pemprosesan Data Masa Nyata dengan MongoDB Change Streams dan Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn