目前,我們生活在一個每秒產生千萬億位元組資料的世界。因此,對於希望隨著數據的產生而更準確地產生業務洞察的公司來說,即時分析和處理這些數據變得非常重要。
今天,我們將使用 Spark Structured Streaming 和 Apache Kafka 開發基於虛構空中交通數據的即時數據分析。如果您不知道這些技術是什麼,我建議您閱讀我撰寫的文章,更詳細地介紹它們,以及本文將涵蓋的其他概念。所以,別忘了檢查一下嗎? .

Spark Structured Streaming 和 Apache Kafka 即時資料處理簡介
Gehazi Anc ・22 年 9 月 29 日
你可以在我的 GitHub 上查看完整的專案。
建築學
好吧,想像一下,您是一名數據工程師,在一家名為 SkyX 的航空公司工作,該航空公司每秒都會產生有關空中交通的數據。
您被要求開發一個儀表板來顯示這些航班的即時數據,例如國外訪問量最大的城市的排名;大多數人離開的城市;以及在世界各地運送最多乘客的飛機。
這是每次航班產生的數據:
- aircraft_name:飛機的名稱。在 SkyX,只有五架飛機可用。
- 出發地:飛機起飛的城市。 SkyX 只營運全球五個城市之間的航班。
- 目的地:飛機目的地城市。如前所述,SkyX 僅運營全球五個城市之間的航班。
- 乘客:飛機運送的乘客數量。所有 SkyX 飛機每次飛行可搭載 50 至 100 人。
以下是我們專案的基本架構:
- 生產者:負責產生飛機空中交通資料並將其發送到 Apache Kafka 主題。
- Consumer:只觀察即時到達Apache Kafka Topic的資料。
- 資料分析:三個儀表板,即時處理和分析到達 Apache Kafka 主題的資料。分析接待遊客最多的城市;分析大多數人離開前往其他城市的城市;以及在世界各地城市之間運送最多乘客的 SkyX 飛機的分析。
準備開發環境
本教學假設您的電腦上已經安裝了 PySpark。如果您還沒有這樣做,請查看文件本身中的步驟。
至於Apache Kafka,我們將透過Docker容器化來使用它??.
最後,我們將透過虛擬環境使用 Python。
透過 Docker 進行容器化的 Apache Kafka
話不多說,建立一個名為 skyx 的資料夾,並在其中加入檔案 docker-compose.yml。
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
現在,在 docker-compose 檔案中加入以下內容:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
完成!我們現在可以上傳我們的 Kafka 伺服器。為此,請在終端機中鍵入以下命令:
$ docker compose up -d $ docker compose ps
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
注意:本教學使用 Docker Compose 2.0 版本。這就是為什麼 docker 和 compose ☺ 之間沒有「-」。
現在,我們需要在 Kafka 中建立一個主題來儲存生產者即時發送的資料。為此,讓我們訪問容器內的 Kafka:
$ docker compose 執行 kafka bash
最後建立主題,名稱為airtraffic。
$ kafka-topics --create --topic Airtraffic --bootstrap-server localhost:29092
創建了主題airtraffic。
虛擬環境的創建
為了開發我們的生產者,即負責將即時空中交通數據發送到 Kafka 主題的應用程序,我們需要使用 kafka-python 庫。 kafka-python 是一個社群開發的函式庫,可讓我們開發與 Apache Kafka 整合的生產者和消費者。
首先,讓我們建立一個名為 requirements.txt 的文件,並在其中新增以下相依性:
卡夫卡-python
其次,我們將建立一個虛擬環境並在requirements.txt檔案中安裝依賴項:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
完成!現在我們的環境已經準備好開發了嗎? .
生產者發展
現在讓我們創建我們的生產者。如前所述,生產者將負責將空中交通數據發送到新創建的 Kafka 主題。
如架構所說,SkyX 只在全球五個城市之間飛行,並且只有五架飛機? 。值得一提的是,每架飛機可搭載50至100人。
注意,資料是隨機產生的,以json格式傳送到主題,時間間隔在1到6秒之間? .
我們走吧!建立一個名為 src 的子目錄和另一個名為 kafka 的子目錄。在 kafka 目錄中,建立一個名為 airtraffic_ Producer.py 的文件,並在其中新增以下程式碼:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
完成!我們發展我們的生產者。運行它並讓它運行一段時間。
$ python Airtraffic_ Producer.py
消費者發展
現在讓我們開發我們的消費者。這將是一個非常簡單的應用程式。它只是在終端機中即時顯示到達kafka topic的資料。
仍在 kafka 目錄中,建立一個名為 airtraffic_consumer.py 的文件,並在其中添加以下程式碼:
$ docker compose up -d $ docker compose ps
看,我告訴你這很簡單。運行它並觀察生產者向主題發送資料時即時顯示的資料。
$ python airtraffic_consumer.py
數據分析:接待遊客最多的城市
現在我們開始進行數據分析。此時,我們將開發一個儀表板,一個應用程序,它將即時顯示接待遊客最多的城市的排名。換句話說,我們將按 to 列對資料進行分組,並根據 passengers 列進行求和。非常簡單!
為此,請在 src 目錄中建立一個名為 dashboards 的子目錄,並建立一個名為 tourists_analysis.py 的檔案。然後在其中加入以下程式碼:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
我們現在可以透過spark-submit執行我們的檔案。但冷靜點!當我們將 PySpark 與 Kafka 整合時,我們必須以不同的方式運行 Spark-Submit。需要透過 --packages.
參數告知 Apache Kafka 套件以及 Apache Spark 目前版本如果這是您第一次將 Apache Spark 與 Apache Kafka 集成,spark-submit 可能需要一段時間才能運行。這是因為它需要下載必要的軟體包。
確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:
$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0遊客分析.py
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
數據分析:最多人離開的城市
這個分析與上一個非常相似。然而,我們不會即時分析接待遊客最多的城市,而是分析最多人離開的城市。為此,請建立一個名為 leavers_analysis.py 的文件,並在其中加入以下程式碼:
$ docker compose up -d $ docker compose ps
確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:
$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
數據分析:載客量最多的飛機
這個分析比前面的簡單多了。讓我們即時分析在世界各地城市之間運送最多乘客的飛機。建立一個名為 aircrafts_analysis.py 的文件,並在其中加入以下程式碼:
$ python -m venv venv $ venv\scripts\activate $ pip install -r requirements.txt
確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:
$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 heavens_analysis.py
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
最後的考慮因素
我們就到此結束了,夥伴們!在本文中,我們使用 Spark Structured Streaming 和 Apache Kafka 開發基於虛構空中交通資料的即時資料分析。
為此,我們開發了一個生產者,將這些數據即時發送到 Kafka 主題,然後我們開發了 3 個儀表板來即時分析這些數據。
我希望你喜歡它。下次見? .
以上是使用 Spark Structured Streaming 和 Apache Kafka 進行即時空中交通資料分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

