Rumah  >  Artikel  >  Java  >  Pembangunan Java: Cara menggunakan Apache Kafka Streams untuk pemprosesan dan pengkomputeran strim masa nyata

Pembangunan Java: Cara menggunakan Apache Kafka Streams untuk pemprosesan dan pengkomputeran strim masa nyata

WBOY
WBOYasal
2023-09-21 12:39:241470semak imbas

Java开发:如何使用Apache Kafka Streams进行实时流处理和计算

Pembangunan Java: Cara menggunakan Apache Kafka Streams untuk pemprosesan dan pengkomputeran strim masa nyata

Pengenalan:
Dengan peningkatan data besar dan pengkomputeran masa nyata, Apache Kafka Streams, sebagai enjin pemprosesan strim, sedang digunakan semakin banyak Digunakan oleh pembangun. Ia menyediakan cara yang mudah tetapi berkuasa untuk mengendalikan data penstriman masa nyata dan melaksanakan pemprosesan dan pengiraan strim yang kompleks. Artikel ini akan memperkenalkan cara menggunakan Apache Kafka Streams untuk pemprosesan dan pengkomputeran strim masa nyata, termasuk mengkonfigurasi persekitaran, menulis kod dan demonstrasi sampel.

1. Penyediaan:

  1. Pasang dan konfigurasikan Apache Kafka: Anda perlu memuat turun dan memasang Apache Kafka, dan mulakan gugusan Apache Kafka. Untuk pemasangan dan konfigurasi terperinci, sila rujuk dokumentasi rasmi Apache Kafka.
  2. Perkenalkan dependensi: Perkenalkan dependensi berkaitan Kafka Streams ke dalam projek Java. Contohnya, menggunakan Maven, anda boleh menambah kebergantungan berikut dalam maklumat sambungan Kluster projek. Berikut ialah kod contoh mudah:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.1</version>
</dependency>

Tambah pemprosesan strim dan logik pengiraan:
    Selepas mencipta aplikasi Kafka Streams, anda perlu menambah pemprosesan strim dan logik pengiraan. Mengambil contoh mudah, kami menganggap bahawa kami menerima mesej rentetan daripada topik Kafka bernama "topik-input", melakukan pengiraan panjang pada mesej itu, dan kemudian menghantar hasilnya ke topik Kafka bernama "topik-output" . Berikut ialah kod sampel:

  1. import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    
    import java.util.Properties;
    
    public class KafkaStreamsApp {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
            StreamsBuilder builder = new StreamsBuilder();
            // 在这里添加流处理和计算逻辑
    
            Topology topology = builder.build();
            KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
    
            // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }
  2. Dalam kod sampel di atas, objek KStream dibuat pertama kali daripada topik input, dan kemudian setiap mesej dibahagikan kepada perkataan menggunakan operasi flatMapValues ​​​​dan dikira secara statistik. Akhirnya, keputusan dihantar ke topik output.
    3. Contoh Demonstrasi:
  1. Untuk mengesahkan aplikasi pemprosesan dan pengkomputeran aliran masa nyata kami, anda boleh menggunakan alat baris arahan Kafka untuk menghantar mesej dan melihat hasil. Berikut ialah langkah-langkah untuk contoh demonstrasi:
Buat topik input dan output:

Laksanakan arahan berikut dalam baris arahan untuk mencipta topik Kafka bernama "topik-input" dan "topik-output":

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;

public class KafkaStreamsApp {

    // 省略其他代码...
    
    public static void main(String[] args) {
        // 省略其他代码...
        
        KStream<String, String> inputStream = builder.stream("input-topic");
        KTable<String, Long> wordCounts = inputStream
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                .groupBy((key, word) -> word)
                .count();

        wordCounts.toStream().to("output-topic");

        // 省略其他代码...
    }
}

Hantar mesej Kepada topik input:
    Laksanakan arahan berikut dalam baris arahan untuk menghantar beberapa mesej ke "input-topic":

  1. bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  2. bin/kafka-console-consumer.sh --topic output-topic --from- permulaan --bootstrap -server localhost:9092
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
>hello world
>apache kafka streams
>real-time processing
>```

3. 查看结果:
在命令行中执行以下命令,从"output-topic"中消费结果消息:
    masa nyata: 1
  1. pemprosesan: 1
    apache: 1
  2. kafka: 1
strim: 1

hello: 2

dunia: 1

rreee

Atas ialah kandungan terperinci Pembangunan Java: Cara menggunakan Apache Kafka Streams untuk pemprosesan dan pengkomputeran strim masa nyata. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn