Maison >développement back-end >Tutoriel Python >Guide du débutant sur Kafka avec Python : traitement des données et applications en temps réel

Guide du débutant sur Kafka avec Python : traitement des données et applications en temps réel

Mary-Kate Olsen
Mary-Kate Olsenoriginal
2024-11-05 17:41:031042parcourir

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

Introduction à Kafka

  • Kafka est une plateforme de streaming d'événements distribués open source développée par Apache.
  • Créé à l'origine par LinkedIn, il a été conçu pour gérer le streaming de données à haut débit, tolérant aux pannes et en temps réel.
  • Kafka permet aux systèmes de publier et de s'abonner à des flux d'enregistrements (messages), de les traiter et de les stocker efficacement.

Pourquoi Kafka est-il utilisé ?

  • Haut débit : Kafka peut gérer des millions de messages par seconde.
  • Tolérance aux pannes : Kafka est distribué, ce qui signifie qu'il peut répliquer les données sur plusieurs nœuds pour garantir la fiabilité.
  • Durabilité : Kafka conserve les données sur le disque et peut relire les messages, garantissant ainsi la fiabilité de la transmission des messages.
  • Traitement en temps réel : Kafka peut traiter des flux de données en temps réel, ce qui est idéal pour les applications telles que la surveillance, l'analyse ou les systèmes pilotés par événements.
  • Évolutivité : Kafka peut facilement évoluer en ajoutant davantage de courtiers pour gérer de gros volumes de données.
  • Systèmes de découplage : Kafka agit comme une couche intermédiaire pour la messagerie, permettant à différents systèmes de communiquer de manière asynchrone.

Architecture Kafka

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

Composants :

Producteurs :
Ce sont les applications ou services qui envoient des données/messages à Kafka. Les producteurs envoient des messages vers des sujets spécifiques dans Kafka.

Sujets :
Un sujet est une catégorie ou un nom de flux dans lequel les enregistrements sont publiés. Les sujets sont cloisonnés pour permettre l'évolutivité et le parallélisme.

Partitions :

  • Chaque sujet est divisé en une ou plusieurs partitions.
  • Les partitions permettent à Kafka de gérer plus de messages et d'assistance traitement parallèle.
  • Chaque partition possède un identifiant unique et peut stocker un sous-ensemble des données du sujet.

Courtiers :

  • Kafka fonctionne comme un cluster de courtiers (serveurs), chacun gérant des données pour plusieurs sujets et partitions.
  • Les courtiers stockent et gèrent les partitions, gérant la lecture et l'écriture demandes des producteurs et des consommateurs.
  • Chaque courtier est identifié par un identifiant unique.

Consommateurs :

Les consommateurs sont des applications ou des services qui lisent les messages des sujets.
Les consommateurs s'abonnent à des sujets et extraient les données des courtiers Kafka.

Groupes de consommateurs :

  • Les consommateurs sont organisés en groupes de consommateurs.
  • Chaque message au sein d'une partition est transmis à un seul consommateur au sein du groupe, ce qui permet d'équilibrer la charge entre plusieurs consommateurs.

ZooKeeper :

  • ZooKeeper gère et coordonne les courtiers Kafka, en gardant une trace des courtiers, des sujets et des partitions.
  • Il aide à gérer l'élection du leader pour les partitions et surveille l'état du cluster.

Cas d'utilisation de Kafka

  • Analyse en temps réel : les entreprises utilisent Kafka pour traiter et analyser des flux de données en temps réel pour les systèmes de surveillance, comme l'analyse des transactions financières.
  • Agrégation de journaux : Kafka consolide les journaux de plusieurs services ou applications à des fins de traitement, d'alerte ou de stockage.
  • Pipelines de données : Kafka est utilisé comme épine dorsale pour transférer de grandes quantités de données entre différents systèmes ou services (pipelines ETL).
  • Applications IoT : Kafka peut gérer les flux de données des capteurs IoT, permettant une analyse et des réponses en temps réel.
  • Communication par microservices : Kafka sert de plate-forme de messagerie fiable pour les architectures de microservices, permettant une communication asynchrone et découplée.
  • Suivi des véhicules en temps réel : l'exemple suivant illustre comment Kafka est utilisé pour suivre les véhicules en temps réel.

Exemple d'utilisation de Python pour démontrer comment Kafka peut être utilisé dans un scénario en temps réel :

Suivi de localisation pour une application de covoiturage.

Pour plus de simplicité, nous utiliserons la bibliothèque kafka-python pour créer à la fois un producteur (pour simuler un pilote envoyant des mises à jour de localisation) et un consommateur (pour simuler un service qui traite ces mises à jour de localisation).

1. Configurer Kafka
Assurez-vous que Kafka est exécuté localement ou utilisez un fournisseur de cloud. Vous pouvez télécharger et exécuter Kafka localement en suivant le guide de démarrage rapide Kafka.

2. Installer la bibliothèque Kafka Python
Vous pouvez installer la bibliothèque Kafka Python en utilisant pip :

pip install kafka-python

3. Python Kafka Producer (simulation des mises à jour de l'emplacement des pilotes)
Le producteur simule un pilote envoyant des mises à jour de localisation à un sujet Kafka (driver-location).

from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

4. Python Kafka Consumer (service de simulation de trajet)
Le consommateur lit les mises à jour de localisation à partir du sujet de localisation du conducteur et les traite.

from kafka import KafkaConsumer
import json

# Kafka Consumer
consumer = KafkaConsumer(
    'driver-location',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True,
    group_id='location-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize data from JSON
)

def process_location_updates():
    print("Waiting for location updates...")
    for message in consumer:
        location = message.value
        driver_id = location['driver_id']
        latitude = location['latitude']
        longitude = location['longitude']
        timestamp = location['timestamp']
        print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}")

# Start consuming location updates
process_location_updates()

Explication :

Producteur (pilote envoyant des mises à jour de localisation) :

  • Le producteur envoie un objet JSON au sujet Kafka driver-location avec des champs tels que driver_id, latitude, longitude et timestamp.
  • Le producteur simule les mises à jour GPS en temps réel en envoyant des données de localisation toutes les 5 secondes.

Consommateur (Service de covoiturage) :

  • Le consommateur s'abonne au sujet de localisation des chauffeurs et écoute les mises à jour.
  • Chaque fois qu'une mise à jour de localisation est publiée sur Kafka, le consommateur la traite et l'imprime, simulant un service qui utilise ces données pour faire correspondre les conducteurs et les passagers.

Exécution de l'exemple (j'utilise une machine Windows) :

  1. Démarrez le gardien de zoo
pip install kafka-python
  1. Démarrez votre serveur Kafka local.
from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

Exécutez maintenant le producteur et le consommateur dans 2 fenêtres de terminal distinctes à l'aide de python.

  1. Exécutez le script du producteur pour simuler le pilote qui envoie des mises à jour de localisation.

  2. Exécutez le script consommateur pour voir le service de correspondance traiter les mises à jour de localisation en temps réel.

Conclusion
Apache Kafka fournit une plateforme exceptionnelle pour gérer les flux de données en temps réel. En combinant Kafka avec Python, les développeurs peuvent créer de puissants pipelines de données et des solutions d'analyse en temps réel.

Qu'il s'agisse de suivi de véhicules, de données IoT ou de tableaux de bord en temps réel, Kafka avec Python est hautement évolutif et peut être adapté à divers cas d'utilisation. Alors, commencez à expérimenter Kafka et vous serez étonné par son potentiel dans les applications du monde réel !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn