Rumah >pembangunan bahagian belakang >Tutorial Python >Analisis data trafik udara masa nyata dengan Spark Structured Streaming dan Apache Kafka

Analisis data trafik udara masa nyata dengan Spark Structured Streaming dan Apache Kafka

Mary-Kate Olsen
Mary-Kate Olsenasal
2024-10-29 21:55:02988semak imbas

Pada masa ini, kita hidup dalam dunia di mana bait peta data dijana setiap saat. Oleh itu, menganalisis dan memproses data ini dalam masa nyata menjadi lebih penting bagi syarikat yang ingin menjana cerapan perniagaan dengan lebih tepat apabila data dan lebih banyak data dihasilkan.

Hari ini, kami akan membangunkan analisis data masa nyata berdasarkan data trafik udara rekaan menggunakan Spark Structured Streaming dan Apache Kafka. Jika anda tidak tahu apakah teknologi ini, saya cadangkan membaca artikel saya yang saya tulis untuk memperkenalkannya dengan lebih terperinci, serta konsep lain yang akan dibincangkan sepanjang artikel ini. Jadi, jangan lupa untuk menyemaknya?.

Anda boleh menyemak projek lengkap di GitHub saya.

Seni bina

Nah, bayangkan anda, seorang jurutera data, bekerja di syarikat penerbangan bernama SkyX, di mana data tentang trafik udara dijana setiap saat.

Anda diminta untuk membangunkan papan pemuka yang memaparkan data masa nyata daripada penerbangan ini, seperti penarafan bandar yang paling banyak dikunjungi di luar negara; bandar-bandar di mana kebanyakan orang pergi; dan pesawat yang mengangkut paling ramai orang di seluruh dunia.

Ini ialah data yang dijana dengan setiap penerbangan:

  • nama_pesawat: nama pesawat. Di SkyX, hanya terdapat lima pesawat yang tersedia.
  • Dari: bandar tempat pesawat berlepas. SkyX hanya mengendalikan penerbangan antara lima bandar di seluruh dunia.
  • Ke: bandar destinasi pesawat. Seperti yang dinyatakan, SkyX hanya mengendalikan penerbangan antara lima bandar di seluruh dunia.
  • Penumpang: bilangan penumpang yang diangkut oleh pesawat. Semua pesawat SkyX membawa antara 50 dan 100 orang pada setiap penerbangan.

Berikut ialah seni bina asas projek kami:

  • Pengeluar: bertanggungjawab untuk menghasilkan data trafik udara pesawat dan menghantarnya ke topik Apache Kafka.
  • Pengguna: hanya memerhati data yang tiba dalam masa nyata ke topik Apache Kafka.
  • Analisis data: tiga papan pemuka yang memproses dan menganalisis dalam masa nyata data yang sampai pada topik Apache Kafka. Analisis bandar yang menerima paling ramai pelancong; analisis bandar di mana kebanyakan orang pergi melawat bandar lain; dan analisis pesawat SkyX yang mengangkut paling ramai orang antara bandar di seluruh dunia.

Menyediakan persekitaran pembangunan

Tutorial ini menganggap anda sudah memasang PySpark pada mesin anda. Jika anda belum berbuat demikian, lihat langkah-langkah dalam dokumentasi itu sendiri.

Bagi Apache Kafka, kami akan menggunakannya melalui kontena melalui Docker??.

Dan akhirnya, kami akan menggunakan Python melalui persekitaran maya.

Apache Kafka melalui kontena melalui Docker

Tanpa berlengah lagi, buat folder bernama skyx dan tambahkan fail docker-compose.yml di dalamnya.

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

Sekarang, tambah kandungan berikut di dalam fail docker-compose:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Selesai! Kami kini boleh memuat naik pelayan Kafka kami. Untuk melakukan ini, taip arahan berikut dalam terminal:

$ docker compose up -d
$ docker compose ps
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

Nota: Tutorial ini menggunakan versi 2.0 Docker Compose. Inilah sebabnya mengapa tiada "-" antara docker dan karang ☺.

Kini, kita perlu mencipta topik dalam Kafka yang akan menyimpan data yang dihantar dalam masa nyata oleh pengeluar. Untuk melakukan ini, mari akses Kafka di dalam bekas:

$ docker compose exec kafka bash

Dan akhirnya mencipta topik, dipanggil lalu lintas udara.

$ kafka-topik --buat --topik lalu lintas udara --bootstrap-server localhost:29092

Mencipta topik lalu lintas udara.

Penciptaan persekitaran maya

Untuk membangunkan pengeluar kami, iaitu, aplikasi yang akan bertanggungjawab menghantar data trafik udara masa nyata ke topik Kafka, kami perlu menggunakan perpustakaan kafka-python. kafka-python ialah perpustakaan yang dibangunkan komuniti yang membolehkan kami membangunkan pengeluar dan pengguna yang berintegrasi dengan Apache Kafka.

Mula-mula, mari buat fail bernama requirements.txt dan tambahkan kebergantungan berikut di dalamnya:

kafka-python

Kedua, kami akan mencipta persekitaran maya dan memasang kebergantungan dalam fail requirements.txt:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

Selesai! Kini persekitaran kita sudah bersedia untuk pembangunan?.

Pembangunan pengeluar

Sekarang mari buat penerbit kami. Seperti yang dinyatakan, pengeluar akan bertanggungjawab untuk menghantar data trafik udara ke topik Kafka yang baru dibuat.

Seperti yang juga dikatakan dalam seni bina, SkyX hanya terbang antara lima bandar di seluruh dunia, dan hanya mempunyai lima pesawat yang tersedia?. Perlu dinyatakan bahawa setiap pesawat membawa antara 50 dan 100 orang.

Perhatikan bahawa data dijana secara rawak dan dihantar ke topik dalam format json dalam selang masa antara 1 dan 6 saat?.

Jom! Buat subdirektori dipanggil src dan subdirektori lain dipanggil kafka. Di dalam direktori kafka, buat fail bernama airtraffic_producer.py dan tambahkan kod berikut di dalamnya:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Selesai! Kami membangunkan pengeluar kami. Jalankan dan biarkan ia berjalan seketika.

$ python airtraffic_producer.py

Pembangunan pengguna

Sekarang mari bangunkan pengguna kita. Ini akan menjadi aplikasi yang sangat mudah. Ia hanya akan memaparkan data yang tiba dalam topik kafka dalam masa nyata dalam terminal.

Masih di dalam direktori kafka, buat fail bernama airtraffic_consumer.py dan tambahkan kod berikut di dalamnya:

$ docker compose up -d
$ docker compose ps

Lihat, saya memberitahu anda bahawa ia sangat mudah. Jalankan dan tonton data yang akan dipaparkan dalam masa nyata semasa pengeluar menghantar data ke topik.

$ python airtraffic_consumer.py

Analisis data: bandar yang menerima paling ramai pelancong

Sekarang kita mulakan dengan analisis data kita. Pada ketika ini, kami akan membangunkan papan pemuka, aplikasi, yang akan memaparkan dalam masa nyata kedudukan bandar yang menerima paling ramai pelancong. Dengan kata lain, kami akan mengumpulkan data mengikut lajur ke dan membuat jumlah berdasarkan lajur penumpang. Sangat mudah!

Untuk melakukan ini, dalam direktori src, cipta subdirektori yang dipanggil papan pemuka dan buat fail bernama tourists_analysis.py. Kemudian tambahkan kod berikut di dalamnya:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

Dan kini kami boleh melaksanakan fail kami melalui penyerahan percikan. Tetapi bertenang! Apabila kami menyepadukan PySpark dengan Kafka, kami mesti menjalankan penyerahan percikan secara berbeza. Anda perlu memaklumkan pakej Apache Kafka dan versi semasa Apache Spark melalui parameter --packages.

Jika ini kali pertama anda menyepadukan Apache Spark dengan Apache Kafka, penyerahan percikan mungkin mengambil sedikit masa untuk dijalankan. Ini kerana ia perlu memuat turun pakej yang diperlukan.

Pastikan pengeluar masih berjalan supaya kami dapat melihat analisis data dalam masa nyata. Di dalam direktori papan pemuka, jalankan arahan berikut:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 tourists_analysis.py

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Analisis data: bandar yang kebanyakan orang tinggalkan

Analisis ini hampir sama dengan yang sebelumnya. Walau bagaimanapun, daripada menganalisis dalam masa nyata bandar yang menerima paling ramai pelancong, kami akan menganalisis bandar yang paling ramai orang pergi. Untuk melakukan ini, buat fail bernama leavers_analysis.py dan tambahkan kod berikut di dalamnya:

$ docker compose up -d
$ docker compose ps

Pastikan pengeluar masih berjalan supaya kami dapat melihat analisis data dalam masa nyata. Di dalam direktori papan pemuka, jalankan arahan berikut:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 leavers_analysis.py

NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

Analisis data: pesawat yang membawa penumpang paling banyak

Analisis ini jauh lebih mudah daripada yang sebelumnya. Mari analisa dalam masa nyata pesawat yang mengangkut penumpang terbanyak antara bandar di seluruh dunia. Buat fail bernama aircrafts_analysis.py dan tambahkan kod berikut di dalamnya:

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt

Pastikan pengeluar masih berjalan supaya kami dapat melihat analisis data dalam masa nyata. Di dalam direktori papan pemuka, jalankan arahan berikut:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 aircrafts_analysis.py

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml

Pertimbangan akhir

Dan kita selesai di sini, kawan-kawan! Dalam artikel ini, kami membangunkan analisis data masa nyata berdasarkan data trafik udara rekaan menggunakan Spark Structured Streaming dan Apache Kafka.

Untuk melakukan ini, kami membangunkan pengeluar yang menghantar data ini dalam masa nyata ke topik Kafka, dan kemudian kami membangunkan 3 papan pemuka untuk menganalisis data ini dalam masa nyata.

Saya harap anda menyukainya. Jumpa lagi lain kali?.

Atas ialah kandungan terperinci Analisis data trafik udara masa nyata dengan Spark Structured Streaming dan Apache Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn