ホームページ >Java >&#&チュートリアル >Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法

Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法

WBOY
WBOYオリジナル
2023-09-21 08:23:041073ブラウズ

如何使用Java开发一个基于Apache Kafka和KSQL的流处理应用

Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法

ストリーム処理は、リアルタイム データ ストリームを処理し、データを処理できるテクノロジです。到着したらすぐに分析して処理します。 Apache Kafka は、スケーラブルなストリーム処理アプリケーションを効率的に構築するために使用できる分散ストリーム処理プラットフォームです。 KSQL は、SQL クエリとリアルタイム ストリーム データの変換に使用できるオープン ソースのストリーム データ処理エンジンです。この記事では、Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法を紹介します。

1. 環境セットアップ
始める前に、ローカルの Kafka および KSQL 環境をセットアップする必要があります。まず、Java JDK、Apache Kafka、Confluent Platform をダウンロードしてインストールする必要があります。次に、次のコマンドを使用して Kafka と KSQL を起動します。

  1. Start ZooKeeper:
    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Start Kafka Broker :
    bin/kafka-server-start.sh config/server.properties
  3. KSQL サーバーの起動:
    bin/ksql-server-start.sh config/ksql-server.properties

2. Kafka トピックと KSQL テーブルの作成
Java コードを書き始める前に、Kafka トピックを作成し、そこにリアルタイム データを書き込む必要があります。次のコマンドを使用して、「example-topic」という名前のトピックを作成できます:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication-factor 1

次に、リアルタイム データのクエリと変換を行うためのテーブルを KSQL に作成する必要があります。次のコマンドを使用して、KSQL ターミナルで「example-table」という名前のテーブルを作成できます:

CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json) ', key='key');

3. Java コードの実装
Java コードを書き始める前に、Kafka と KSQL への依存関係を追加する必要があります。 Maven または Gradle 構成ファイルに次の依存関係を追加できます:

Maven:

b4b38e33757a6497aa8690936b905cc1

<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>

09a0e22e5aaafd848ae04665be625b91

<groupId>io.confluent</groupId>
<artifactId>ksql-serde</artifactId>
<version>0.10.0</version>

09a0e22e5aaafd848ae04665be625b91

Gradle:

実装 'org.apache.kafka:kafka-clients:2.5.0'
実装 'io。 confluent:ksql-serde:0.10.0'

次に、ストリーム処理アプリケーションを実装する Java コードを記述します。以下は簡単なサンプル コードです:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.Producer.*;
import org.apache .kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams .*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.Wall ClockTimestampExtractor;
import org.apache.kafka.streams.state.* ;
import java.util.*;
import java.util.concurrent.*;

public class StreamProcessingApp {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    StreamsBuilder builder = new StreamsBuilder();

    // Step 1: Read from Kafka topic
    KStream<String, String> stream = builder.stream("example-topic");

    // Step 2: Transform and process the data
    stream.mapValues(value -> value.toUpperCase())
          .filter((key, value) -> value.startsWith("A"))
          .to("processed-topic");

    // Step 3: Create a Kafka producer to send data to another topic
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    // Step 4: Consume and process the data from the processed topic
    KStream<String, String> processedStream = builder.stream("processed-topic");
    processedStream.foreach((key, value) -> {
        // Process the data here
        System.out.println("Key: " + key + ", Value: " + value);
    });

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
}

}

上記のコード「example-topic」トピックからリアルタイム データを読み取り、そのデータを大文字に変換し、文字「A」で始まるデータを「processed-topic」トピックに書き込む、単純なストリーム処理アプリケーションを作成します。同時に、「processed-topic」トピック内のデータも消費して処理します。

4. アプリケーションの実行
Java コードを作成した後、次のコマンドを使用してアプリケーションをコンパイルし、実行できます:

javac StreamProcessingApp.java
java StreamProcessingApp

これで、Apache Kafka と KSQL に基づくストリーム処理アプリケーションの開発に成功し、Java コードを通じてデータの読み取り、変換、処理、書き込みを実装できました。実際のニーズに応じてコードを変更および拡張し、ビジネス ニーズを満たすことができます。この記事がお役に立てば幸いです!

以上がJava を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。