ホームページ >Java >&#&チュートリアル >Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法
Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法
ストリーム処理は、リアルタイム データ ストリームを処理し、データを処理できるテクノロジです。到着したらすぐに分析して処理します。 Apache Kafka は、スケーラブルなストリーム処理アプリケーションを効率的に構築するために使用できる分散ストリーム処理プラットフォームです。 KSQL は、SQL クエリとリアルタイム ストリーム データの変換に使用できるオープン ソースのストリーム データ処理エンジンです。この記事では、Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法を紹介します。
1. 環境セットアップ
始める前に、ローカルの Kafka および KSQL 環境をセットアップする必要があります。まず、Java JDK、Apache Kafka、Confluent Platform をダウンロードしてインストールする必要があります。次に、次のコマンドを使用して Kafka と KSQL を起動します。
2. Kafka トピックと KSQL テーブルの作成
Java コードを書き始める前に、Kafka トピックを作成し、そこにリアルタイム データを書き込む必要があります。次のコマンドを使用して、「example-topic」という名前のトピックを作成できます:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication-factor 1
次に、リアルタイム データのクエリと変換を行うためのテーブルを KSQL に作成する必要があります。次のコマンドを使用して、KSQL ターミナルで「example-table」という名前のテーブルを作成できます:
CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json) ', key='key');
3. Java コードの実装
Java コードを書き始める前に、Kafka と KSQL への依存関係を追加する必要があります。 Maven または Gradle 構成ファイルに次の依存関係を追加できます:
Maven:
b4b38e33757a6497aa8690936b905cc1
<groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version>
09a0e22e5aaafd848ae04665be625b91 09a0e22e5aaafd848ae04665be625b91 Gradle: 実装 'org.apache.kafka:kafka-clients:2.5.0' 次に、ストリーム処理アプリケーションを実装する Java コードを記述します。以下は簡単なサンプル コードです: import org.apache.kafka.clients.consumer.*; public class StreamProcessingApp { } 上記のコード「example-topic」トピックからリアルタイム データを読み取り、そのデータを大文字に変換し、文字「A」で始まるデータを「processed-topic」トピックに書き込む、単純なストリーム処理アプリケーションを作成します。同時に、「processed-topic」トピック内のデータも消費して処理します。 4. アプリケーションの実行 javac StreamProcessingApp.java これで、Apache Kafka と KSQL に基づくストリーム処理アプリケーションの開発に成功し、Java コードを通じてデータの読み取り、変換、処理、書き込みを実装できました。実際のニーズに応じてコードを変更および拡張し、ビジネス ニーズを満たすことができます。この記事がお役に立てば幸いです! 以上がJava を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。<groupId>io.confluent</groupId>
<artifactId>ksql-serde</artifactId>
<version>0.10.0</version>
実装 'io。 confluent:ksql-serde:0.10.0'
import org.apache.kafka.clients.Producer.*;
import org.apache .kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams .*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.Wall ClockTimestampExtractor;
import org.apache.kafka.streams.state.* ;
import java.util.*;
import java.util.concurrent.*;public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
// Step 1: Read from Kafka topic
KStream<String, String> stream = builder.stream("example-topic");
// Step 2: Transform and process the data
stream.mapValues(value -> value.toUpperCase())
.filter((key, value) -> value.startsWith("A"))
.to("processed-topic");
// Step 3: Create a Kafka producer to send data to another topic
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// Step 4: Consume and process the data from the processed topic
KStream<String, String> processedStream = builder.stream("processed-topic");
processedStream.foreach((key, value) -> {
// Process the data here
System.out.println("Key: " + key + ", Value: " + value);
});
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
Java コードを作成した後、次のコマンドを使用してアプリケーションをコンパイルし、実行できます:
java StreamProcessingApp