首页 >后端开发 >Python教程 >使用 Spark Structured Streaming 和 Apache Kafka 进行实时空中交通数据分析

使用 Spark Structured Streaming 和 Apache Kafka 进行实时空中交通数据分析

Mary-Kate Olsen
Mary-Kate Olsen原创
2024-10-29 21:55:02977浏览

目前,我们生活在一个每秒生成千万亿字节数据的世界。因此,对于希望随着数据的产生而更准确地生成业务洞察的公司来说,实时分析和处理这些数据变得非常重要。

今天,我们将使用 Spark Structured Streaming 和 Apache Kafka 开发基于虚构空中交通数据的实时数据分析。如果您不知道这些技术是什么,我建议您阅读我撰写的文章,更详细地介绍它们,以及本文将涵盖的其他概念。所以,别忘了检查一下吗?.

你可以在我的 GitHub 上查看完整的项目。

建筑学

好吧,想象一下,您是一名数据工程师,在一家名为 SkyX 的航空公司工作,该航空公司每秒都会生成有关空中交通的数据。

您被要求开发一个仪表板来显示这些航班的实时数据,例如国外访问量最大的城市的排名;大多数人离开的城市;以及在世界各地运送最多乘客的飞机。

这是每次航班生成的数据:

  • aircraft_name:飞机的名称。在 SkyX,只有五架飞机可用。
  • 出发地:飞机起飞的城市。 SkyX 只运营全球五个城市之间的航班。
  • 目的地:飞机目的地城市。如前所述,SkyX 仅运营全球五个城市之间的航班。
  • 乘客:飞机运送的乘客数量。所有 SkyX 飞机每次飞行可搭载 50 至 100 人。

以下是我们项目的基本架构:

  • 生产者:负责生成飞机空中交通数据并将其发送到 Apache Kafka 主题。
  • Consumer:只观察实时到达Apache Kafka Topic的数据。
  • 数据分析:三个仪表板,实时处理和分析到达 Apache Kafka 主题的数据。分析接待游客最多的城市;分析大多数人离开前往其他城市的城市;以及对在世界各地城市之间运送最多乘客的 SkyX 飞机的分析。

准备开发环境

本教程假设您的计算机上已经安装了 PySpark。如果您还没有这样做,请查看文档本身中的步骤。

至于Apache Kafka,我们将通过Docker容器化来使用它??.

最后,我们将通过虚拟环境使用 Python。

通过 Docker 进行容器化的 Apache Kafka

话不多说,创建一个名为 skyx 的文件夹,并在其中添加文件 docker-compose.yml

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

现在,在 docker-compose 文件中添加以下内容:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

完成!我们现在可以上传我们的 Kafka 服务器。为此,请在终端中键入以下命令:

$ docker compose up -d
$ docker compose ps
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

注意:本教程使用 Docker Compose 2.0 版本。这就是为什么 dockercompose ☺ 之间没有“-”。

现在,我们需要在 Kafka 中创建一个主题,用于存储生产者实时发送的数据。为此,让我们访问容器内的 Kafka:

$ docker compose 执行 kafka bash

最后创建主题,名为airtraffic

$ kafka-topics --create --topic Airtraffic --bootstrap-server localhost:29092

创建了主题airtraffic。

虚拟环境的创建

为了开发我们的生产者,即负责将实时空中交通数据发送到 Kafka 主题的应用程序,我们需要使用 kafka-python 库。 kafka-python 是一个社区开发的库,允许我们开发与 Apache Kafka 集成的生产者和消费者。

首先,让我们创建一个名为 requirements.txt 的文件,并在其中添加以下依赖项:

卡夫卡-python

其次,我们将创建一个虚拟环境并在requirements.txt文件中安装依赖项:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

完成!现在我们的环境已经准备好开发了吗?.

生产者发展

现在让我们创建我们的生产者。如前所述,生产者将负责将空中交通数据发送到新创建的 Kafka 主题。

正如架构中所说,SkyX 只在全球五个城市之间飞行,并且只有五架飞机?。值得一提的是,每架飞机可搭载50至100人。

注意,数据是随机生成的,以json格式发送到主题,时间间隔在1到6秒之间?.

我们走吧!创建一个名为 src 的子目录和另一个名为 kafka 的子目录。在 kafka 目录中,创建一个名为 airtraffic_ Producer.py 的文件,并在其中添加以下代码:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

完成!我们发展我们的生产者。运行它并让它运行一段时间。

$ python Airtraffic_ Producer.py

消费者发展

现在让我们开发我们的消费者。这将是一个非常简单的应用程序。它只是在终端中实时显示到达kafka主题的数据。

仍在 kafka 目录中,创建一个名为 airtraffic_consumer.py 的文件,并在其中添加以下代码:

$ docker compose up -d
$ docker compose ps

看,我告诉过你这很简单。运行它并观察生产者向主题发送数据时实时显示的数据。

$ python airtraffic_consumer.py

数据分析:接待游客最多的城市

现在我们开始数据分析。此时,我们将开发一个仪表板,一个应用程序,它将实时显示接待游客最多的城市的排名。换句话说,我们将按 to 列对数据进行分组,并根据 passengers 列进行求和。非常简单!

为此,请在 src 目录中创建一个名为 dashboards 的子目录,并创建一个名为 tourists_analysis.py 的文件。然后在其中添加以下代码:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

我们现在可以通过spark-submit执行我们的文件。但冷静点!当我们将 PySpark 与 Kafka 集成时,我们必须以不同的方式运行 Spark-Submit。需要通过 --packages.

参数告知 Apache Kafka 包以及 Apache Spark 当前版本

如果这是您第一次将 Apache Spark 与 Apache Kafka 集成,spark-submit 可能需要一段时间才能运行。这是因为它需要下载必要的软件包。

确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0游客分析.py

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

数据分析:最多人离开的城市

这个分析与上一个非常相似。然而,我们不会实时分析接待游客最多的城市,而是分析最多人离开的城市。为此,创建一个名为 leavers_analysis.py 的文件,并在其中添加以下代码:

$ docker compose up -d
$ docker compose ps

确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:

$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py

NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

数据分析:载客量最多的飞机

这个分析比前面的简单多了。让我们实时分析在世界各地城市之间运送最多乘客的飞机。创建一个名为 aircrafts_analysis.py 的文件,并在其中添加以下代码:

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt

确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:

$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 heavens_analysis.py

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

最后的考虑因素

我们就到此结束了,伙计们!在本文中,我们使用 Spark Structured Streaming 和 Apache Kafka 开发基于虚构空中交通数据的实时数据分析。

为此,我们开发了一个生产者,将这些数据实时发送到 Kafka 主题,然后我们开发了 3 个仪表板来实时分析这些数据。

我希望你喜欢它。下次见?.

以上是使用 Spark Structured Streaming 和 Apache Kafka 进行实时空中交通数据分析的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn