Rumah >pembangunan bahagian belakang >Tutorial Python >Panduan Pemula untuk Kafka dengan Python: Pemprosesan dan Aplikasi Data Masa Nyata

Panduan Pemula untuk Kafka dengan Python: Pemprosesan dan Aplikasi Data Masa Nyata

Mary-Kate Olsen
Mary-Kate Olsenasal
2024-11-05 17:41:031118semak imbas

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

Pengenalan kepada Kafka

  • Kafka ialah platform penstriman acara teragih sumber terbuka yang dibangunkan oleh Apache.
  • Asalnya dicipta oleh LinkedIn, ia telah direka untuk mengendalikan daya pemprosesan yang tinggi, toleran terhadap kesalahan dan penstriman data masa nyata.
  • Kafka membenarkan sistem untuk menerbitkan dan melanggan aliran rekod (mesej), memprosesnya dan menyimpannya dengan cekap.

Mengapa Kafka Digunakan?

  • Pusat Tinggi: Kafka boleh mengendalikan berjuta-juta mesej sesaat.
  • Toleransi Kesalahan: Kafka diedarkan, bermakna ia boleh mereplikasi data merentas berbilang nod untuk memastikan kebolehpercayaan.
  • Ketahanan: Kafka mengekalkan data ke cakera dan boleh memainkan semula mesej, memastikan kebolehpercayaan dalam penghantaran mesej.
  • Pemprosesan Masa Nyata: Kafka boleh memproses aliran data dalam masa nyata, sesuai untuk aplikasi seperti pemantauan, analitik atau sistem dipacu peristiwa.
  • Skalabiliti: Kafka boleh membuat skala dengan mudah dengan menambahkan lebih banyak broker untuk mengendalikan volum data yang besar.
  • Sistem Penyahgandingan: Kafka bertindak sebagai lapisan tengah untuk pemesejan, membenarkan sistem yang berbeza berkomunikasi secara tidak segerak.

Seni Bina Kafka

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

Komponen:

Pengeluar:
Ini ialah aplikasi atau perkhidmatan yang menghantar data/mesej kepada Kafka. Pengeluar menolak mesej ke Topik tertentu dalam Kafka.

Topik:
Topik ialah kategori atau nama suapan yang mana rekod diterbitkan. Topik dibahagikan untuk membolehkan kebolehskalaan dan keselarian.

Pecahan:

  • Setiap Topik dibahagikan kepada satu atau lebih Partition.
  • Partition membolehkan Kafka mengendalikan lebih banyak mesej dan sokongan pemprosesan selari.
  • Setiap Partition mempunyai ID unik dan boleh menyimpan subset daripada data topik.

Broker:

  • Kafka berjalan sebagai kelompok Broker (pelayan), setiap satu mengendalikan data untuk berbilang topik dan sekatan.
  • Broker menyimpan dan mengurus partition, mengendalikan baca dan tulis permintaan daripada Pengeluar dan Pengguna.
  • Setiap Broker dikenal pasti melalui ID unik.

Pengguna:

Pengguna ialah aplikasi atau perkhidmatan yang membaca mesej daripada topik.
Pengguna melanggan topik, mengambil data daripada broker Kafka.

Kumpulan Pengguna:

  • Pengguna disusun dalam Kumpulan Pengguna.
  • Setiap mesej dalam partition dihantar kepada hanya seorang pengguna dalam kumpulan, yang membolehkan pengimbangan beban merentas berbilang pengguna.

Penjaga Zoo:

  • ZooKeeper mengurus dan menyelaras broker Kafka, menjejaki broker, topik dan partition.
  • Ia membantu menguruskan pemilihan pemimpin untuk partition dan memantau kesihatan kelompok.

Kes Penggunaan Kafka

  • Analitis Masa Nyata: Syarikat menggunakan Kafka untuk memproses dan menganalisis aliran data dalam masa nyata untuk sistem pemantauan, seperti analisis transaksi kewangan.
  • Pengagregatan Log: Kafka menyatukan log daripada berbilang perkhidmatan atau aplikasi untuk pemprosesan, makluman atau penyimpanan.
  • Saluran Paip Data: Kafka digunakan sebagai tulang belakang untuk memindahkan sejumlah besar data antara sistem atau perkhidmatan yang berbeza (talian paip ETL).
  • Aplikasi IoT: Kafka boleh mengendalikan aliran data daripada penderia IoT, membenarkan analisis dan respons masa nyata.
  • Komunikasi Perkhidmatan Mikro: Kafka berfungsi sebagai platform pemesejan yang boleh dipercayai untuk seni bina perkhidmatan mikro, membolehkan komunikasi tak segerak, dipisahkan.
  • Penjejakan Kenderaan Masa Nyata: Contoh berikut menggambarkan cara Kafka digunakan untuk menjejak kenderaan dalam masa nyata.

Contoh menggunakan Python untuk menunjukkan cara Kafka boleh digunakan dalam senario masa nyata :

Penjejakan lokasi untuk apl perkongsian perjalanan.

Untuk memudahkan, kami akan menggunakan perpustakaan kafka-python untuk mencipta kedua-dua pengeluar (untuk mensimulasikan pemandu menghantar kemas kini lokasi) dan pengguna (untuk mensimulasikan perkhidmatan yang memproses kemas kini lokasi ini).

1. Sediakan Kafka
Pastikan anda menjalankan Kafka secara setempat atau gunakan pembekal awan. Anda boleh memuat turun dan menjalankan Kafka secara tempatan dengan mengikuti Panduan Mula Pantas Kafka.

2. Pasang Perpustakaan Python Kafka
Anda boleh memasang perpustakaan Kafka Python menggunakan pip:

pip install kafka-python

3. Pengeluar Python Kafka (Simulasi Kemas Kini Lokasi Pemacu)
Pengeluar mensimulasikan pemandu yang menghantar kemas kini lokasi kepada topik Kafka (lokasi pemandu).

from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

4. Pengguna Python Kafka (Simulating Ride Matching Service)
Pengguna membaca kemas kini lokasi daripada topik lokasi pemacu dan memprosesnya.

from kafka import KafkaConsumer
import json

# Kafka Consumer
consumer = KafkaConsumer(
    'driver-location',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True,
    group_id='location-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize data from JSON
)

def process_location_updates():
    print("Waiting for location updates...")
    for message in consumer:
        location = message.value
        driver_id = location['driver_id']
        latitude = location['latitude']
        longitude = location['longitude']
        timestamp = location['timestamp']
        print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}")

# Start consuming location updates
process_location_updates()

Penjelasan:

Pengeluar (Pemandu menghantar kemas kini lokasi):

  • Pengeluar menghantar objek JSON ke lokasi pemandu topik Kafka dengan medan seperti driver_id, latitud, longitud dan cap waktu.
  • Pengeluar mensimulasikan kemas kini GPS masa nyata dengan menghantar data lokasi setiap 5 saat.

Pengguna (Perkhidmatan padanan tunggangan):

  • Pengguna melanggan topik lokasi pemandu, mendengar kemas kini.
  • Setiap kali kemas kini lokasi diterbitkan kepada Kafka, pengguna memproses dan mencetaknya, mensimulasikan perkhidmatan yang menggunakan data ini untuk memadankan pemandu dan penunggang.

Menjalankan Contoh (saya menggunakan mesin windows):

  1. Mulakan Penjaga Zoo
pip install kafka-python
  1. Mulakan pelayan Kafka tempatan anda.
from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

Sekarang Jalankan pengeluar dan Pengguna dalam 2 tetingkap terminal berasingan menggunakan python.

  1. Jalankan skrip pengeluar untuk mensimulasikan pemandu menghantar kemas kini lokasi.

  2. Jalankan skrip pengguna untuk melihat perkhidmatan pemadanan perjalanan yang memproses kemas kini lokasi dalam masa nyata.

Kesimpulan
Apache Kafka menyediakan platform yang luar biasa untuk mengurus aliran data masa nyata. Dengan menggabungkan Kafka dengan Python, pembangun boleh membina saluran paip data yang berkuasa dan penyelesaian analitik masa nyata.

Sama ada penjejakan kenderaan, data IoT atau papan pemuka masa nyata, Kafka dengan Python sangat berskala dan boleh disesuaikan dengan pelbagai kes penggunaan. Jadi, mula bereksperimen dengan Kafka, dan anda akan kagum dengan potensinya dalam aplikasi dunia sebenar!

Atas ialah kandungan terperinci Panduan Pemula untuk Kafka dengan Python: Pemprosesan dan Aplikasi Data Masa Nyata. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn