Java 개발: 실시간 스트림 처리 및 컴퓨팅을 위해 Apache Kafka Streams를 사용하는 방법
소개:
빅 데이터 및 실시간 컴퓨팅의 등장과 함께 스트림 처리 엔진인 Apache Kafka Streams가 개발되고 있습니다. 개발자가 점점 더 많이 사용하고 있습니다. 실시간 스트리밍 데이터를 처리하고 복잡한 스트림 처리 및 계산을 수행하는 간단하면서도 강력한 방법을 제공합니다. 이 기사에서는 환경 구성, 코드 작성, 샘플 데모를 포함하여 실시간 스트림 처리 및 컴퓨팅을 위해 Apache Kafka Streams를 사용하는 방법을 소개합니다.
1. 준비:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.8.1</version> </dependency>스트림 처리 및 계산 로직 추가:
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; public class KafkaStreamsApp { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // 在这里添加流处理和计算逻辑 Topology topology = builder.build(); KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止 Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
명령줄에서 다음 명령을 실행하여 "input-topic" 및 "output-topic"이라는 Kafka 주제를 생성합니다.
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import java.util.Arrays; public class KafkaStreamsApp { // 省略其他代码... public static void main(String[] args) { // 省略其他代码... KStream<String, String> inputStream = builder.stream("input-topic"); KTable<String, Long> wordCounts = inputStream .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+"))) .groupBy((key, word) -> word) .count(); wordCounts.toStream().to("output-topic"); // 省略其他代码... } }
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092 >hello world >apache kafka streams >real-time processing >``` 3. 查看结果: 在命令行中执行以下命令,从"output-topic"中消费结果消息:
hello: 2
world: 1可以看到,输出的结果是单词及其对应的计数值:
위 내용은 Java 개발: 실시간 스트림 처리 및 컴퓨팅을 위해 Apache Kafka Streams를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!