Rumah  >  Artikel  >  Java  >  Cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL

Cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL

WBOY
WBOYasal
2023-09-21 08:23:04965semak imbas

如何使用Java开发一个基于Apache Kafka和KSQL的流处理应用

Cara membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL menggunakan Java

Pemprosesan strim ialah teknologi yang mengendalikan aliran data masa nyata supaya data boleh dianalisis dan diproses sebaik sahaja ia tiba. Apache Kafka ialah platform pemprosesan strim teragih yang boleh digunakan untuk membina aplikasi pemprosesan strim berskala dengan cekap. KSQL ialah enjin pemprosesan data aliran sumber terbuka yang boleh digunakan untuk pertanyaan SQL dan penukaran data aliran masa nyata. Dalam artikel ini, kami akan memperkenalkan cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL.

1. Persediaan persekitaran
Sebelum kita mula, kita perlu menyediakan persekitaran Kafka dan KSQL tempatan. Pertama, kita perlu memuat turun dan memasang Java JDK, Apache Kafka dan Confluent Platform. Kemudian kita boleh memulakan Kafka dan KSQL menggunakan arahan berikut:

  1. Mulakan ZooKeeper:
    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Mulakan Kafka Broker:
    bin/kafka.sher config /server.properties
  3. Mulakan Pelayan KSQL:
    bin/ksql-server-start.sh config/ksql-server.properties

2. Cipta topik Kafka dan jadual KSQL
Sebelum kita mula menulis kod Java, kita perlu terlebih dahulu Buat topik Kafka dan tulis data masa nyata ke dalamnya. Kita boleh mencipta topik bernama "example-topic" menggunakan arahan berikut:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication - faktor 1

Seterusnya, kita perlu mencipta jadual dalam KSQL untuk membuat pertanyaan dan mengubah data masa nyata. Kita boleh mencipta jadual bernama "example-table" dalam terminal KSQL menggunakan arahan berikut:

CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json', key= 'kunci');

3. Pelaksanaan kod Java
Sebelum mula menulis kod Java, kita perlu menambah kebergantungan pada Kafka dan KSQL. Kami boleh menambah kebergantungan berikut dalam fail konfigurasi Maven atau Gradle:

Maven:

b4b38e33757a6497aa8690936b905cc1

<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>

09a0e22e5aaafd848ae04665be625b91
b4b38e33757a6497aa8690936b905cc1

<groupId>io.confluent</groupId>
<artifactId>ksql-serde</artifactId>
<version>0.10.0</version>

<

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    StreamsBuilder builder = new StreamsBuilder();

    // Step 1: Read from Kafka topic
    KStream<String, String> stream = builder.stream("example-topic");

    // Step 2: Transform and process the data
    stream.mapValues(value -> value.toUpperCase())
          .filter((key, value) -> value.startsWith("A"))
          .to("processed-topic");

    // Step 3: Create a Kafka producer to send data to another topic
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    // Step 4: Consume and process the data from the processed topic
    KStream<String, String> processedStream = builder.stream("processed-topic");
    processedStream.foreach((key, value) -> {
        // Process the data here
        System.out.println("Key: " + key + ", Value: " + value);
    });

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
}

09a0e22e5aaafd848ae04665be625b91

implementation 'org.apache.kafka:kafka-clients:2.5.0'

implementation 'io.confluent:ksql-serde:0.10.0'

Seterusnya, kita boleh menulis kod Java untuk melaksanakan aplikasi pemprosesan strim. Berikut ialah kod contoh mudah:

import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream . *;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.*;
import java.util.*;
import java.util.concurrent.*;

StreamProcessingApp kelas awam {

rrreee

}

Kod di atas melaksanakan aplikasi pemprosesan strim mudah, yang akan membaca data masa nyata dalam topik "contoh-topik", menukar data kepada huruf besar dan akan menggunakan huruf "A" Data pada permulaan ditulis ke topik "topik diproses". Pada masa yang sama, ia juga akan menggunakan data dalam topik "topik diproses" dan memprosesnya.

4. Jalankan aplikasi

Selepas menulis kod Java, kita boleh menggunakan arahan berikut untuk menyusun dan menjalankan aplikasi:

javac StreamProcessingApp.java

java StreamProcessingApp

Kini, kami telah berjaya membangunkan strim berdasarkan Apache Kafka dan aplikasi Pemprosesan KSQL dan melaksanakan pembacaan, penukaran, pemprosesan dan penulisan data melalui kod Java. Anda boleh mengubah suai dan melanjutkan kod mengikut keperluan sebenar untuk memenuhi keperluan perniagaan anda. Harap artikel ini membantu anda!

Atas ialah kandungan terperinci Cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn