Real-time air traffic data analysis with Spark Structured Streaming and Apache Kafka

Currently, we live in a world where petabytes of data are generated every second. As such, analyzing and processing this data in real time has become essential for any company looking to generate accurate business insights as more and more data is produced.

Today, we will develop a real-time data analysis based on fictitious air traffic data using Spark Structured Streaming and Apache Kafka. If you don't know what these technologies are, I suggest reading my previous article, which introduces them in more detail, along with other concepts that will be covered throughout this article. So, don't forget to check it out!

You can check out the complete project on my GitHub.

Architecture

Well, imagine that you, a data engineer, work at an airline called SkyX, where data about air traffic is generated every second.

You were asked to develop a dashboard that displays real-time data from these flights, such as a ranking of the most visited cities abroad, the cities from which the most people depart, and the aircraft that transport the most people around the world.

This is the data that is generated with each flight:

  • aircraft_name: name of the aircraft. At SkyX, there are only five aircraft available.
  • From: city from which the aircraft is departing. SkyX only operates flights between five cities around the world.
  • To: aircraft destination city. As mentioned, SkyX only operates flights between five cities around the world.
  • Passengers: number of passengers the aircraft is transporting. All SkyX aircraft carry between 50 and 100 people on each flight.
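
Put together, a single flight event sent to the Kafka topic might look like this (the aircraft name, cities, and passenger count below are purely illustrative):

{"aircraft_name": "SkyX-Alpha", "from": "Tokyo", "to": "Paris", "passengers": 87}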

Following is the basic architecture of our project:

  • Producer: responsible for producing aircraft air traffic data and sending it to an Apache Kafka topic.
  • Consumer: simply observes the data arriving at the Apache Kafka topic in real time.
  • Data analysis: three dashboards that process and analyze, in real time, the data arriving at the Apache Kafka topic: an analysis of the cities that receive the most tourists; an analysis of the cities from which the most people depart to visit other cities; and an analysis of the SkyX aircraft that transport the most people between cities around the world.

Preparing the development environment

This tutorial assumes you already have PySpark installed on your machine. If you don't, check out the steps in the official documentation.
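
If you prefer a quick local setup, installing it with pip is usually enough:

$ pip install pyspark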

As for Apache Kafka, we will use it through containerization with Docker.

And finally, we will use Python through a virtual environment.

Apache Kafka via containerization with Docker

Without further ado, create a folder called skyx and add the file docker-compose.yml inside it.

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

Now, add the following content inside the docker-compose file:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Done! We can now bring up our Kafka server. To do this, type the following command in the terminal:

$ docker compose up -d
$ docker compose ps
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

Note: This tutorial is using version 2.0 of Docker Compose. This is why there is no "-" between docker and compose ☺.

Now, we need to create a topic within Kafka that will store the data sent in real time by the producer. To do this, let's access Kafka inside the container:

$ docker compose exec kafka bash

And finally, create the topic, called airtraffic:

$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092

Created topic airtraffic.
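
If you want to double-check that the topic exists, you can list the topics (still inside the container):

$ kafka-topics --list --bootstrap-server localhost:29092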

Creation of the virtual environment

To develop our producer, that is, the application that will be responsible for sending real-time air traffic data to the Kafka topic, we need to use the kafka-python library. kafka-python is a community-developed library that allows us to develop producers and consumers that integrate with Apache Kafka.

First, let's create a file called requirements.txt and add the following dependency inside it:

kafka-python

Second, we will create a virtual environment and install the dependencies in the requirements.txt file:

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt

(On Linux or macOS, activate the environment with source venv/bin/activate instead.)

Done! Now our environment is ready for development.

Producer development

Now let's create our producer. As mentioned, the producer will be responsible for sending air traffic data to the newly created Kafka topic.

As was also mentioned in the architecture, SkyX only flies between five cities around the world and only has five aircraft available. It is worth mentioning that each aircraft carries between 50 and 100 people.

Note that the data is generated randomly and sent to the topic in JSON format at a random interval of between 1 and 6 seconds.

Let's go! Create a subdirectory called src and, inside it, another subdirectory called kafka. Inside the kafka directory, create a file called airtraffic_producer.py.

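The listing below is a minimal sketch of what this producer can look like, built with kafka-python's KafkaProducer. The aircraft and city names are illustrative placeholders (the article only says SkyX has five of each); everything else follows the behavior described above: random data, JSON messages, and an interval of 1 to 6 seconds between sends.

import json
import random
import time

from kafka import KafkaProducer

# Illustrative names: the article only states that SkyX has five aircraft
# and operates between five cities.
AIRCRAFT = ["SkyX-Alpha", "SkyX-Bravo", "SkyX-Charlie", "SkyX-Delta", "SkyX-Echo"]
CITIES = ["Tokyo", "New York", "Paris", "Sydney", "Cape Town"]

producer = KafkaProducer(
    bootstrap_servers="localhost:29092",
    value_serializer=lambda data: json.dumps(data).encode("utf-8"),
)

if __name__ == "__main__":
    while True:
        origin = random.choice(CITIES)
        flight = {
            "aircraft_name": random.choice(AIRCRAFT),
            "from": origin,
            # The destination must differ from the departure city
            "to": random.choice([city for city in CITIES if city != origin]),
            # Each flight carries between 50 and 100 passengers
            "passengers": random.randint(50, 100),
        }
        producer.send("airtraffic", flight)
        print(f"Sent: {flight}")
        # Wait between 1 and 6 seconds before producing the next event
        time.sleep(random.randint(1, 6))

The value_serializer turns each Python dict into a JSON-encoded byte string, which is exactly what the consumer and the Spark dashboards will parse later.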

Done! We've developed our producer. Run it and let it run for a while.

$ python airtraffic_producer.py

Consumer development

Now let's develop our consumer. This will be a very simple application: it will just display, in the terminal, the data arriving at the Kafka topic in real time.

Still inside the kafka directory, create a file called airtraffic_consumer.py.

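A minimal sketch of this consumer, using kafka-python's KafkaConsumer and subscribing to the same airtraffic topic on localhost:29092:

import json

from kafka import KafkaConsumer

# Subscribe to the airtraffic topic on the same broker used by the producer
consumer = KafkaConsumer(
    "airtraffic",
    bootstrap_servers="localhost:29092",
    value_deserializer=lambda message: json.loads(message.decode("utf-8")),
)

if __name__ == "__main__":
    for record in consumer:
        # record.value is the flight dict sent by the producer
        print(record.value)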

See, I told you it was very simple. Run it and watch the data being displayed in real time as the producer sends it to the topic.

$ python airtraffic_consumer.py

Data analysis: cities that receive the most tourists

Now we start with our data analysis. At this point, we will develop a dashboard, an application, that displays in real time a ranking of the cities that receive the most tourists. In other words, we will group the data by the to column and sum the passengers column. Very simple!

To do this, within the src directory, create a subdirectory called dashboards and, inside it, a file called tourists_analysis.py.

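Below is a minimal sketch of this dashboard with Spark Structured Streaming. The field names follow the producer's messages; the application name and the console sink are simply reasonable choices for this tutorial. It reads the airtraffic topic, parses the JSON payload, groups by the to column, sums passengers, and keeps the ranking sorted in descending order:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("TouristsAnalysis").getOrCreate()
spark.sparkContext.setLogLevel("WARN")  # keep the console output readable

# Schema of the JSON events produced by airtraffic_producer.py
schema = StructType([
    StructField("aircraft_name", StringType(), True),
    StructField("from", StringType(), True),
    StructField("to", StringType(), True),
    StructField("passengers", IntegerType(), True),
])

# Read the stream from the airtraffic topic and parse the JSON payload
flights = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "airtraffic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("flight"))
    .select("flight.*")
)

# Ranking of the cities that receive the most tourists
tourists = (
    flights
    .groupBy("to")
    .sum("passengers")
    .withColumnRenamed("sum(passengers)", "total_passengers")
    .orderBy(col("total_passengers").desc())
)

# Print the full, re-sorted ranking to the console after every micro-batch
query = (
    tourists.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

query.awaitTermination()

The complete output mode is what allows the console sink to reprint the entire, re-sorted ranking after each micro-batch.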

We can now execute our file through spark-submit. But hold on! When integrating PySpark with Kafka, we must run spark-submit differently: we need to pass the Apache Kafka connector package, matching the current version of Apache Spark, through the --packages parameter.

If this is your first time integrating Apache Spark with Apache Kafka, spark-submit may take a while to run. This is because it needs to download the necessary packages.

Make sure the producer is still running so we can see the data analysis in real time. Inside the dashboards directory, run the following command:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 tourists_analysis.py


Data analysis: cities where most people leave

This analysis is very similar to the previous one. However, instead of analyzing in real time the cities that receive the most tourists, we will analyze the cities from which the most people depart. To do this, create a file called leavers_analysis.py.

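A sketch of leavers_analysis.py follows; it is identical to the previous dashboard except that the aggregation groups by the from column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("LeaversAnalysis").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

schema = StructType([
    StructField("aircraft_name", StringType(), True),
    StructField("from", StringType(), True),
    StructField("to", StringType(), True),
    StructField("passengers", IntegerType(), True),
])

flights = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "airtraffic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("flight"))
    .select("flight.*")
)

# Ranking of the cities from which the most people depart
leavers = (
    flights
    .groupBy("from")
    .sum("passengers")
    .withColumnRenamed("sum(passengers)", "total_passengers")
    .orderBy(col("total_passengers").desc())
)

query = (
    leavers.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

query.awaitTermination()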

Make sure the producer is still running so we can see the data analysis in real time. Inside the dashboards directory, run the following command:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 leavers_analysis.py


Data analysis: aircraft that carry the most passengers

This analysis is much simpler than the previous ones. Let's analyze, in real time, the aircraft that transport the most passengers between cities around the world. Create a file called aircrafts_analysis.py.

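And a sketch of aircrafts_analysis.py, where the only change is that we group by the aircraft_name column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("AircraftsAnalysis").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

schema = StructType([
    StructField("aircraft_name", StringType(), True),
    StructField("from", StringType(), True),
    StructField("to", StringType(), True),
    StructField("passengers", IntegerType(), True),
])

flights = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "airtraffic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("flight"))
    .select("flight.*")
)

# Ranking of the aircraft that transport the most passengers
aircrafts = (
    flights
    .groupBy("aircraft_name")
    .sum("passengers")
    .withColumnRenamed("sum(passengers)", "total_passengers")
    .orderBy(col("total_passengers").desc())
)

query = (
    aircrafts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

query.awaitTermination()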

Make sure the producer is still running so we can see the data analysis in real time. Inside the dashboards directory, run the following command:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 aircrafts_analysis.py


Final considerations

And we finish here, guys! In this article, we developed a real-time data analysis based on fictitious air traffic data using Spark Structured Streaming and Apache Kafka.

To do this, we developed a producer that sends this data in real time to the Kafka topic, and then we developed three dashboards to analyze this data in real time.

I hope you liked it. See you next time!
