카프카 소개
카프카를 사용하는 이유는 무엇인가요?
카프카 아키텍처
구성품:
제작자:
Kafka에 데이터/메시지를 보내는 애플리케이션 또는 서비스입니다. 생산자는 Kafka 내의 특정 주제에 메시지를 푸시합니다.
주제:
주제는 레코드가 게시되는 카테고리 또는 피드 이름입니다. 확장성과 병렬 처리가 가능하도록 주제가 분할되어 있습니다.
파티션:
브로커:
소비자:
소비자는 주제의 메시지를 읽는 애플리케이션 또는 서비스입니다.
소비자는 주제를 구독하고 Kafka 브로커에서 데이터를 가져옵니다.
소비자 그룹:
동물원 관리인:
Kafka 활용 사례
Python을 사용하여 실시간 시나리오에서 Kafka를 사용하는 방법을 보여주는 예:
차량 공유 앱의 위치 추적
간단하게 하기 위해 kafka-python 라이브러리를 사용하여 생산자(위치 업데이트를 보내는 운전자를 시뮬레이션하기 위해)와 소비자(이러한 위치 업데이트를 처리하는 서비스를 시뮬레이션하기 위해)를 모두 생성하겠습니다.
1. 카프카 설정
Kafka가 로컬에서 실행되고 있는지 확인하거나 클라우드 공급자를 사용하세요. Kafka 빠른 시작 가이드에 따라 Kafka를 로컬로 다운로드하고 실행할 수 있습니다.
2. Kafka Python 라이브러리 설치
pip를 사용하여 Kafka Python 라이브러리를 설치할 수 있습니다.
pip install kafka-python
3. Python Kafka Producer(드라이버 위치 업데이트 시뮬레이션)
생산자는 Kafka 주제(driver-location)에 위치 업데이트를 보내는 드라이버를 시뮬레이션합니다.
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
4. Python Kafka Consumer(승차 매칭 서비스 시뮬레이션)
소비자는 운전자 위치 주제에서 위치 업데이트를 읽고 이를 처리합니다.
from kafka import KafkaConsumer import json # Kafka Consumer consumer = KafkaConsumer( 'driver-location', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', # Start from the earliest message enable_auto_commit=True, group_id='location-group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Deserialize data from JSON ) def process_location_updates(): print("Waiting for location updates...") for message in consumer: location = message.value driver_id = location['driver_id'] latitude = location['latitude'] longitude = location['longitude'] timestamp = location['timestamp'] print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}") # Start consuming location updates process_location_updates()
설명:
제작자(운전기사가 위치 업데이트를 전송함):
소비자(라이드 매칭 서비스):
예제 실행(Windows 컴퓨터에서 실행 중):
pip install kafka-python
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
이제 Python을 사용하여 2개의 별도 터미널 창에서 생산자와 소비자를 실행합니다.
생산자 스크립트를 실행하여 위치 업데이트를 보내는 운전자를 시뮬레이션합니다.
소비자 스크립트를 실행하여 차량 매칭 서비스가 실시간으로 위치 업데이트를 처리하는 모습을 확인하세요.
결론
Apache Kafka는 실시간 데이터 스트림 관리를 위한 뛰어난 플랫폼을 제공합니다. Kafka와 Python을 결합하면 개발자는 강력한 데이터 파이프라인과 실시간 분석 솔루션을 구축할 수 있습니다.
차량 추적, IoT 데이터, 실시간 대시보드 등 Kafka with Python은 확장성이 뛰어나며 다양한 사용 사례에 맞게 조정할 수 있습니다. 따라서 Kafka를 실험해 보면 실제 애플리케이션에서의 잠재력에 놀라게 될 것입니다!
위 내용은 Python을 사용한 Kafka 초보자 가이드: 실시간 데이터 처리 및 애플리케이션의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!