Maison >développement back-end >Tutoriel Python >Guide du débutant sur Kafka avec Python : traitement des données et applications en temps réel
Introduction à Kafka
Pourquoi Kafka est-il utilisé ?
Architecture Kafka
Composants :
Producteurs :
Ce sont les applications ou services qui envoient des données/messages à Kafka. Les producteurs envoient des messages vers des sujets spécifiques dans Kafka.
Sujets :
Un sujet est une catégorie ou un nom de flux dans lequel les enregistrements sont publiés. Les sujets sont cloisonnés pour permettre l'évolutivité et le parallélisme.
Partitions :
Courtiers :
Consommateurs :
Les consommateurs sont des applications ou des services qui lisent les messages des sujets.
Les consommateurs s'abonnent à des sujets et extraient les données des courtiers Kafka.
Groupes de consommateurs :
ZooKeeper :
Cas d'utilisation de Kafka
Exemple d'utilisation de Python pour démontrer comment Kafka peut être utilisé dans un scénario en temps réel :
Suivi de localisation pour une application de covoiturage.
Pour plus de simplicité, nous utiliserons la bibliothèque kafka-python pour créer à la fois un producteur (pour simuler un pilote envoyant des mises à jour de localisation) et un consommateur (pour simuler un service qui traite ces mises à jour de localisation).
1. Configurer Kafka
Assurez-vous que Kafka est exécuté localement ou utilisez un fournisseur de cloud. Vous pouvez télécharger et exécuter Kafka localement en suivant le guide de démarrage rapide Kafka.
2. Installer la bibliothèque Kafka Python
Vous pouvez installer la bibliothèque Kafka Python en utilisant pip :
pip install kafka-python
3. Python Kafka Producer (simulation des mises à jour de l'emplacement des pilotes)
Le producteur simule un pilote envoyant des mises à jour de localisation à un sujet Kafka (driver-location).
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
4. Python Kafka Consumer (service de simulation de trajet)
Le consommateur lit les mises à jour de localisation à partir du sujet de localisation du conducteur et les traite.
from kafka import KafkaConsumer import json # Kafka Consumer consumer = KafkaConsumer( 'driver-location', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', # Start from the earliest message enable_auto_commit=True, group_id='location-group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Deserialize data from JSON ) def process_location_updates(): print("Waiting for location updates...") for message in consumer: location = message.value driver_id = location['driver_id'] latitude = location['latitude'] longitude = location['longitude'] timestamp = location['timestamp'] print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}") # Start consuming location updates process_location_updates()
Explication :
Producteur (pilote envoyant des mises à jour de localisation) :
Consommateur (Service de covoiturage) :
Exécution de l'exemple (j'utilise une machine Windows) :
pip install kafka-python
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
Exécutez maintenant le producteur et le consommateur dans 2 fenêtres de terminal distinctes à l'aide de python.
Exécutez le script du producteur pour simuler le pilote qui envoie des mises à jour de localisation.
Exécutez le script consommateur pour voir le service de correspondance traiter les mises à jour de localisation en temps réel.
Conclusion
Apache Kafka fournit une plateforme exceptionnelle pour gérer les flux de données en temps réel. En combinant Kafka avec Python, les développeurs peuvent créer de puissants pipelines de données et des solutions d'analyse en temps réel.
Qu'il s'agisse de suivi de véhicules, de données IoT ou de tableaux de bord en temps réel, Kafka avec Python est hautement évolutif et peut être adapté à divers cas d'utilisation. Alors, commencez à expérimenter Kafka et vous serez étonné par son potentiel dans les applications du monde réel !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!