Rumah >Java >javaTutorial >Cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL
Cara membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL menggunakan Java
Pemprosesan strim ialah teknologi yang mengendalikan aliran data masa nyata supaya data boleh dianalisis dan diproses sebaik sahaja ia tiba. Apache Kafka ialah platform pemprosesan strim teragih yang boleh digunakan untuk membina aplikasi pemprosesan strim berskala dengan cekap. KSQL ialah enjin pemprosesan data aliran sumber terbuka yang boleh digunakan untuk pertanyaan SQL dan penukaran data aliran masa nyata. Dalam artikel ini, kami akan memperkenalkan cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL.
1. Persediaan persekitaran
Sebelum kita mula, kita perlu menyediakan persekitaran Kafka dan KSQL tempatan. Pertama, kita perlu memuat turun dan memasang Java JDK, Apache Kafka dan Confluent Platform. Kemudian kita boleh memulakan Kafka dan KSQL menggunakan arahan berikut:
2. Cipta topik Kafka dan jadual KSQL
Sebelum kita mula menulis kod Java, kita perlu terlebih dahulu Buat topik Kafka dan tulis data masa nyata ke dalamnya. Kita boleh mencipta topik bernama "example-topic" menggunakan arahan berikut:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication - faktor 1
Seterusnya, kita perlu mencipta jadual dalam KSQL untuk membuat pertanyaan dan mengubah data masa nyata. Kita boleh mencipta jadual bernama "example-table" dalam terminal KSQL menggunakan arahan berikut:
CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json', key= 'kunci');
3. Pelaksanaan kod Java
Sebelum mula menulis kod Java, kita perlu menambah kebergantungan pada Kafka dan KSQL. Kami boleh menambah kebergantungan berikut dalam fail konfigurasi Maven atau Gradle:
Maven:
b4b38e33757a6497aa8690936b905cc1
<groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version>
09a0e22e5aaafd848ae04665be625b91
b4b38e33757a6497aa8690936b905cc1
<groupId>io.confluent</groupId> <artifactId>ksql-serde</artifactId> <version>0.10.0</version>
<
public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // Step 1: Read from Kafka topic KStream<String, String> stream = builder.stream("example-topic"); // Step 2: Transform and process the data stream.mapValues(value -> value.toUpperCase()) .filter((key, value) -> value.startsWith("A")) .to("processed-topic"); // Step 3: Create a Kafka producer to send data to another topic Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); // Step 4: Consume and process the data from the processed topic KStream<String, String> processedStream = builder.stream("processed-topic"); processedStream.foreach((key, value) -> { // Process the data here System.out.println("Key: " + key + ", Value: " + value); }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); }
09a0e22e5aaafd848ae04665be625b91
implementation 'org.apache.kafka:kafka-clients:2.5.0'implementation 'io.confluent:ksql-serde:0.10.0'
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream . *;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.*;
import java.util.*;
import java.util.concurrent.*;
rrreee
}Kod di atas melaksanakan aplikasi pemprosesan strim mudah, yang akan membaca data masa nyata dalam topik "contoh-topik", menukar data kepada huruf besar dan akan menggunakan huruf "A" Data pada permulaan ditulis ke topik "topik diproses". Pada masa yang sama, ia juga akan menggunakan data dalam topik "topik diproses" dan memprosesnya. 4. Jalankan aplikasiSelepas menulis kod Java, kita boleh menggunakan arahan berikut untuk menyusun dan menjalankan aplikasi:
java StreamProcessingApp
Atas ialah kandungan terperinci Cara menggunakan Java untuk membangunkan aplikasi pemprosesan strim berdasarkan Apache Kafka dan KSQL. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!