Rumah >pembangunan bahagian belakang >Tutorial Python >Panduan Pemula untuk Kafka dengan Python: Pemprosesan dan Aplikasi Data Masa Nyata
Pengenalan kepada Kafka
Mengapa Kafka Digunakan?
Seni Bina Kafka
Komponen:
Pengeluar:
Ini ialah aplikasi atau perkhidmatan yang menghantar data/mesej kepada Kafka. Pengeluar menolak mesej ke Topik tertentu dalam Kafka.
Topik:
Topik ialah kategori atau nama suapan yang mana rekod diterbitkan. Topik dibahagikan untuk membolehkan kebolehskalaan dan keselarian.
Pecahan:
Broker:
Pengguna:
Pengguna ialah aplikasi atau perkhidmatan yang membaca mesej daripada topik.
Pengguna melanggan topik, mengambil data daripada broker Kafka.
Kumpulan Pengguna:
Penjaga Zoo:
Kes Penggunaan Kafka
Contoh menggunakan Python untuk menunjukkan cara Kafka boleh digunakan dalam senario masa nyata :
Penjejakan lokasi untuk apl perkongsian perjalanan.
Untuk memudahkan, kami akan menggunakan perpustakaan kafka-python untuk mencipta kedua-dua pengeluar (untuk mensimulasikan pemandu menghantar kemas kini lokasi) dan pengguna (untuk mensimulasikan perkhidmatan yang memproses kemas kini lokasi ini).
1. Sediakan Kafka
Pastikan anda menjalankan Kafka secara setempat atau gunakan pembekal awan. Anda boleh memuat turun dan menjalankan Kafka secara tempatan dengan mengikuti Panduan Mula Pantas Kafka.
2. Pasang Perpustakaan Python Kafka
Anda boleh memasang perpustakaan Kafka Python menggunakan pip:
pip install kafka-python
3. Pengeluar Python Kafka (Simulasi Kemas Kini Lokasi Pemacu)
Pengeluar mensimulasikan pemandu yang menghantar kemas kini lokasi kepada topik Kafka (lokasi pemandu).
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
4. Pengguna Python Kafka (Simulating Ride Matching Service)
Pengguna membaca kemas kini lokasi daripada topik lokasi pemacu dan memprosesnya.
from kafka import KafkaConsumer import json # Kafka Consumer consumer = KafkaConsumer( 'driver-location', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', # Start from the earliest message enable_auto_commit=True, group_id='location-group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Deserialize data from JSON ) def process_location_updates(): print("Waiting for location updates...") for message in consumer: location = message.value driver_id = location['driver_id'] latitude = location['latitude'] longitude = location['longitude'] timestamp = location['timestamp'] print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}") # Start consuming location updates process_location_updates()
Penjelasan:
Pengeluar (Pemandu menghantar kemas kini lokasi):
Pengguna (Perkhidmatan padanan tunggangan):
Menjalankan Contoh (saya menggunakan mesin windows):
pip install kafka-python
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
Sekarang Jalankan pengeluar dan Pengguna dalam 2 tetingkap terminal berasingan menggunakan python.
Jalankan skrip pengeluar untuk mensimulasikan pemandu menghantar kemas kini lokasi.
Jalankan skrip pengguna untuk melihat perkhidmatan pemadanan perjalanan yang memproses kemas kini lokasi dalam masa nyata.
Kesimpulan
Apache Kafka menyediakan platform yang luar biasa untuk mengurus aliran data masa nyata. Dengan menggabungkan Kafka dengan Python, pembangun boleh membina saluran paip data yang berkuasa dan penyelesaian analitik masa nyata.
Sama ada penjejakan kenderaan, data IoT atau papan pemuka masa nyata, Kafka dengan Python sangat berskala dan boleh disesuaikan dengan pelbagai kes penggunaan. Jadi, mula bereksperimen dengan Kafka, dan anda akan kagum dengan potensinya dalam aplikasi dunia sebenar!
Atas ialah kandungan terperinci Panduan Pemula untuk Kafka dengan Python: Pemprosesan dan Aplikasi Data Masa Nyata. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!