首页  >  文章  >  后端开发  >  使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序

使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序

Mary-Kate Olsen
Mary-Kate Olsen原创
2024-11-05 17:41:03956浏览

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

卡夫卡简介

  • Kafka 是一个由 Apache 开发的开源分布式事件流平台。
  • 最初由 LinkedIn 创建,旨在处理高吞吐量、容错和实时数据流。
  • Kafka 允许系统发布和订阅记录流(消息)、处理它们并有效地存储它们。

为什么使用Kafka?

  • 高吞吐量:Kafka 每秒可以处理数百万条消息。
  • 容错性:Kafka是分布式的,这意味着它可以跨多个节点复制数据以确保可靠性。
  • 持久性:Kafka将数据持久保存到磁盘并可以重放消息,确保消息传递的可靠性。
  • 实时处理:Kafka 可以实时处理数据流,非常适合监控、分析或事件驱动系统等应用程序。
  • 可扩展性:Kafka 可以通过添加更多代理来处理大量数据来轻松扩展。
  • 系统解耦:Kafka充当消息传递的中间层,允许不同系统异步通信。

卡夫卡建筑

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

组件:

制作人:
这些是将数据/消息发送到 Kafka 的应用程序或服务。生产者将消息推送到 Kafka 中的特定主题。

主题:
主题是发布记录的类别或提要名称。主题被分区以允许可扩展性和并行性。

分区:

  • 每个主题分为一个或多个分区。
  • 分区使Kafka能够处理更多消息并支持 并行处理。
  • 每个分区都有一个唯一的ID,并且可以存储分区的子集 主题的数据。

经纪人:

  • Kafka 作为 Broker(服务器)集群运行,每个 Broker 处理数据 对于多个主题和分区。
  • 代理存储和管理分区,处理读写 来自生产者和消费者的请求。
  • 每个经纪商都由唯一的 ID 标识。

消费者:

消费者是从主题读取消息的应用程序或服务。
消费者订阅主题,从 Kafka 代理中提取数据。

消费群体:

  • 消费者被组织成消费者组。
  • 分区内的每条消息仅传递给组内的一个消费者,这可以实现多个消费者之间的负载平衡。

动物园管理员:

  • ZooKeeper 管理和协调 Kafka 代理,跟踪代理、主题和分区。
  • 它有助于管理分区的领导者选举并监控集群健康状况。

Kafka 用例

  • 实时分析:公司使用 Kafka 实时处理和分析数据流,以用于监控系统,例如金融交易分析。
  • 日志聚合:Kafka 整合来自多个服务或应用程序的日志,以进行处理、警报或存储。
  • 数据管道:Kafka 用作在不同系统或服务(ETL 管道)之间传输大量数据的骨干。
  • 物联网应用:Kafka 可以处理来自物联网传感器的数据流,从而实现实时分析和响应。
  • 微服务通信:Kafka 作为微服务架构的可靠消息传递平台,支持异步、解耦通信。
  • 实时车辆跟踪:以下示例说明了如何使用 Kafka 实时跟踪车辆。

使用 Python 演示如何在实时场景中使用 Kafka 的示例 :

拼车应用程序的位置跟踪。

为了简单起见,我们将使用 kafka-python 库创建一个生产者(以模拟发送位置更新的驱动程序)和一个消费者(以模拟处理这些位置更新的服务)。

1。设置 Kafka
确保您在本地运行 Kafka 或使用云提供商。您可以按照 Kafka 快速入门指南在本地下载并运行 Kafka。

2。安装 Kafka Python 库
您可以使用 pip 安装 Kafka Python 库:

pip install kafka-python

3。 Python Kafka Producer(模拟驱动程序位置更新)
生产者模拟驱动程序向 Kafka 主题发送位置更新(驱动程序位置)。

from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

4。 Python Kafka Consumer(模拟乘车匹配服务)
消费者从司机位置主题读取位置更新并处理它们。

from kafka import KafkaConsumer
import json

# Kafka Consumer
consumer = KafkaConsumer(
    'driver-location',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True,
    group_id='location-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize data from JSON
)

def process_location_updates():
    print("Waiting for location updates...")
    for message in consumer:
        location = message.value
        driver_id = location['driver_id']
        latitude = location['latitude']
        longitude = location['longitude']
        timestamp = location['timestamp']
        print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}")

# Start consuming location updates
process_location_updates()

说明:

生产者(发送位置更新的司机):

  • 生产者将一个 JSON 对象发送到 Kafka 主题 driver-location,其中包含 driver_id、纬度、经度和时间戳等字段。
  • 生产者通过每 5 秒发送一次位置数据来模拟实时 GPS 更新。

消费者(拼车服务):

  • 消费者订阅驾驶员位置主题,监听更新。
  • 每次将位置更新发布到 Kafka 时,消费者都会处理并打印它,模拟使用此数据来匹配司机和乘客的服务。

运行示例(我在 Windows 机器上运行):

  1. 启动 Zookeeper
pip install kafka-python
  1. 启动本地 Kafka 服务器。
from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)

现在使用 python 在 2 个单独的终端窗口中运行生产者和消费者。

  1. 运行生产者脚本来模拟驱动程序发送位置更新。

  2. 运行消费者脚本以查看乘车匹配服务实时处理位置更新。

结论
Apache Kafka 提供了一个用于管理实时数据流的卓越平台。通过将 Kafka 与 Python 相结合,开发人员可以构建强大的数据管道和实时分析解决方案。

无论是车辆跟踪、物联网数据还是实时仪表板,Kafka with Python 都具有高度可扩展性,可以适应各种用例。因此,开始尝试 Kafka,您将会对其在实际应用中的潜力感到惊讶!

以上是使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn