ホームページ >Java >&#&チュートリアル >Java を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法

Java を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法

PHPz
PHPzオリジナル
2023-09-21 13:42:211367ブラウズ

如何使用Java开发一个基于Apache Kafka Streams的流处理应用

Java を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法

はじめに:
Apache Kafka Streams は、次のような強力なストリーム処理フレームワークです。高性能、スケーラブル、フォールトトレラントなリアルタイム ストリーム処理アプリケーションの開発に使用されます。これは Apache Kafka 上に構築されており、入力と出力の Kafka トピックを接続することで生のデータ ストリームを処理できるようにするシンプルで強力な API を提供します。この記事では、Java を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法を紹介し、いくつかのコード例を示します。

1. 準備作業:
Apache Kafka Streams の使用を開始する前に、いくつかの準備作業を完了する必要があります。まず、Apache Kafka がインストールされ、実行されていることを確認します。 Kafka クラスターでは、入力データ用と出力結果用の 2 つのトピックを作成する必要があります。次のコマンドを使用してこれらのトピックを作成できます:

bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

同時に、Java プロジェクトに次の依存関係を必ず追加してください:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.0</version>
</dependency>

2. ストリーム処理アプリケーションを作成します。
Continue 次に、単純なストリーム処理アプリケーションを作成します。この例では、入力トピックからデータを読み取り、データを変換し、結果を出力トピックに書き込みます。以下は簡単な実装例です:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class StreamProcessingApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> inputStream = builder.stream("input-topic");

        KStream<String, String> outputStream = inputStream
                .mapValues(value -> value.toUpperCase());

        outputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

上記のコードでは、最初にアプリケーション ID やブートストラップ サーバーなどのいくつかの構成プロパティを定義します。次に、StreamsBuilder インスタンスを作成し、入力トピックからストリームを取得しました。次に、ストリーム内の各値を大文字にキャストし、結果を出力トピックに書き込みました。最後に、KafkaStreams インスタンスを作成し、ストリーム処理アプリケーションを開始しました。

3. アプリケーションを実行します:
ストリーム処理アプリケーションを作成した後、次のコマンドを使用してアプリケーションを実行できます:

java -cp your-project.jar StreamProcessingApp

your-project.jar を必ず置き換えてください。実際のプロジェクトの jar ファイル名。アプリケーションを実行すると、入力トピック内のデータの処理が開始され、変換された結果が出力トピックに書き込まれます。

結論:
Java を使用して、Apache Kafka Streams に基づくストリーム処理アプリケーションを開発するのは非常に簡単です。入力と出力の Kafka トピックを接続し、強力な Kafka Streams API を使用することで、高性能、スケーラブル、フォールトトレラントなリアルタイム ストリーム処理アプリケーションを簡単に構築できます。この記事が、Kafka Streams を使い始めて、実際のプロジェクトで使用するのに役立つことを願っています。

以上がJava を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。