찾다
백엔드 개발파이썬 튜토리얼Spark Structured Streaming 및 Apache Kafka를 사용한 실시간 항공 교통 데이터 분석

현재 우리는 매초 페타바이트의 데이터가 생성되는 세상에 살고 있습니다. 따라서 이 데이터를 실시간으로 분석하고 처리하는 것은 데이터가 생성되고 더 많은 데이터가 생성됨에 따라 보다 정확하게 비즈니스 통찰력을 생성하려는 기업에게 매우 중요합니다.

오늘은 Spark Structured Streaming과 Apache Kafka를 활용하여 가상의 항공 교통 데이터를 기반으로 실시간 데이터 분석을 개발해 보겠습니다. 이러한 기술이 무엇인지 모르신다면 제가 해당 기술을 더 자세히 소개한 기사와 이 기사 전체에서 다룰 다른 개념을 읽어 보시기 바랍니다. 그럼 잊지 말고 꼭 확인해보세요.

제 GitHub에서 전체 프로젝트를 확인하실 수 있습니다.

건축학

글쎄요, 데이터 엔지니어인 당신이 SkyX라는 항공사에서 일하고 있다고 상상해 보세요. 그곳에서 항공 교통에 대한 데이터는 매초 생성됩니다.

해외에서 가장 많이 방문한 도시 순위 등 이러한 항공편의 실시간 데이터를 표시하는 대시보드를 개발해 달라는 요청을 받았습니다. 대부분의 사람들이 떠나는 도시; 그리고 전 세계에서 가장 많은 사람을 수송하는 항공기입니다.

각 항공편에서 생성되는 데이터는 다음과 같습니다.

  • aircraft_name: 항공기 이름입니다. SkyX에는 5대의 항공기만 이용 가능합니다.
  • 출발지: 항공기가 출발하는 도시입니다. SkyX는 전 세계 5개 도시 간 항공편만 운항합니다.
  • 도착지: 항공기 목적지 도시. 앞서 언급했듯이 SkyX는 전 세계 5개 도시 간 항공편만 운항합니다.
  • 승객: 항공기가 수송하는 승객 수입니다. 모든 SkyX 항공기는 각 항공편당 50~100명을 수송합니다.

다음은 우리 프로젝트의 기본 아키텍처입니다.

  • 생산자: 항공기 항공 교통 데이터를 생성하여 Apache Kafka 주제로 전송하는 역할을 담당합니다.
  • 소비자: Apache Kafka 주제에 실시간으로 도착하는 데이터만 관찰합니다.
  • 데이터 분석: Apache Kafka 주제에 도달하는 데이터를 실시간으로 처리하고 분석하는 3개의 대시보드입니다. 가장 많은 관광객이 방문하는 도시를 분석합니다. 대부분의 사람들이 다른 도시를 방문하기 위해 떠나는 도시 분석; 전 세계 도시 간 가장 많은 사람을 수송하는 SkyX 항공기 분석

개발 환경 준비

이 튜토리얼에서는 컴퓨터에 PySpark가 이미 설치되어 있다고 가정합니다. 아직 확인하지 않았다면 설명서 자체의 단계를 확인하세요.

Apache Kafka는 Docker??를 통한 컨테이너화로 사용하겠습니다.

그리고 마지막으로 가상환경을 통해 Python을 사용해보겠습니다.

Docker를 통한 컨테이너화를 통한 Apache Kafka

더 이상 고민하지 말고 skyx라는 폴더를 만들고 그 안에 docker-compose.yml 파일을 추가하세요.

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

이제 docker-compose 파일에 다음 콘텐츠를 추가하세요.

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

완료! 이제 Kafka 서버를 업로드할 수 있습니다. 이렇게 하려면 터미널에 다음 명령을 입력하세요.

$ docker compose up -d
$ docker compose ps
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

참고: 이 튜토리얼에서는 Docker Compose 버전 2.0을 사용합니다. 이것이 dockercompose 사이에 "-"가 없는 이유입니다. ☺.

이제 생산자가 실시간으로 보낸 데이터를 저장할 Kafka 내 주제를 생성해야 합니다. 이를 위해 컨테이너 내부의 Kafka에 액세스해 보겠습니다.

$ docker compose exec kafka bash

마지막으로 airtraffic이라는 주제를 만듭니다.

$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092

항공교통 주제를 만들었습니다.

가상 환경 생성

실시간 항공 교통 데이터를 Kafka 주제로 전송하는 프로듀서, 즉 애플리케이션을 개발하려면 kafka-python 라이브러리를 사용해야 합니다. kafka-python은 Apache Kafka와 통합되는 생산자와 소비자를 개발할 수 있는 커뮤니티 개발 라이브러리입니다.

먼저 requirements.txt라는 파일을 만들고 그 안에 다음 종속성을 추가해 보겠습니다.

카프카파이썬

두 번째로 가상 환경을 생성하고 요구사항.txt 파일에 종속성을 설치합니다.

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

완료! 이제 우리 환경은 개발 준비가 되었나요?.

생산자 개발

이제 프로듀서를 만들어 보겠습니다. 앞서 언급했듯이 제작자는 새로 생성된 Kafka 주제에 항공 교통 데이터를 전송하는 책임을 맡게 됩니다.

건축에서도 말했듯이 SkyX는 전 세계 5개 도시 사이만 비행하며, 이용 가능한 항공기는 5대뿐인가요? 각 항공기에는 50~100명의 승객이 탑승한다는 점은 언급할 가치가 있습니다.

데이터는 무작위로 생성되어 1~6초 사이의 시간 간격으로 json 형식으로 주제에 전송됩니다.

가자! src라는 하위 디렉터리와 kafka라는 다른 하위 디렉터리를 만듭니다. kafka 디렉터리 내에 airtraffic_producer.py라는 파일을 만들고 그 안에 다음 코드를 추가합니다.

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

완료! 우리는 생산자를 개발합니다. 실행하고 잠시 동안 실행해 보세요.

$ python airtraffic_producer.py

소비자 개발

이제 소비자를 개발해 보겠습니다. 이것은 매우 간단한 응용 프로그램이 될 것입니다. kafka 주제에 도착하는 데이터를 실시간으로 터미널에 표시합니다.

kafka 디렉터리 내에서 airtraffic_consumer.py라는 파일을 만들고 그 안에 다음 코드를 추가합니다.

$ docker compose up -d
$ docker compose ps

보세요, 아주 간단하다고 말씀드렸죠. 실행하여 제작자가 주제에 데이터를 보내면서 실시간으로 표시되는 데이터를 지켜보세요.

$ python airtraffic_consumer.py

데이터 분석: 관광객이 가장 많은 도시

이제 데이터 분석부터 시작하겠습니다. 이 시점에서 우리는 가장 많은 관광객을 유치하는 도시의 순위를 실시간으로 표시하는 대시보드, 애플리케이션을 개발할 것입니다. 즉, to 열을 기준으로 데이터를 그룹화하고 승객 열을 기준으로 합계를 계산합니다. 아주 간단해요!

이를 수행하려면 src 디렉터리 내에 dashboards라는 하위 디렉터리를 만들고 tourists_analytic.py라는 파일을 만듭니다. 그런 다음 그 안에 다음 코드를 추가하세요.

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

이제 Spark-Submit을 통해 파일을 실행할 수 있습니다. 하지만 진정하세요! PySpark를 Kafka와 통합할 때 스파크 제출을 다르게 실행해야 합니다. --packages.

매개변수를 통해 Apache Kafka 패키지와 Apache Spark의 현재 버전을 알려야 합니다.

Apache Spark를 Apache Kafka와 처음 통합하는 경우 Spark-submit을 실행하는 데 시간이 걸릴 수 있습니다. 필요한 패키지를 다운로드 받아야 하기 때문입니다.

데이터 분석을 실시간으로 볼 수 있도록 생산자가 여전히 실행 중인지 확인하세요. 대시보드 디렉터리 내에서 다음 명령을 실행합니다.

$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 tourist_analytic.py

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

데이터 분석: 대부분의 사람들이 떠나는 도시

이번 분석은 이전 분석과 매우 유사합니다. 하지만, 관광객이 가장 많이 들어오는 도시를 실시간으로 분석하는 대신, 사람들이 가장 많이 떠나는 도시를 분석하겠습니다. 이렇게 하려면 leavers_analytic.py라는 파일을 만들고 그 안에 다음 코드를 추가하세요.

$ docker compose up -d
$ docker compose ps

데이터 분석을 실시간으로 볼 수 있도록 생산자가 여전히 실행 중인지 확인하세요. 대시보드 디렉터리 내에서 다음 명령을 실행합니다.

$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analytic.py

NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

데이터 분석: 가장 많은 승객을 태운 항공기

이번 분석은 이전 분석보다 훨씬 간단합니다. 전 세계 도시 간 가장 많은 승객을 수송하는 항공기를 실시간으로 분석해 보겠습니다. aircrafts_analytic.py라는 파일을 만들고 그 안에 다음 코드를 추가하세요.

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt

데이터 분석을 실시간으로 볼 수 있도록 생산자가 여전히 실행 중인지 확인하세요. 대시보드 디렉터리 내에서 다음 명령을 실행합니다.

$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Aircrafts_analytic.py

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

최종 고려사항

여기서 마치겠습니다! 이 기사에서는 Spark Structured Streaming 및 Apache Kafka를 사용하여 가상의 항공 교통 데이터를 기반으로 실시간 데이터 분석을 개발합니다.

이를 위해 우리는 이 데이터를 Kafka 주제에 실시간으로 전송하는 생산자를 개발한 다음, 이 데이터를 실시간으로 분석하기 위해 3개의 대시보드를 개발했습니다.

좋아하셨으면 좋겠습니다. 다음에 또 만나요?.

위 내용은 Spark Structured Streaming 및 Apache Kafka를 사용한 실시간 항공 교통 데이터 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
Python vs. C : 응용 및 사용 사례가 비교되었습니다Python vs. C : 응용 및 사용 사례가 비교되었습니다Apr 12, 2025 am 12:01 AM

Python은 데이터 과학, 웹 개발 및 자동화 작업에 적합한 반면 C는 시스템 프로그래밍, 게임 개발 및 임베디드 시스템에 적합합니다. Python은 단순성과 강력한 생태계로 유명하며 C는 고성능 및 기본 제어 기능으로 유명합니다.

2 시간의 파이썬 계획 : 현실적인 접근2 시간의 파이썬 계획 : 현실적인 접근Apr 11, 2025 am 12:04 AM

2 시간 이내에 Python의 기본 프로그래밍 개념과 기술을 배울 수 있습니다. 1. 변수 및 데이터 유형을 배우기, 2. 마스터 제어 흐름 (조건부 명세서 및 루프), 3. 기능의 정의 및 사용을 이해하십시오. 4. 간단한 예제 및 코드 스 니펫을 통해 Python 프로그래밍을 신속하게 시작하십시오.

파이썬 : 기본 응용 프로그램 탐색파이썬 : 기본 응용 프로그램 탐색Apr 10, 2025 am 09:41 AM

Python은 웹 개발, 데이터 과학, 기계 학습, 자동화 및 스크립팅 분야에서 널리 사용됩니다. 1) 웹 개발에서 Django 및 Flask 프레임 워크는 개발 프로세스를 단순화합니다. 2) 데이터 과학 및 기계 학습 분야에서 Numpy, Pandas, Scikit-Learn 및 Tensorflow 라이브러리는 강력한 지원을 제공합니다. 3) 자동화 및 스크립팅 측면에서 Python은 자동화 된 테스트 및 시스템 관리와 ​​같은 작업에 적합합니다.

2 시간 안에 얼마나 많은 파이썬을 배울 수 있습니까?2 시간 안에 얼마나 많은 파이썬을 배울 수 있습니까?Apr 09, 2025 pm 04:33 PM

2 시간 이내에 파이썬의 기본 사항을 배울 수 있습니다. 1. 변수 및 데이터 유형을 배우십시오. 이를 통해 간단한 파이썬 프로그램 작성을 시작하는 데 도움이됩니다.

10 시간 이내에 프로젝트 및 문제 중심 방법에서 컴퓨터 초보자 프로그래밍 기본 사항을 가르치는 방법?10 시간 이내에 프로젝트 및 문제 중심 방법에서 컴퓨터 초보자 프로그래밍 기본 사항을 가르치는 방법?Apr 02, 2025 am 07:18 AM

10 시간 이내에 컴퓨터 초보자 프로그래밍 기본 사항을 가르치는 방법은 무엇입니까? 컴퓨터 초보자에게 프로그래밍 지식을 가르치는 데 10 시간 밖에 걸리지 않는다면 무엇을 가르치기로 선택 하시겠습니까?

중간 독서를 위해 Fiddler를 사용할 때 브라우저에서 감지되는 것을 피하는 방법은 무엇입니까?중간 독서를 위해 Fiddler를 사용할 때 브라우저에서 감지되는 것을 피하는 방법은 무엇입니까?Apr 02, 2025 am 07:15 AM

Fiddlerevery Where를 사용할 때 Man-in-the-Middle Reading에 Fiddlereverywhere를 사용할 때 감지되는 방법 ...

Python 3.6에 피클 파일을로드 할 때 '__builtin__'모듈을 찾을 수없는 경우 어떻게해야합니까?Python 3.6에 피클 파일을로드 할 때 '__builtin__'모듈을 찾을 수없는 경우 어떻게해야합니까?Apr 02, 2025 am 07:12 AM

Python 3.6에 피클 파일로드 3.6 환경 보고서 오류 : modulenotfounderror : nomodulename ...

경치 좋은 스팟 코멘트 분석에서 Jieba Word 세분화의 정확성을 향상시키는 방법은 무엇입니까?경치 좋은 스팟 코멘트 분석에서 Jieba Word 세분화의 정확성을 향상시키는 방법은 무엇입니까?Apr 02, 2025 am 07:09 AM

경치 좋은 스팟 댓글 분석에서 Jieba Word 세분화 문제를 해결하는 방법은 무엇입니까? 경치가 좋은 스팟 댓글 및 분석을 수행 할 때 종종 Jieba Word 세분화 도구를 사용하여 텍스트를 처리합니다 ...

See all articles

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25 : Myrise에서 모든 것을 잠금 해제하는 방법
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

SublimeText3 Linux 새 버전

SublimeText3 Linux 새 버전

SublimeText3 Linux 최신 버전

DVWA

DVWA

DVWA(Damn Vulnerable Web App)는 매우 취약한 PHP/MySQL 웹 애플리케이션입니다. 주요 목표는 보안 전문가가 법적 환경에서 자신의 기술과 도구를 테스트하고, 웹 개발자가 웹 응용 프로그램 보안 프로세스를 더 잘 이해할 수 있도록 돕고, 교사/학생이 교실 환경 웹 응용 프로그램에서 가르치고 배울 수 있도록 돕는 것입니다. 보안. DVWA의 목표는 다양한 난이도의 간단하고 간단한 인터페이스를 통해 가장 일반적인 웹 취약점 중 일부를 연습하는 것입니다. 이 소프트웨어는

ZendStudio 13.5.1 맥

ZendStudio 13.5.1 맥

강력한 PHP 통합 개발 환경

SecList

SecList

SecLists는 최고의 보안 테스터의 동반자입니다. 보안 평가 시 자주 사용되는 다양한 유형의 목록을 한 곳에 모아 놓은 것입니다. SecLists는 보안 테스터에게 필요할 수 있는 모든 목록을 편리하게 제공하여 보안 테스트를 더욱 효율적이고 생산적으로 만드는 데 도움이 됩니다. 목록 유형에는 사용자 이름, 비밀번호, URL, 퍼징 페이로드, 민감한 데이터 패턴, 웹 셸 등이 포함됩니다. 테스터는 이 저장소를 새로운 테스트 시스템으로 간단히 가져올 수 있으며 필요한 모든 유형의 목록에 액세스할 수 있습니다.

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.