ホームページ >Java >&#&チュートリアル >Java 開発: Apache Kafka Streams を使用してリアルタイムのストリーム処理とコンピューティングを行う方法
Java 開発: Apache Kafka Streams を使用してリアルタイムのストリーム処理とコンピューティングを行う方法
はじめに:
ビッグ データとリアルタイムの台頭によりコンピューティング、Apache Kafka Streams ストリーム処理エンジンとして、ますます多くの開発者によって使用されています。これは、リアルタイム ストリーミング データを処理し、複雑なストリーム処理と計算を実行するためのシンプルかつ強力な方法を提供します。この記事では、環境の構成、コードの記述、サンプル デモンストレーションなど、リアルタイムのストリーム処理とコンピューティングに Apache Kafka Streams を使用する方法を紹介します。
1. 準備:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.8.1</version> </dependency>
2. コードを記述します。
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; public class KafkaStreamsApp { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // 在这里添加流处理和计算逻辑 Topology topology = builder.build(); KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止 Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import java.util.Arrays; public class KafkaStreamsApp { // 省略其他代码... public static void main(String[] args) { // 省略其他代码... KStream<String, String> inputStream = builder.stream("input-topic"); KTable<String, Long> wordCounts = inputStream .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+"))) .groupBy((key, word) -> word) .count(); wordCounts.toStream().to("output-topic"); // 省略其他代码... } }
上記のサンプル コードでは、最初に入力トピックから KStream オブジェクトが作成され、次に flatMapValues 操作を使用して各メッセージが単語に分割され、統計的なカウントを実行します。最後に、結果が出力トピックに送信されます。
3. デモの例:
リアルタイム ストリーム処理およびコンピューティング アプリケーションを検証するには、Kafka コマンド ライン ツールを使用してメッセージを送信し、結果を表示します。デモンストレーション例の手順は次のとおりです。
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092 >hello world >apache kafka streams >real-time processing >``` 3. 查看结果: 在命令行中执行以下命令,从"output-topic"中消费结果消息:
bin/kafka-console-consumer.sh --topic 出力トピック --from-beginning --bootstrap-server localhost:9092
可以看到,输出的结果是单词及其对应的计数值:
real-time: 1
processing : 1
apache: 1
kafka: 1
streams: 1
hello: 2
world: 1
结论: 通过上述示例,我们了解了如何使用Apache Kafka Streams进行实时流处理和计算。可以根据实际需求,编写更复杂的流处理和计算逻辑,并通过Kafka命令行工具来验证和查看结果。希望本文对于Java开发人员在实时流处理和计算领域有所帮助。 参考文档: 1. Apache Kafka官方文档:https://kafka.apache.org/documentation/ 2. Kafka Streams官方文档:https://kafka.apache.org/documentation/streams/
以上がJava 開発: Apache Kafka Streams を使用してリアルタイムのストリーム処理とコンピューティングを行う方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。