目前,我们生活在一个每秒生成千万亿字节数据的世界。因此,对于希望随着数据的产生而更准确地生成业务洞察的公司来说,实时分析和处理这些数据变得非常重要。
今天,我们将使用 Spark Structured Streaming 和 Apache Kafka 开发基于虚构空中交通数据的实时数据分析。如果您不知道这些技术是什么,我建议您阅读我撰写的文章,更详细地介绍它们,以及本文将涵盖的其他概念。所以,别忘了检查一下吗?.

Spark Structured Streaming 和 Apache Kafka 实时数据处理简介
Gehazi Anc ・22 年 9 月 29 日
你可以在我的 GitHub 上查看完整的项目。
建筑学
好吧,想象一下,您是一名数据工程师,在一家名为 SkyX 的航空公司工作,该航空公司每秒都会生成有关空中交通的数据。
您被要求开发一个仪表板来显示这些航班的实时数据,例如国外访问量最大的城市的排名;大多数人离开的城市;以及在世界各地运送最多乘客的飞机。
这是每次航班生成的数据:
- aircraft_name:飞机的名称。在 SkyX,只有五架飞机可用。
- 出发地:飞机起飞的城市。 SkyX 只运营全球五个城市之间的航班。
- 目的地:飞机目的地城市。如前所述,SkyX 仅运营全球五个城市之间的航班。
- 乘客:飞机运送的乘客数量。所有 SkyX 飞机每次飞行可搭载 50 至 100 人。
以下是我们项目的基本架构:
- 生产者:负责生成飞机空中交通数据并将其发送到 Apache Kafka 主题。
- Consumer:只观察实时到达Apache Kafka Topic的数据。
- 数据分析:三个仪表板,实时处理和分析到达 Apache Kafka 主题的数据。分析接待游客最多的城市;分析大多数人离开前往其他城市的城市;以及对在世界各地城市之间运送最多乘客的 SkyX 飞机的分析。
准备开发环境
本教程假设您的计算机上已经安装了 PySpark。如果您还没有这样做,请查看文档本身中的步骤。
至于Apache Kafka,我们将通过Docker容器化来使用它??.
最后,我们将通过虚拟环境使用 Python。
通过 Docker 进行容器化的 Apache Kafka
话不多说,创建一个名为 skyx 的文件夹,并在其中添加文件 docker-compose.yml。
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
现在,在 docker-compose 文件中添加以下内容:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
完成!我们现在可以上传我们的 Kafka 服务器。为此,请在终端中键入以下命令:
$ docker compose up -d $ docker compose ps
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
注意:本教程使用 Docker Compose 2.0 版本。这就是为什么 docker 和 compose ☺ 之间没有“-”。
现在,我们需要在 Kafka 中创建一个主题,用于存储生产者实时发送的数据。为此,让我们访问容器内的 Kafka:
$ docker compose 执行 kafka bash
最后创建主题,名为airtraffic。
$ kafka-topics --create --topic Airtraffic --bootstrap-server localhost:29092
创建了主题airtraffic。
虚拟环境的创建
为了开发我们的生产者,即负责将实时空中交通数据发送到 Kafka 主题的应用程序,我们需要使用 kafka-python 库。 kafka-python 是一个社区开发的库,允许我们开发与 Apache Kafka 集成的生产者和消费者。
首先,让我们创建一个名为 requirements.txt 的文件,并在其中添加以下依赖项:
卡夫卡-python
其次,我们将创建一个虚拟环境并在requirements.txt文件中安装依赖项:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
完成!现在我们的环境已经准备好开发了吗?.
生产者发展
现在让我们创建我们的生产者。如前所述,生产者将负责将空中交通数据发送到新创建的 Kafka 主题。
正如架构中所说,SkyX 只在全球五个城市之间飞行,并且只有五架飞机?。值得一提的是,每架飞机可搭载50至100人。
注意,数据是随机生成的,以json格式发送到主题,时间间隔在1到6秒之间?.
我们走吧!创建一个名为 src 的子目录和另一个名为 kafka 的子目录。在 kafka 目录中,创建一个名为 airtraffic_ Producer.py 的文件,并在其中添加以下代码:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
完成!我们发展我们的生产者。运行它并让它运行一段时间。
$ python Airtraffic_ Producer.py
消费者发展
现在让我们开发我们的消费者。这将是一个非常简单的应用程序。它只是在终端中实时显示到达kafka主题的数据。
仍在 kafka 目录中,创建一个名为 airtraffic_consumer.py 的文件,并在其中添加以下代码:
$ docker compose up -d $ docker compose ps
看,我告诉过你这很简单。运行它并观察生产者向主题发送数据时实时显示的数据。
$ python airtraffic_consumer.py
数据分析:接待游客最多的城市
现在我们开始数据分析。此时,我们将开发一个仪表板,一个应用程序,它将实时显示接待游客最多的城市的排名。换句话说,我们将按 to 列对数据进行分组,并根据 passengers 列进行求和。非常简单!
为此,请在 src 目录中创建一个名为 dashboards 的子目录,并创建一个名为 tourists_analysis.py 的文件。然后在其中添加以下代码:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
我们现在可以通过spark-submit执行我们的文件。但冷静点!当我们将 PySpark 与 Kafka 集成时,我们必须以不同的方式运行 Spark-Submit。需要通过 --packages.
参数告知 Apache Kafka 包以及 Apache Spark 当前版本如果这是您第一次将 Apache Spark 与 Apache Kafka 集成,spark-submit 可能需要一段时间才能运行。这是因为它需要下载必要的软件包。
确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:
$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0游客分析.py
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
数据分析:最多人离开的城市
这个分析与上一个非常相似。然而,我们不会实时分析接待游客最多的城市,而是分析最多人离开的城市。为此,创建一个名为 leavers_analysis.py 的文件,并在其中添加以下代码:
$ docker compose up -d $ docker compose ps
确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:
$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
数据分析:载客量最多的飞机
这个分析比前面的简单多了。让我们实时分析在世界各地城市之间运送最多乘客的飞机。创建一个名为 aircrafts_analysis.py 的文件,并在其中添加以下代码:
$ python -m venv venv $ venv\scripts\activate $ pip install -r requirements.txt
确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:
$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 heavens_analysis.py
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
最后的考虑因素
我们就到此结束了,伙计们!在本文中,我们使用 Spark Structured Streaming 和 Apache Kafka 开发基于虚构空中交通数据的实时数据分析。
为此,我们开发了一个生产者,将这些数据实时发送到 Kafka 主题,然后我们开发了 3 个仪表板来实时分析这些数据。
我希望你喜欢它。下次见?.
以上是使用 Spark Structured Streaming 和 Apache Kafka 进行实时空中交通数据分析的详细内容。更多信息请关注PHP中文网其他相关文章!

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

Python和C 在内存管理和控制方面的差异显着。 1.Python使用自动内存管理,基于引用计数和垃圾回收,简化了程序员的工作。 2.C 则要求手动管理内存,提供更多控制权但增加了复杂性和出错风险。选择哪种语言应基于项目需求和团队技术栈。

Python在科学计算中的应用包括数据分析、机器学习、数值模拟和可视化。1.Numpy提供高效的多维数组和数学函数。2.SciPy扩展Numpy功能,提供优化和线性代数工具。3.Pandas用于数据处理和分析。4.Matplotlib用于生成各种图表和可视化结果。

选择Python还是C 取决于项目需求:1)Python适合快速开发、数据科学和脚本编写,因其简洁语法和丰富库;2)C 适用于需要高性能和底层控制的场景,如系统编程和游戏开发,因其编译型和手动内存管理。

Python在数据科学和机器学习中的应用广泛,主要依赖于其简洁性和强大的库生态系统。1)Pandas用于数据处理和分析,2)Numpy提供高效的数值计算,3)Scikit-learn用于机器学习模型构建和优化,这些库让Python成为数据科学和机器学习的理想工具。

每天学习Python两个小时是否足够?这取决于你的目标和学习方法。1)制定清晰的学习计划,2)选择合适的学习资源和方法,3)动手实践和复习巩固,可以在这段时间内逐步掌握Python的基本知识和高级功能。

Python在Web开发中的关键应用包括使用Django和Flask框架、API开发、数据分析与可视化、机器学习与AI、以及性能优化。1.Django和Flask框架:Django适合快速开发复杂应用,Flask适用于小型或高度自定义项目。2.API开发:使用Flask或DjangoRESTFramework构建RESTfulAPI。3.数据分析与可视化:利用Python处理数据并通过Web界面展示。4.机器学习与AI:Python用于构建智能Web应用。5.性能优化:通过异步编程、缓存和代码优

Python在开发效率上优于C ,但C 在执行性能上更高。1.Python的简洁语法和丰富库提高开发效率。2.C 的编译型特性和硬件控制提升执行性能。选择时需根据项目需求权衡开发速度与执行效率。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

螳螂BT
Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

SublimeText3 Linux新版
SublimeText3 Linux最新版

SublimeText3汉化版
中文版,非常好用

Atom编辑器mac版下载
最流行的的开源编辑器

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)