目前,我們生活在一個每秒產生千萬億位元組資料的世界。因此,對於希望隨著數據的產生而更準確地產生業務洞察的公司來說,即時分析和處理這些數據變得非常重要。
今天,我們將使用 Spark Structured Streaming 和 Apache Kafka 開發基於虛構空中交通數據的即時數據分析。如果您不知道這些技術是什麼,我建議您閱讀我撰寫的文章,更詳細地介紹它們,以及本文將涵蓋的其他概念。所以,別忘了檢查一下嗎? .
你可以在我的 GitHub 上查看完整的專案。
好吧,想像一下,您是一名數據工程師,在一家名為 SkyX 的航空公司工作,該航空公司每秒都會產生有關空中交通的數據。
您被要求開發一個儀表板來顯示這些航班的即時數據,例如國外訪問量最大的城市的排名;大多數人離開的城市;以及在世界各地運送最多乘客的飛機。
這是每次航班產生的數據:
以下是我們專案的基本架構:
本教學假設您的電腦上已經安裝了 PySpark。如果您還沒有這樣做,請查看文件本身中的步驟。
至於Apache Kafka,我們將透過Docker容器化來使用它??.
最後,我們將透過虛擬環境使用 Python。
話不多說,建立一個名為 skyx 的資料夾,並在其中加入檔案 docker-compose.yml。
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
現在,在 docker-compose 檔案中加入以下內容:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
完成!我們現在可以上傳我們的 Kafka 伺服器。為此,請在終端機中鍵入以下命令:
$ docker compose up -d $ docker compose ps
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
注意:本教學使用 Docker Compose 2.0 版本。這就是為什麼 docker 和 compose ☺ 之間沒有「-」。
現在,我們需要在 Kafka 中建立一個主題來儲存生產者即時發送的資料。為此,讓我們訪問容器內的 Kafka:
$ docker compose 執行 kafka bash
最後建立主題,名稱為airtraffic。
$ kafka-topics --create --topic Airtraffic --bootstrap-server localhost:29092
創建了主題airtraffic。
為了開發我們的生產者,即負責將即時空中交通數據發送到 Kafka 主題的應用程序,我們需要使用 kafka-python 庫。 kafka-python 是一個社群開發的函式庫,可讓我們開發與 Apache Kafka 整合的生產者和消費者。
首先,讓我們建立一個名為 requirements.txt 的文件,並在其中新增以下相依性:
卡夫卡-python
其次,我們將建立一個虛擬環境並在requirements.txt檔案中安裝依賴項:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
完成!現在我們的環境已經準備好開發了嗎? .
現在讓我們創建我們的生產者。如前所述,生產者將負責將空中交通數據發送到新創建的 Kafka 主題。
如架構所說,SkyX 只在全球五個城市之間飛行,並且只有五架飛機? 。值得一提的是,每架飛機可搭載50至100人。
注意,資料是隨機產生的,以json格式傳送到主題,時間間隔在1到6秒之間? .
我們走吧!建立一個名為 src 的子目錄和另一個名為 kafka 的子目錄。在 kafka 目錄中,建立一個名為 airtraffic_ Producer.py 的文件,並在其中新增以下程式碼:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
完成!我們發展我們的生產者。運行它並讓它運行一段時間。
$ python Airtraffic_ Producer.py
現在讓我們開發我們的消費者。這將是一個非常簡單的應用程式。它只是在終端機中即時顯示到達kafka topic的資料。
仍在 kafka 目錄中,建立一個名為 airtraffic_consumer.py 的文件,並在其中添加以下程式碼:
$ docker compose up -d $ docker compose ps
看,我告訴你這很簡單。運行它並觀察生產者向主題發送資料時即時顯示的資料。
$ python airtraffic_consumer.py
現在我們開始進行數據分析。此時,我們將開發一個儀表板,一個應用程序,它將即時顯示接待遊客最多的城市的排名。換句話說,我們將按 to 列對資料進行分組,並根據 passengers 列進行求和。非常簡單!
為此,請在 src 目錄中建立一個名為 dashboards 的子目錄,並建立一個名為 tourists_analysis.py 的檔案。然後在其中加入以下程式碼:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
我們現在可以透過spark-submit執行我們的檔案。但冷靜點!當我們將 PySpark 與 Kafka 整合時,我們必須以不同的方式運行 Spark-Submit。需要透過 --packages.
參數告知 Apache Kafka 套件以及 Apache Spark 目前版本如果這是您第一次將 Apache Spark 與 Apache Kafka 集成,spark-submit 可能需要一段時間才能運行。這是因為它需要下載必要的軟體包。
確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:
$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0遊客分析.py
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
這個分析與上一個非常相似。然而,我們不會即時分析接待遊客最多的城市,而是分析最多人離開的城市。為此,請建立一個名為 leavers_analysis.py 的文件,並在其中加入以下程式碼:
$ docker compose up -d $ docker compose ps
確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:
$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
這個分析比前面的簡單多了。讓我們即時分析在世界各地城市之間運送最多乘客的飛機。建立一個名為 aircrafts_analysis.py 的文件,並在其中加入以下程式碼:
$ python -m venv venv $ venv\scripts\activate $ pip install -r requirements.txt
確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:
$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 heavens_analysis.py
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
我們就到此結束了,夥伴們!在本文中,我們使用 Spark Structured Streaming 和 Apache Kafka 開發基於虛構空中交通資料的即時資料分析。
為此,我們開發了一個生產者,將這些數據即時發送到 Kafka 主題,然後我們開發了 3 個儀表板來即時分析這些數據。
我希望你喜歡它。下次見? .
以上是使用 Spark Structured Streaming 和 Apache Kafka 進行即時空中交通資料分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!