ホームページ >Java >&#&チュートリアル >Java を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法
Java を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法
はじめに:
Apache Kafka Streams は、次のような強力なストリーム処理フレームワークです。高性能、スケーラブル、フォールトトレラントなリアルタイム ストリーム処理アプリケーションの開発に使用されます。これは Apache Kafka 上に構築されており、入力と出力の Kafka トピックを接続することで生のデータ ストリームを処理できるようにするシンプルで強力な API を提供します。この記事では、Java を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法を紹介し、いくつかのコード例を示します。
1. 準備作業:
Apache Kafka Streams の使用を開始する前に、いくつかの準備作業を完了する必要があります。まず、Apache Kafka がインストールされ、実行されていることを確認します。 Kafka クラスターでは、入力データ用と出力結果用の 2 つのトピックを作成する必要があります。次のコマンドを使用してこれらのトピックを作成できます:
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
同時に、Java プロジェクトに次の依存関係を必ず追加してください:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.4.0</version> </dependency>
2. ストリーム処理アプリケーションを作成します。
Continue 次に、単純なストリーム処理アプリケーションを作成します。この例では、入力トピックからデータを読み取り、データを変換し、結果を出力トピックに書き込みます。以下は簡単な実装例です:
import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; import java.util.Properties; public class StreamProcessingApp { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> inputStream = builder.stream("input-topic"); KStream<String, String> outputStream = inputStream .mapValues(value -> value.toUpperCase()); outputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String())); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } }
上記のコードでは、最初にアプリケーション ID やブートストラップ サーバーなどのいくつかの構成プロパティを定義します。次に、StreamsBuilder インスタンスを作成し、入力トピックからストリームを取得しました。次に、ストリーム内の各値を大文字にキャストし、結果を出力トピックに書き込みました。最後に、KafkaStreams インスタンスを作成し、ストリーム処理アプリケーションを開始しました。
3. アプリケーションを実行します:
ストリーム処理アプリケーションを作成した後、次のコマンドを使用してアプリケーションを実行できます:
java -cp your-project.jar StreamProcessingApp
your-project.jar を必ず置き換えてください。実際のプロジェクトの jar ファイル名。アプリケーションを実行すると、入力トピック内のデータの処理が開始され、変換された結果が出力トピックに書き込まれます。
結論:
Java を使用して、Apache Kafka Streams に基づくストリーム処理アプリケーションを開発するのは非常に簡単です。入力と出力の Kafka トピックを接続し、強力な Kafka Streams API を使用することで、高性能、スケーラブル、フォールトトレラントなリアルタイム ストリーム処理アプリケーションを簡単に構築できます。この記事が、Kafka Streams を使い始めて、実際のプロジェクトで使用するのに役立つことを願っています。
以上がJava を使用して Apache Kafka Streams に基づくストリーム処理アプリケーションを開発する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。