搜索
首页后端开发Python教程使用 Spark Structured Streaming 和 Apache Kafka 进行实时空中交通数据分析

目前,我们生活在一个每秒生成千万亿字节数据的世界。因此,对于希望随着数据的产生而更准确地生成业务洞察的公司来说,实时分析和处理这些数据变得非常重要。

今天,我们将使用 Spark Structured Streaming 和 Apache Kafka 开发基于虚构空中交通数据的实时数据分析。如果您不知道这些技术是什么,我建议您阅读我撰写的文章,更详细地介绍它们,以及本文将涵盖的其他概念。所以,别忘了检查一下吗?.

你可以在我的 GitHub 上查看完整的项目。

建筑学

好吧,想象一下,您是一名数据工程师,在一家名为 SkyX 的航空公司工作,该航空公司每秒都会生成有关空中交通的数据。

您被要求开发一个仪表板来显示这些航班的实时数据,例如国外访问量最大的城市的排名;大多数人离开的城市;以及在世界各地运送最多乘客的飞机。

这是每次航班生成的数据:

  • aircraft_name:飞机的名称。在 SkyX,只有五架飞机可用。
  • 出发地:飞机起飞的城市。 SkyX 只运营全球五个城市之间的航班。
  • 目的地:飞机目的地城市。如前所述,SkyX 仅运营全球五个城市之间的航班。
  • 乘客:飞机运送的乘客数量。所有 SkyX 飞机每次飞行可搭载 50 至 100 人。

以下是我们项目的基本架构:

  • 生产者:负责生成飞机空中交通数据并将其发送到 Apache Kafka 主题。
  • Consumer:只观察实时到达Apache Kafka Topic的数据。
  • 数据分析:三个仪表板,实时处理和分析到达 Apache Kafka 主题的数据。分析接待游客最多的城市;分析大多数人离开前往其他城市的城市;以及对在世界各地城市之间运送最多乘客的 SkyX 飞机的分析。

准备开发环境

本教程假设您的计算机上已经安装了 PySpark。如果您还没有这样做,请查看文档本身中的步骤。

至于Apache Kafka,我们将通过Docker容器化来使用它??.

最后,我们将通过虚拟环境使用 Python。

通过 Docker 进行容器化的 Apache Kafka

话不多说,创建一个名为 skyx 的文件夹,并在其中添加文件 docker-compose.yml

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

现在,在 docker-compose 文件中添加以下内容:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

完成!我们现在可以上传我们的 Kafka 服务器。为此,请在终端中键入以下命令:

$ docker compose up -d
$ docker compose ps
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

注意:本教程使用 Docker Compose 2.0 版本。这就是为什么 dockercompose ☺ 之间没有“-”。

现在,我们需要在 Kafka 中创建一个主题,用于存储生产者实时发送的数据。为此,让我们访问容器内的 Kafka:

$ docker compose 执行 kafka bash

最后创建主题,名为airtraffic

$ kafka-topics --create --topic Airtraffic --bootstrap-server localhost:29092

创建了主题airtraffic。

虚拟环境的创建

为了开发我们的生产者,即负责将实时空中交通数据发送到 Kafka 主题的应用程序,我们需要使用 kafka-python 库。 kafka-python 是一个社区开发的库,允许我们开发与 Apache Kafka 集成的生产者和消费者。

首先,让我们创建一个名为 requirements.txt 的文件,并在其中添加以下依赖项:

卡夫卡-python

其次,我们将创建一个虚拟环境并在requirements.txt文件中安装依赖项:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

完成!现在我们的环境已经准备好开发了吗?.

生产者发展

现在让我们创建我们的生产者。如前所述,生产者将负责将空中交通数据发送到新创建的 Kafka 主题。

正如架构中所说,SkyX 只在全球五个城市之间飞行,并且只有五架飞机?。值得一提的是,每架飞机可搭载50至100人。

注意,数据是随机生成的,以json格式发送到主题,时间间隔在1到6秒之间?.

我们走吧!创建一个名为 src 的子目录和另一个名为 kafka 的子目录。在 kafka 目录中,创建一个名为 airtraffic_ Producer.py 的文件,并在其中添加以下代码:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

完成!我们发展我们的生产者。运行它并让它运行一段时间。

$ python Airtraffic_ Producer.py

消费者发展

现在让我们开发我们的消费者。这将是一个非常简单的应用程序。它只是在终端中实时显示到达kafka主题的数据。

仍在 kafka 目录中,创建一个名为 airtraffic_consumer.py 的文件,并在其中添加以下代码:

$ docker compose up -d
$ docker compose ps

看,我告诉过你这很简单。运行它并观察生产者向主题发送数据时实时显示的数据。

$ python airtraffic_consumer.py

数据分析:接待游客最多的城市

现在我们开始数据分析。此时,我们将开发一个仪表板,一个应用程序,它将实时显示接待游客最多的城市的排名。换句话说,我们将按 to 列对数据进行分组,并根据 passengers 列进行求和。非常简单!