Python更易學且易用,C 則更強大但複雜。 1.Python語法簡潔,適合初學者,動態類型和自動內存管理使其易用,但可能導致運行時錯誤。 2.C 提供低級控制和高級特性,適合高性能應用,但學習門檻高,需手動管理內存和類型安全。

Python和C 在内存管理和控制方面的差异显著。1.Python使用自动内存管理,基于引用计数和垃圾回收,简化了程序员的工作。2.C 则要求手动管理内存,提供更多控制权但增加了复杂性和出错风险。选择哪种语言应基于项目需求和团队技术栈。

Python在科學計算中的應用包括數據分析、機器學習、數值模擬和可視化。 1.Numpy提供高效的多維數組和數學函數。 2.SciPy擴展Numpy功能,提供優化和線性代數工具。 3.Pandas用於數據處理和分析。 4.Matplotlib用於生成各種圖表和可視化結果。

選擇Python還是C 取決於項目需求:1)Python適合快速開發、數據科學和腳本編寫,因其簡潔語法和豐富庫;2)C 適用於需要高性能和底層控制的場景,如係統編程和遊戲開發,因其編譯型和手動內存管理。

Python在數據科學和機器學習中的應用廣泛,主要依賴於其簡潔性和強大的庫生態系統。 1)Pandas用於數據處理和分析,2)Numpy提供高效的數值計算,3)Scikit-learn用於機器學習模型構建和優化,這些庫讓Python成為數據科學和機器學習的理想工具。

每天學習Python兩個小時是否足夠?這取決於你的目標和學習方法。 1)制定清晰的學習計劃,2)選擇合適的學習資源和方法,3)動手實踐和復習鞏固,可以在這段時間內逐步掌握Python的基本知識和高級功能。

Python在Web開發中的關鍵應用包括使用Django和Flask框架、API開發、數據分析與可視化、機器學習與AI、以及性能優化。 1.Django和Flask框架:Django適合快速開發複雜應用,Flask適用於小型或高度自定義項目。 2.API開發:使用Flask或DjangoRESTFramework構建RESTfulAPI。 3.數據分析與可視化:利用Python處理數據並通過Web界面展示。 4.機器學習與AI:Python用於構建智能Web應用。 5.性能優化:通過異步編程、緩存和代碼優

Python在開發效率上優於C ,但C 在執行性能上更高。 1.Python的簡潔語法和豐富庫提高開發效率。 2.C 的編譯型特性和硬件控制提升執行性能。選擇時需根據項目需求權衡開發速度與執行效率。


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

MantisBT
Mantis是一個易於部署的基於Web的缺陷追蹤工具,用於幫助產品缺陷追蹤。它需要PHP、MySQL和一個Web伺服器。請查看我們的演示和託管服務。

SublimeText3 Linux新版
SublimeText3 Linux最新版

SublimeText3漢化版
中文版,非常好用

Atom編輯器mac版下載
最受歡迎的的開源編輯器

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)