首頁 >後端開發 >Python教學 >使用 Python 的 Kafka 初學者指南:即時資料處理和應用程式

使用 Python 的 Kafka 初學者指南:即時資料處理和應用程式

Mary-Kate Olsen
Mary-Kate Olsen原創
2024-11-05 17:41:031042瀏覽

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

卡夫卡簡介

  • Kafka 是一個由 Apache 開發的開源分散式事件流平台。
  • 最初由 LinkedIn 創建,旨在處理高吞吐量、容錯和即時資料流。
  • Kafka 允許系統發布和訂閱記錄流(訊息)、處理它們並有效地儲存它們。

為什麼要使用Kafka?

  • 高吞吐量:Kafka 每秒可以處理數百萬個訊息。
  • 容錯性:Kafka是分散式的,這意味著它可以跨多個節點複製資料以確保可靠性。
  • 持久性:Kafka將資料持久保存到磁碟並可以重播訊息,確保訊息傳遞的可靠性。
  • 即時處理:Kafka 可以即時處理資料流,非常適合監控、分析或事件驅動系統等應用程式。
  • 可擴充性:Kafka 可以透過增加更多代理來處理大量資料來輕鬆擴展。
  • 系統解耦:Kafka充當訊息傳遞的中間層,允許不同系統非同步通訊。

卡夫卡建築

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

組件:

製作人:
這些是將資料/訊息發送到 Kafka 的應用程式或服務。生產者將訊息推送到 Kafka 中的特定主題。

主題:
主題是發布記錄的類別或提要名稱。主題被分區以允許可擴展性和並行性。

分區:

  • 每個主題分為一個或多個分區。
  • 分區使Kafka能夠處理更多訊息並支持 並行處理。
  • 每個分割區都有一個唯一的ID,並且可以儲存分割區的子集 主題的數據。

經紀人:

  • Kafka 作為 Broker(伺服器)叢集運行,每個 Broker 處理數據 對於多個主題和分區。
  • 代理程式儲存與管理分區,處理讀寫 來自生產者和消費者的請求。
  • 每個經紀商都由唯一的 ID 來識別。

消費者:

消費者是從主題讀取訊息的應用程式或服務。
消費者訂閱主題,從 Kafka 代理中提取資料。

消費群:

  • 消費者被組織成消費者群。
  • 分區內的每個訊息僅傳遞給群組內的一個消費者,這可以實現多個消費者之間的負載平衡。

動物園管理員:

  • ZooKeeper 管理和協調 Kafka 代理,追蹤代理、主題和分區。
  • 它有助於管理分區的領導者選舉並監控群聚健康狀況。

Kafka 用例

  • 即時分析:公司使用 Kafka 即時處理和分析資料流,以用於監控系統,例如金融交易分析。
  • 日誌聚合:Kafka 整合來自多個服務或應用程式的日誌,以進行處理、警報或儲存。
  • 資料管道:Kafka 用作在不同系統或服務(ETL 管道)之間傳輸大量資料的骨幹。
  • 物聯網應用:Kafka 可以處理來自物聯網感測器的資料流,從而實現即時分析和回應。
  • 微服務通訊:Kafka 作為微服務架構的可靠訊息傳遞平台,支援非同步、解耦通訊。
  • 即時車輛追蹤:以下範例說明如何使用 Kafka 即時追蹤車輛。

使用 Python 示範如何在即時場景中使用 Kafka 的範例 :

共乘應用程式的位置追蹤。

為了簡單起見,我們將使用 kafka-python 庫建立一個生產者(以模擬發送位置更新的驅動程式)和一個消費者(以模擬處理這些位置更新的服務)。

1。設定 Kafka
確保您在本地運行 Kafka 或使用雲端提供者。您可以按照 Kafka 快速入門指南在本機下載並執行 Kafka。

2。安裝 Kafka Python 函式庫
您可以使用 pip 安裝 Kafka Python 函式庫:

pip install kafka-python

3。 Python Kafka Producer(模擬驅動程式位置更新)
生產者模擬驅動程式向 Kafka 主題發送位置更新(驅動程式位置)。

from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

4。 Python Kafka Consumer(模擬乘車匹配服務)
消費者從司機位置主題讀取位置更新並處理它們。

from kafka import KafkaConsumer
import json

# Kafka Consumer
consumer = KafkaConsumer(
    'driver-location',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True,
    group_id='location-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize data from JSON
)

def process_location_updates():
    print("Waiting for location updates...")
    for message in consumer:
        location = message.value
        driver_id = location['driver_id']
        latitude = location['latitude']
        longitude = location['longitude']
        timestamp = location['timestamp']
        print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}")

# Start consuming location updates
process_location_updates()

說明:

生產者(發送位置更新的司機):

  • 生產者將一個 JSON 物件傳送到 Kafka 主題 driver-location,其中包含 driver_id、緯度、經度和時間戳等欄位。
  • 生產者透過每 5 秒發送一次位置資料來模擬即時 GPS 更新。

消費者(共乘服務):

  • 消費者訂閱駕駛位置主題,監聽更新。
  • 每次將位置更新發佈到 Kafka 時,消費者都會處理並列印它,模擬使用此數據來匹配司機和乘客的服務。

運行範例(我在 Windows 機器上運行):

  1. 啟動 Zookeeper
pip install kafka-python
  1. 啟動本機 Kafka 伺服器。
from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

現在使用 python 在 2 個單獨的終端視窗中運行生產者和消費者。

  1. 運行生產者腳本來模擬驅動程式發送位置更新。

  2. 執行消費者腳本以查看乘車匹配服務即時處理位置更新。

結論
Apache Kafka 提供了一個用於管理即時資料流的卓越平台。透過將 Kafka 與 Python 結合,開發人員可以建立強大的資料管道和即時分析解決方案。

無論是車輛追蹤、物聯網資料或即時儀表板,Kafka with Python 都具有高度可擴充性,可以適應各種用例。因此,開始嘗試 Kafka,您將對其在實際應用中的潛力感到驚訝!

以上是使用 Python 的 Kafka 初學者指南:即時資料處理和應用程式的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn