ホームページ >Java >&#&チュートリアル >Spring Boot と Apache Kafka ストリームを使用してストリーム処理アプリケーションを構築する
ビッグデータ時代の到来に伴い、リアルタイムのデータ処理と分析のニーズを満たすために、ますます多くの企業がストリーム処理テクノロジーに注目し始めています。 Apache Kafka は、ストリーム処理の分野で事実上の標準となっている、高スループットでスケーラブルな分散メッセージ キュー システムです。 Spring Boot は Spring アプリケーションを迅速に開発するためのツールであり、ストリーム処理アプリケーションをより迅速かつ簡単に構築するのに役立ちます。この記事では、Spring Boot と Apache Kafka Streams を使用してストリーム処理アプリケーションを構築する方法を紹介し、これら 2 つのツールの長所と短所、およびアプリケーションのパフォーマンスを最適化する方法について説明します。
アプリケーションの構築を開始する前に、まず Kafka トピックを作成する必要があります。この記事では、Web サイト上のユーザー クリック イベントを保存する「user-clicks」というトピックを作成します。
コマンド ラインで次のコマンドを実行します。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-clicks
これにより、Kafka サーバー上に「user-clicks」という名前のトピックが作成されます。このトピックには、パーティションが 1 つとローカル コピーだけあります。
次に、Spring Boot を使用して基本的なアプリケーションを作成します。 Spring Boot では、Spring Initializr を使用して基本的なアプリケーションを迅速に作成できます。アプリケーションを作成するときは、必ず次の依存関係を選択してください:
アプリケーションを作成した後、次の依存関係:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.6.0</version> </dependency>
これにより、Kafka ストリーム処理 API が提供されます。
これで、Kafka ストリーム処理コードの作成を開始できます。アプリケーションを作成するときに、「UserController」というコントローラー クラスを定義しました。ここで、コントローラー クラスに「clicks」という名前の POST リクエスト ハンドラーを追加します。このハンドラーは、POST リクエストからユーザーのクリック イベントを取得し、「user-clicks」という名前の Kafka トピックに送信します。コードは次のとおりです。
@RestController public class UserController { private final KafkaTemplate<String, String> kafkaTemplate; @Autowired public UserController(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @PostMapping("/clicks") public void clicks(@RequestBody String click) { kafkaTemplate.send("user-clicks", click); } }
上記のコードでは、Spring の依存関係注入関数を使用して、「kafkaTemplate」という名前の KafkaTemplate オブジェクトを注入します。このオブジェクトは、Kafka トピックにメッセージを送信するために使用できます。
次に、「user-clicks」トピックから受信したクリック イベントを処理するための Kafka ストリーム処理トポロジを作成します。この例では、Kafka Streams API を使用してストリーム処理トポロジを実装します。
Spring Boot アプリケーションでは、Kafka Streams API を使用してクリック イベントを処理する「UserClicksStream」というクラスを作成します。コードは次のとおりです。
@Configuration @EnableKafkaStreams public class UserClicksStream { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public KStream<String, String> kStream(StreamsBuilder builder) { KStream<String, String> stream = builder.stream("user-clicks"); stream.foreach((key, value) -> { System.out.println("Received: " + value); }); return stream; } @Bean public KafkaStreams kafkaStreams(StreamsBuilder builder) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-clicks-stream"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return new KafkaStreams(builder.build(), props); } }
上記のコードでは、Spring の依存関係注入関数を使用して、「StreamsBuilder」という名前の StreamsBuilder オブジェクトを注入します。このオブジェクトは、Kafka ストリーム処理トポロジを作成するために使用されます。
kStream メソッドでは、「ユーザークリック」トピックから KStream オブジェクトを作成し、foreach メソッドを使用して受信したイベントを出力します。 froeach は、後の手順で使用する端末操作です。
kafkaStreams メソッドでは、「user-clicks-stream」という名前のアプリケーションを作成し、Kafka サーバーのアドレスを指定します。このアプリケーションは、前のトポロジで定義したストリーム処理操作を自動的に実行します。
これで、アプリケーションのすべてのコードが作成されました。アプリケーションを実行する前に、Kafka サーバーを起動する必要があります。
コマンド ラインで次のコマンドを実行します:
bin/kafka-server-start.sh config/server.properties
これにより、Kafka サーバーが起動します。これでアプリケーションを開始できるようになりました。
コマンド ラインで次のコマンドを実行します:
mvn spring-boot:run
これにより、アプリケーションが起動します。これで、cURL や Postman などの HTTP クライアントを使用してアプリケーションに POST リクエストを送信できるようになりました。各リクエストはクリック イベントを生成し、コンソールに出力します。
トポロジでさらに多くの操作 (集計、ウィンドウ計算など) を実行したい場合は、Kafka Streams API によって提供される他の操作を使用してトポロジを構築できます。
Spring Boot と Apache Kafka Streams を使用してストリーム処理アプリケーションを構築することは、リアルタイム データをより簡単に処理するのに役立つ高速で便利な方法です。ただし、トポロジ設計、バッファ サイズ、ストリーム処理時間など、いくつかの最適化パフォーマンスの問題に注意する必要があります。これらの問題を理解することで、効率的なストリーム処理アプリケーションをより適切に構築できるようになります。
以上がSpring Boot と Apache Kafka ストリームを使用してストリーム処理アプリケーションを構築するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。