卡夫卡简介
为什么使用Kafka?
卡夫卡建筑
组件:
制作人:
这些是将数据/消息发送到 Kafka 的应用程序或服务。生产者将消息推送到 Kafka 中的特定主题。
主题:
主题是发布记录的类别或提要名称。主题被分区以允许可扩展性和并行性。
分区:
经纪人:
消费者:
消费者是从主题读取消息的应用程序或服务。
消费者订阅主题,从 Kafka 代理中提取数据。
消费群体:
动物园管理员:
Kafka 用例
使用 Python 演示如何在实时场景中使用 Kafka 的示例 :
拼车应用程序的位置跟踪。
为了简单起见,我们将使用 kafka-python 库创建一个生产者(以模拟发送位置更新的驱动程序)和一个消费者(以模拟处理这些位置更新的服务)。
1。设置 Kafka
确保您在本地运行 Kafka 或使用云提供商。您可以按照 Kafka 快速入门指南在本地下载并运行 Kafka。
2。安装 Kafka Python 库
您可以使用 pip 安装 Kafka Python 库:
pip install kafka-python
3。 Python Kafka Producer(模拟驱动程序位置更新)
生产者模拟驱动程序向 Kafka 主题发送位置更新(驱动程序位置)。
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
4。 Python Kafka Consumer(模拟乘车匹配服务)
消费者从司机位置主题读取位置更新并处理它们。
from kafka import KafkaConsumer import json # Kafka Consumer consumer = KafkaConsumer( 'driver-location', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', # Start from the earliest message enable_auto_commit=True, group_id='location-group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Deserialize data from JSON ) def process_location_updates(): print("Waiting for location updates...") for message in consumer: location = message.value driver_id = location['driver_id'] latitude = location['latitude'] longitude = location['longitude'] timestamp = location['timestamp'] print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}") # Start consuming location updates process_location_updates()
说明:
生产者(发送位置更新的司机):
消费者(拼车服务):
运行示例(我在 Windows 机器上运行):
pip install kafka-python
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
现在使用 python 在 2 个单独的终端窗口中运行生产者和消费者。
运行生产者脚本来模拟驱动程序发送位置更新。
运行消费者脚本以查看乘车匹配服务实时处理位置更新。
结论
Apache Kafka 提供了一个用于管理实时数据流的卓越平台。通过将 Kafka 与 Python 相结合,开发人员可以构建强大的数据管道和实时分析解决方案。
无论是车辆跟踪、物联网数据还是实时仪表板,Kafka with Python 都具有高度可扩展性,可以适应各种用例。因此,开始尝试 Kafka,您将会对其在实际应用中的潜力感到惊讶!
以上是使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序的详细内容。更多信息请关注PHP中文网其他相关文章!