Heim >Backend-Entwicklung >Python-Tutorial >Ein Leitfaden für Anfänger zu Kafka mit Python: Echtzeit-Datenverarbeitung und -anwendungen

Ein Leitfaden für Anfänger zu Kafka mit Python: Echtzeit-Datenverarbeitung und -anwendungen

Mary-Kate Olsen
Mary-Kate OlsenOriginal
2024-11-05 17:41:031116Durchsuche

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

Einführung in Kafka

  • Kafka ist eine von Apache entwickelte Open-Source-Plattform für verteiltes Event-Streaming.
  • Ursprünglich von LinkedIn entwickelt, wurde es für hohen Durchsatz, Fehlertoleranz und Echtzeit-Datenstreaming entwickelt.
  • Kafka ermöglicht es Systemen, Datenströme (Nachrichten) zu veröffentlichen und zu abonnieren, sie zu verarbeiten und effizient zu speichern.

Warum wird Kafka verwendet?

  • Hoher Durchsatz: Kafka kann Millionen von Nachrichten pro Sekunde verarbeiten.
  • Fehlertoleranz: Kafka ist verteilt, was bedeutet, dass es Daten über mehrere Knoten hinweg replizieren kann, um Zuverlässigkeit zu gewährleisten.
  • Haltbarkeit: Kafka speichert Daten auf der Festplatte und kann Nachrichten wiedergeben, wodurch die Zuverlässigkeit bei der Nachrichtenübermittlung gewährleistet wird.
  • Echtzeitverarbeitung: Kafka kann Datenströme in Echtzeit verarbeiten, ideal für Anwendungen wie Überwachung, Analyse oder ereignisgesteuerte Systeme.
  • Skalierbarkeit: Kafka kann problemlos skaliert werden, indem weitere Broker hinzugefügt werden, um große Datenmengen zu verarbeiten.
  • Systeme entkoppeln: Kafka fungiert als mittlere Schicht für die Nachrichtenübermittlung und ermöglicht die asynchrone Kommunikation verschiedener Systeme.

Kafka-Architektur

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

Komponenten:

Produzenten:
Dies sind die Anwendungen oder Dienste, die Daten/Nachrichten an Kafka senden. Produzenten pushen Nachrichten zu bestimmten Themen innerhalb von Kafka.

Themen:
Ein Thema ist eine Kategorie oder ein Feedname, unter dem Datensätze veröffentlicht werden. Die Themen sind partitioniert, um Skalierbarkeit und Parallelität zu ermöglichen.

Partitionen:

  • Jedes Thema ist in eine oder mehrere Partitionen unterteilt.
  • Partitionen ermöglichen es Kafka, mehr Nachrichten zu verarbeiten und zu unterstützen Parallelverarbeitung.
  • Jede Partition hat eine eindeutige ID und kann eine Teilmenge davon speichern Themendaten.

Makler:

  • Kafka läuft als Cluster von Brokern (Servern), von denen jeder Daten verarbeitet für mehrere Themen und Partitionen.
  • Broker speichern und verwalten Partitionen und verarbeiten Lese- und Schreibvorgänge Anfragen von Produzenten und Verbrauchern.
  • Jeder Broker wird durch eine eindeutige ID identifiziert.

Verbraucher:

Verbraucher sind Anwendungen oder Dienste, die Nachrichten aus Themen lesen.
Verbraucher abonnieren Themen und beziehen Daten von Kafka-Brokern.

Verbrauchergruppen:

  • Verbraucher sind in Verbrauchergruppen organisiert.
  • Jede Nachricht innerhalb einer Partition wird nur an einen Verbraucher innerhalb der Gruppe übermittelt, was einen Lastausgleich über mehrere Verbraucher hinweg ermöglicht.

ZooKeeper:

  • ZooKeeper verwaltet und koordiniert Kafka-Broker und behält den Überblick über Broker, Themen und Partitionen.
  • Es hilft bei der Verwaltung der Leiterwahl für Partitionen und überwacht den Clusterzustand.

Anwendungsfälle von Kafka

  • Echtzeitanalysen: Unternehmen nutzen Kafka, um Datenströme in Echtzeit für Überwachungssysteme wie die Analyse von Finanztransaktionen zu verarbeiten und zu analysieren.
  • Protokollaggregation: Kafka konsolidiert Protokolle von mehreren Diensten oder Anwendungen zur Verarbeitung, Warnung oder Speicherung.
  • Datenpipelines: Kafka wird als Rückgrat für die Übertragung großer Datenmengen zwischen verschiedenen Systemen oder Diensten (ETL-Pipelines) verwendet.
  • IoT-Anwendungen: Kafka kann die Datenströme von IoT-Sensoren verarbeiten und ermöglicht so Echtzeitanalysen und -reaktionen.
  • Microservices-Kommunikation: Kafka dient als zuverlässige Messaging-Plattform für Microservices-Architekturen und ermöglicht asynchrone, entkoppelte Kommunikation.
  • Fahrzeugverfolgung in Echtzeit: Das folgende Beispiel zeigt, wie Kafka zur Echtzeitverfolgung von Fahrzeugen verwendet wird.

Beispiel mit Python, um zu demonstrieren, wie Kafka in einem Echtzeitszenario verwendet werden kann:

Standortverfolgung für eine Mitfahr-App.

Der Einfachheit halber verwenden wir die Kafka-Python-Bibliothek, um sowohl einen Produzenten (um einen Treiber zu simulieren, der Standortaktualisierungen sendet) als auch einen Verbraucher (um einen Dienst zu simulieren, der diese Standortaktualisierungen verarbeitet) zu erstellen.

1. Kafka einrichten
Stellen Sie sicher, dass Kafka lokal ausgeführt wird, oder nutzen Sie einen Cloud-Anbieter. Sie können Kafka herunterladen und lokal ausführen, indem Sie der Kafka-Schnellstartanleitung folgen.

2. Installieren Sie die Kafka Python-Bibliothek
Sie können die Kafka-Python-Bibliothek mit pip:
installieren

pip install kafka-python

3. Python Kafka Producer (Simulation von Treiberstandortaktualisierungen)
Der Produzent simuliert einen Treiber, der Standortaktualisierungen an ein Kafka-Thema (Treiberstandort) sendet.

from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

4. Python Kafka Consumer (Simulierender Ride-Matching-Dienst)
Der Verbraucher liest die Standortaktualisierungen aus dem Treiberstandortthema und verarbeitet sie.

from kafka import KafkaConsumer
import json

# Kafka Consumer
consumer = KafkaConsumer(
    'driver-location',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True,
    group_id='location-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize data from JSON
)

def process_location_updates():
    print("Waiting for location updates...")
    for message in consumer:
        location = message.value
        driver_id = location['driver_id']
        latitude = location['latitude']
        longitude = location['longitude']
        timestamp = location['timestamp']
        print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}")

# Start consuming location updates
process_location_updates()

Erklärung:

Produzent (Treiber, der Standortaktualisierungen sendet):

  • Der Produzent sendet ein JSON-Objekt an das Kafka-Thema „driver-location“ mit Feldern wie „driver_id“, „Breitengrad“, „Längengrad“ und „Zeitstempel“.
  • Der Hersteller simuliert Echtzeit-GPS-Updates, indem er alle 5 Sekunden Standortdaten sendet.

Verbraucher (Mitfahrservice):

  • Der Verbraucher abonniert das Thema „Treiberstandort“ und wartet auf Aktualisierungen.
  • Jedes Mal, wenn eine Standortaktualisierung in Kafka veröffentlicht wird, verarbeitet und druckt der Verbraucher sie und simuliert so einen Dienst, der diese Daten verwendet, um Fahrer und Mitfahrer abzugleichen.

Ausführen des Beispiels (ich verwende einen Windows-Rechner):

  1. Starten Sie den Zookeeper
pip install kafka-python
  1. Starten Sie Ihren lokalen Kafka-Server.
from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

Führen Sie nun den Produzenten und den Verbraucher in zwei separaten Terminalfenstern mit Python aus.

  1. Führen Sie das Producer-Skript aus, um den Treiber zu simulieren, der Standortaktualisierungen sendet.

  2. Führen Sie das Verbraucherskript aus, um zu sehen, wie der Ride-Matching-Dienst die Standortaktualisierungen in Echtzeit verarbeitet.

Fazit
Apache Kafka bietet eine außergewöhnliche Plattform für die Verwaltung von Echtzeit-Datenströmen. Durch die Kombination von Kafka mit Python können Entwickler leistungsstarke Datenpipelines und Echtzeit-Analyselösungen erstellen.

Ob Fahrzeugverfolgung, IoT-Daten oder Echtzeit-Dashboards, Kafka mit Python ist hoch skalierbar und kann an verschiedene Anwendungsfälle angepasst werden. Beginnen Sie also mit Kafka zu experimentieren und Sie werden von seinem Potenzial in realen Anwendungen begeistert sein!

Das obige ist der detaillierte Inhalt vonEin Leitfaden für Anfänger zu Kafka mit Python: Echtzeit-Datenverarbeitung und -anwendungen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn