cari
Rumahpembangunan bahagian belakangTutorial PythonPemprosesan Data Masa Nyata dengan MongoDB Change Streams dan Python

Real-Time Data Processing with MongoDB Change Streams and Python

pengenalan

Tukar strim dalam MongoDB membolehkan aplikasi anda bertindak balas terhadap perubahan data masa nyata serta-merta. Dalam catatan blog ini, saya akan menunjukkan kepada anda cara menyediakan dan menggunakan strim perubahan dengan Python, tanpa mendalami teori. Kami akan mencipta atur cara mudah yang mendengar acara pangkalan data, memfokuskan pada sisipan dahulu, kemudian memanjangkannya kepada jenis acara lain.

Bermula dengan Tukar Strim

Tukar strim membenarkan apl anda mendengar acara pangkalan data tertentu, seperti sisipan atau kemas kini dan bertindak balas dengan segera. Bayangkan senario di mana pengguna mengemas kini profil mereka; dengan strim perubahan, anda boleh mencerminkan serta-merta perubahan ini merentas apl anda tanpa memerlukan pengguna memuat semula halaman. Sebelum ciri ini, anda perlu sentiasa meninjau pangkalan data atau menggunakan kaedah yang kompleks seperti mengikuti Oplog MongoDB. Tukar strim memudahkan perkara ini dengan menyediakan API yang lebih mesra pengguna.

Apa yang Berlaku Tanpa Perubahan Strim

Katakan saya mempunyai API untuk memuat naik invois. Alirannya ialah pelanggan akan memuat naik imej invois ke MongoDB, kemudian kami mengekstrak maklumat dengan AI dan mengemas kini invois. Berikut ialah contoh kod untuk memuat naik invois:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")

Mula-mula saya akan membungkus pymongo di dalam kelas, untuk berjaga-jaga :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Persoalan munasabah mungkin: mengapa tidak menunggu sehingga model AI memproses imej sebelum mengemas kini? Masalahnya ialah proses itu mengambil masa sekitar 4-5 minit dan kami tidak mahu menjejaskan pengalaman pengguna.

Bagaimana dengan Kafka?

Pilihan lain boleh menggunakan Kafka. Kami boleh menerbitkan imej ke topik Kafka, dan perkhidmatan lain akan memproses data.

Kebaikan:

  • Mengasingkan perkhidmatan muat naik dan pemprosesan.
  • Cekap untuk pemprosesan data masa nyata berskala besar.
  • Pengalaman pengguna yang dipertingkatkan: Pengguna mendapat respons segera selepas memuat naik invois. Pemprosesan dikendalikan secara tidak segerak.

Keburukan:

  • Memperkenalkan kerumitan tambahan.
  • Memerlukan persediaan dan penyelenggaraan infrastruktur Kafka.
  • Mungkin berlebihan untuk aplikasi berskala kecil.

Berikut ialah pelaksanaan asas untuk menunjukkan penggunaan Kafka untuk mengendalikan proses muat naik invois.

Pengguna memuat naik invois melalui titik akhir API. Imej invois disimpan dalam MongoDB dan mesej dihantar ke topik Kafka untuk diproses selanjutnya.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )

Pengguna Kafka mendengar topik_invois. Apabila ia menerima mesej, ia memproses invois (cth., mengekstrak maklumat daripada imej) dan mengemas kini dokumen yang sepadan dalam MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")

Ringkasan Aliran:

  1. Muat Naik Invois: Pengguna memuat naik invois melalui API.
  2. Simpan ke MongoDB: Dokumen invois disimpan dalam MongoDB.
  3. Hantar Mesej kepada Kafka: Mesej yang mengandungi butiran invois dihantar ke topik Kafka (topik_invois).
  4. Invois Proses Pengguna Kafka: Pengguna Kafka mendengar invois_topic, memproses invois dan mengemas kini dokumen yang sepadan dalam MongoDB dengan maklumat yang diekstrak.

Wah, saya tidak percaya saya berjaya menulis ini sendiri! Ia benar-benar menyerlahkan usaha yang terlibat. Dan itu tidak mengambil kira kerumitan mengurus dan mengkonfigurasi tiga perkhidmatan: MongoDB, Kafka dan perkhidmatan Invois.

Pemprosesan Invois dengan MongoDB Change Streams

Berikut ialah kod lengkap yang ditulis semula dalam Markdown untuk menunjukkan aliran perubahan MongoDB, termasuk kaedah dan fungsi tambahan untuk mengendalikan pemprosesan invois yang dicetuskan oleh aliran perubahan.

Kami akan mulakan dengan mencipta kelas pembalut MongoDB yang mengendalikan operasi pangkalan data seperti mencipta dokumen dan mendengar menukar strim.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()

Untuk memudahkan saya menambah process_invoice di dalam kelas MongoDatabase. Tetapi anda harus meninggalkannya di tempat lain

API muat naik hendaklah seperti yang asal.

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before

Ringkasan Aliran:

  1. Invois Muat Naik Pengguna: Pengguna memuat naik invois melalui API.
  2. Simpan ke MongoDB: Dokumen invois disimpan dalam MongoDB.
  3. Strim Perubahan MongoDB Dicetuskan: Strim perubahan MongoDB mengesan penyisipan dokumen baharu.
  4. Pemprosesan Invois: Strim perubahan mencetuskan fungsi process_invois, yang memproses invois dan mengemas kini dokumen dalam MongoDB dengan maklumat yang diekstrak.

Kesimpulan

Dengan aliran perubahan MongoDB, anda boleh memproses perubahan masa nyata dengan cekap dalam pangkalan data anda. Memanjangkan contoh ini, anda boleh mengendalikan pelbagai acara seperti kemas kini dan pemadaman, menjadikan aplikasi anda lebih reaktif dan responsif.

Rujukan:

  • https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application

Atas ialah kandungan terperinci Pemprosesan Data Masa Nyata dengan MongoDB Change Streams dan Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Python vs C: Memahami perbezaan utamaPython vs C: Memahami perbezaan utamaApr 21, 2025 am 12:18 AM

Python dan C masing -masing mempunyai kelebihan sendiri, dan pilihannya harus berdasarkan keperluan projek. 1) Python sesuai untuk pembangunan pesat dan pemprosesan data kerana sintaks ringkas dan menaip dinamik. 2) C sesuai untuk prestasi tinggi dan pengaturcaraan sistem kerana menaip statik dan pengurusan memori manual.

Python vs C: Bahasa mana yang harus dipilih untuk projek anda?Python vs C: Bahasa mana yang harus dipilih untuk projek anda?Apr 21, 2025 am 12:17 AM

Memilih Python atau C bergantung kepada keperluan projek: 1) Jika anda memerlukan pembangunan pesat, pemprosesan data dan reka bentuk prototaip, pilih Python; 2) Jika anda memerlukan prestasi tinggi, latensi rendah dan kawalan perkakasan yang rapat, pilih C.

Mencapai matlamat python anda: kekuatan 2 jam sehariMencapai matlamat python anda: kekuatan 2 jam sehariApr 20, 2025 am 12:21 AM

Dengan melabur 2 jam pembelajaran python setiap hari, anda dapat meningkatkan kemahiran pengaturcaraan anda dengan berkesan. 1. Ketahui Pengetahuan Baru: Baca dokumen atau tutorial menonton. 2. Amalan: Tulis kod dan latihan lengkap. 3. Kajian: Menyatukan kandungan yang telah anda pelajari. 4. Amalan Projek: Sapukan apa yang telah anda pelajari dalam projek sebenar. Pelan pembelajaran berstruktur seperti ini dapat membantu anda menguasai Python secara sistematik dan mencapai matlamat kerjaya.

Memaksimumkan 2 Jam: Strategi Pembelajaran Python BerkesanMemaksimumkan 2 Jam: Strategi Pembelajaran Python BerkesanApr 20, 2025 am 12:20 AM

Kaedah untuk belajar python dengan cekap dalam masa dua jam termasuk: 1. Semak pengetahuan asas dan pastikan anda sudah biasa dengan pemasangan Python dan sintaks asas; 2. Memahami konsep teras python, seperti pembolehubah, senarai, fungsi, dan lain -lain; 3. Menguasai penggunaan asas dan lanjutan dengan menggunakan contoh; 4. Belajar kesilapan biasa dan teknik debugging; 5. Memohon pengoptimuman prestasi dan amalan terbaik, seperti menggunakan komprehensif senarai dan mengikuti panduan gaya PEP8.

Memilih antara python dan c: bahasa yang sesuai untuk andaMemilih antara python dan c: bahasa yang sesuai untuk andaApr 20, 2025 am 12:20 AM

Python sesuai untuk pemula dan sains data, dan C sesuai untuk pengaturcaraan sistem dan pembangunan permainan. 1. Python adalah mudah dan mudah digunakan, sesuai untuk sains data dan pembangunan web. 2.C menyediakan prestasi dan kawalan yang tinggi, sesuai untuk pembangunan permainan dan pengaturcaraan sistem. Pilihan harus berdasarkan keperluan projek dan kepentingan peribadi.

Python vs C: Analisis perbandingan bahasa pengaturcaraanPython vs C: Analisis perbandingan bahasa pengaturcaraanApr 20, 2025 am 12:14 AM

Python lebih sesuai untuk sains data dan perkembangan pesat, manakala C lebih sesuai untuk prestasi tinggi dan pengaturcaraan sistem. 1. Sintaks Python adalah ringkas dan mudah dipelajari, sesuai untuk pemprosesan data dan pengkomputeran saintifik. 2.C mempunyai sintaks kompleks tetapi prestasi yang sangat baik dan sering digunakan dalam pembangunan permainan dan pengaturcaraan sistem.

2 jam sehari: potensi pembelajaran python2 jam sehari: potensi pembelajaran pythonApr 20, 2025 am 12:14 AM

Adalah mungkin untuk melabur dua jam sehari untuk belajar Python. 1. Belajar Pengetahuan Baru: Ketahui konsep baru dalam satu jam, seperti senarai dan kamus. 2. Amalan dan Amalan: Gunakan satu jam untuk melakukan latihan pengaturcaraan, seperti menulis program kecil. Melalui perancangan dan ketekunan yang munasabah, anda boleh menguasai konsep teras Python dalam masa yang singkat.

Python vs C: Lengkung pembelajaran dan kemudahan penggunaanPython vs C: Lengkung pembelajaran dan kemudahan penggunaanApr 19, 2025 am 12:20 AM

Python lebih mudah dipelajari dan digunakan, manakala C lebih kuat tetapi kompleks. 1. Sintaks Python adalah ringkas dan sesuai untuk pemula. Penaipan dinamik dan pengurusan memori automatik menjadikannya mudah digunakan, tetapi boleh menyebabkan kesilapan runtime. 2.C menyediakan kawalan peringkat rendah dan ciri-ciri canggih, sesuai untuk aplikasi berprestasi tinggi, tetapi mempunyai ambang pembelajaran yang tinggi dan memerlukan memori manual dan pengurusan keselamatan jenis.

See all articles

Alat AI Hot

Undresser.AI Undress

Undresser.AI Undress

Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover

AI Clothes Remover

Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool

Undress AI Tool

Gambar buka pakaian secara percuma

Clothoff.io

Clothoff.io

Penyingkiran pakaian AI

Video Face Swap

Video Face Swap

Tukar muka dalam mana-mana video dengan mudah menggunakan alat tukar muka AI percuma kami!

Alat panas

Notepad++7.3.1

Notepad++7.3.1

Editor kod yang mudah digunakan dan percuma

DVWA

DVWA

Damn Vulnerable Web App (DVWA) ialah aplikasi web PHP/MySQL yang sangat terdedah. Matlamat utamanya adalah untuk menjadi bantuan bagi profesional keselamatan untuk menguji kemahiran dan alatan mereka dalam persekitaran undang-undang, untuk membantu pembangun web lebih memahami proses mengamankan aplikasi web, dan untuk membantu guru/pelajar mengajar/belajar dalam persekitaran bilik darjah Aplikasi web keselamatan. Matlamat DVWA adalah untuk mempraktikkan beberapa kelemahan web yang paling biasa melalui antara muka yang mudah dan mudah, dengan pelbagai tahap kesukaran. Sila ambil perhatian bahawa perisian ini

MantisBT

MantisBT

Mantis ialah alat pengesan kecacatan berasaskan web yang mudah digunakan yang direka untuk membantu dalam pengesanan kecacatan produk. Ia memerlukan PHP, MySQL dan pelayan web. Lihat perkhidmatan demo dan pengehosan kami.

SublimeText3 versi Cina

SublimeText3 versi Cina

Versi Cina, sangat mudah digunakan

mPDF

mPDF

mPDF ialah perpustakaan PHP yang boleh menjana fail PDF daripada HTML yang dikodkan UTF-8. Pengarang asal, Ian Back, menulis mPDF untuk mengeluarkan fail PDF "dengan cepat" dari tapak webnya dan mengendalikan bahasa yang berbeza. Ia lebih perlahan dan menghasilkan fail yang lebih besar apabila menggunakan fon Unicode daripada skrip asal seperti HTML2FPDF, tetapi menyokong gaya CSS dsb. dan mempunyai banyak peningkatan. Menyokong hampir semua bahasa, termasuk RTL (Arab dan Ibrani) dan CJK (Cina, Jepun dan Korea). Menyokong elemen peringkat blok bersarang (seperti P, DIV),