为此,请在 src 目录中创建一个名为 dashboards 的子目录,并创建一个名为 tourists_analysis.py 的文件。然后在其中添加以下代码:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

我们现在可以通过spark-submit执行我们的文件。但冷静点!当我们将 PySpark 与 Kafka 集成时,我们必须以不同的方式运行 Spark-Submit。需要通过 --packages.

参数告知 Apache Kafka 包以及 Apache Spark 当前版本

如果这是您第一次将 Apache Spark 与 Apache Kafka 集成,spark-submit 可能需要一段时间才能运行。这是因为它需要下载必要的软件包。

确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0游客分析.py

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

数据分析:最多人离开的城市

这个分析与上一个非常相似。然而,我们不会实时分析接待游客最多的城市,而是分析最多人离开的城市。为此,创建一个名为 leavers_analysis.py 的文件,并在其中添加以下代码:

$ docker compose up -d
$ docker compose ps

确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:

$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py

NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

数据分析:载客量最多的飞机

这个分析比前面的简单多了。让我们实时分析在世界各地城市之间运送最多乘客的飞机。创建一个名为 aircrafts_analysis.py 的文件,并在其中添加以下代码:

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt

确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:

$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 heavens_analysis.py

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

最后的考虑因素

我们就到此结束了,伙计们!在本文中,我们使用 Spark Structured Streaming 和 Apache Kafka 开发基于虚构空中交通数据的实时数据分析。

为此,我们开发了一个生产者,将这些数据实时发送到 Kafka 主题,然后我们开发了 3 个仪表板来实时分析这些数据。

我希望你喜欢它。下次见?.

以上是使用 Spark Structured Streaming 和 Apache Kafka 进行实时空中交通数据分析的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
Python与C:学习曲线和易用性Python与C:学习曲线和易用性Apr 19, 2025 am 12:20 AM

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

Python vs. C:内存管理和控制Python vs. C:内存管理和控制Apr 19, 2025 am 12:17 AM

Python和C 在内存管理和控制方面的差异显着。 1.Python使用自动内存管理,基于引用计数和垃圾回收,简化了程序员的工作。 2.C 则要求手动管理内存,提供更多控制权但增加了复杂性和出错风险。选择哪种语言应基于项目需求和团队技术栈。

科学计算的Python:详细的外观科学计算的Python:详细的外观Apr 19, 2025 am 12:15 AM

Python在科学计算中的应用包括数据分析、机器学习、数值模拟和可视化。1.Numpy提供高效的多维数组和数学函数。2.SciPy扩展Numpy功能,提供优化和线性代数工具。3.Pandas用于数据处理和分析。4.Matplotlib用于生成各种图表和可视化结果。

Python和C:找到合适的工具Python和C:找到合适的工具Apr 19, 2025 am 12:04 AM

选择Python还是C 取决于项目需求:1)Python适合快速开发、数据科学和脚本编写,因其简洁语法和丰富库;2)C 适用于需要高性能和底层控制的场景,如系统编程和游戏开发,因其编译型和手动内存管理。

数据科学和机器学习的Python数据科学和机器学习的PythonApr 19, 2025 am 12:02 AM

Python在数据科学和机器学习中的应用广泛,主要依赖于其简洁性和强大的库生态系统。1)Pandas用于数据处理和分析,2)Numpy提供高效的数值计算,3)Scikit-learn用于机器学习模型构建和优化,这些库让Python成为数据科学和机器学习的理想工具。

学习Python:2小时的每日学习是否足够?学习Python:2小时的每日学习是否足够?Apr 18, 2025 am 12:22 AM

每天学习Python两个小时是否足够?这取决于你的目标和学习方法。1)制定清晰的学习计划,2)选择合适的学习资源和方法,3)动手实践和复习巩固,可以在这段时间内逐步掌握Python的基本知识和高级功能。

Web开发的Python:关键应用程序Web开发的Python:关键应用程序Apr 18, 2025 am 12:20 AM

Python在Web开发中的关键应用包括使用Django和Flask框架、API开发、数据分析与可视化、机器学习与AI、以及性能优化。1.Django和Flask框架:Django适合快速开发复杂应用,Flask适用于小型或高度自定义项目。2.API开发:使用Flask或DjangoRESTFramework构建RESTfulAPI。3.数据分析与可视化:利用Python处理数据并通过Web界面展示。4.机器学习与AI:Python用于构建智能Web应用。5.性能优化:通过异步编程、缓存和代码优

Python vs.C:探索性能和效率Python vs.C:探索性能和效率Apr 18, 2025 am 12:20 AM

Python在开发效率上优于C ,但C 在执行性能上更高。1.Python的简洁语法和丰富库提高开发效率。2.C 的编译型特性和硬件控制提升执行性能。选择时需根据项目需求权衡开发速度与执行效率。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热工具

螳螂BT

螳螂BT

Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

Atom编辑器mac版下载

Atom编辑器mac版下载

最流行的的开源编辑器

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)