>Java >java지도 시간 >Java 개발: 실시간 스트림 처리 및 컴퓨팅을 위해 Apache Kafka Streams를 사용하는 방법

Java 개발: 실시간 스트림 처리 및 컴퓨팅을 위해 Apache Kafka Streams를 사용하는 방법

WBOY
WBOY원래의
2023-09-21 12:39:241511검색

Java开发:如何使用Apache Kafka Streams进行实时流处理和计算

Java 개발: 실시간 스트림 처리 및 컴퓨팅을 위해 Apache Kafka Streams를 사용하는 방법

소개:
빅 데이터 및 실시간 컴퓨팅의 등장과 함께 스트림 처리 엔진인 Apache Kafka Streams가 개발되고 있습니다. 개발자가 점점 더 많이 사용하고 있습니다. 실시간 스트리밍 데이터를 처리하고 복잡한 스트림 처리 및 계산을 수행하는 간단하면서도 강력한 방법을 제공합니다. 이 기사에서는 환경 구성, 코드 작성, 샘플 데모를 포함하여 실시간 스트림 처리 및 컴퓨팅을 위해 Apache Kafka Streams를 사용하는 방법을 소개합니다.

1. 준비:

  1. Apache Kafka 설치 및 구성: Apache Kafka를 다운로드하여 설치하고 Apache Kafka 클러스터를 시작해야 합니다. 자세한 설치 및 구성은 Apache Kafka 공식 문서를 참고하세요.
  2. 종속성 소개: Kafka Streams 관련 종속성을 Java 프로젝트에 도입합니다. 예를 들어 Maven을 사용하면 프로젝트의 클러스터 연결 정보에 다음 종속성을 추가할 수 있습니다. 다음은 간단한 샘플 코드입니다.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.1</version>
</dependency>

스트림 처리 및 계산 로직 추가:
    Kafka Streams 애플리케이션을 생성한 후 특정 스트림 처리 및 계산 로직을 추가해야 합니다. 간단한 예를 들어 "input-topic"이라는 Kafka 주제로부터 문자열 메시지를 받고 메시지에 대한 길이 계산을 수행한 다음 결과를 "output-topic"이라는 Kafka 주제로 보낸다고 가정합니다. 다음은 샘플 코드입니다.

  1. import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    
    import java.util.Properties;
    
    public class KafkaStreamsApp {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
            StreamsBuilder builder = new StreamsBuilder();
            // 在这里添加流处理和计算逻辑
    
            Topology topology = builder.build();
            KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
    
            // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }
  2. 위 샘플 코드에서는 먼저 입력 주제에서 KStream 객체를 생성한 후 flatMapValues ​​연산을 사용하여 각 메시지를 단어로 분할하고 통계적으로 계산합니다. 마지막으로 결과가 출력 주제로 전송됩니다.
    3. 샘플 데모:
  1. 실시간 스트림 처리 및 컴퓨팅 애플리케이션을 확인하기 위해 Kafka 명령줄 도구를 사용하여 메시지를 보내고 결과를 볼 수 있습니다. 예시 데모 단계는 다음과 같습니다.
입력 및 출력 주제 생성:

명령줄에서 다음 명령을 실행하여 "input-topic" 및 "output-topic"이라는 Kafka 주제를 생성합니다.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;

public class KafkaStreamsApp {

    // 省略其他代码...
    
    public static void main(String[] args) {
        // 省略其他代码...
        
        KStream<String, String> inputStream = builder.stream("input-topic");
        KTable<String, Long> wordCounts = inputStream
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                .groupBy((key, word) -> word)
                .count();

        wordCounts.toStream().to("output-topic");

        // 省略其他代码...
    }
}

Send 메시지 입력 주제로:
    명령줄에서 다음 명령을 실행하여 "input-topic"에 일부 메시지를 보냅니다:

  1. bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  2. bin/kafka-console-consumer.sh --topic output-topic --from- 시작 --bootstrap -server localhost:9092
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
>hello world
>apache kafka streams
>real-time processing
>```

3. 查看结果:
在命令行中执行以下命令,从"output-topic"中消费结果消息:
    실시간: 1
  1. 처리: 1
    apache: 1
  2. kafka: 1
streams: 1

hello: 2

world: 1

可以看到,输出的结果是单词及其对应的计数值:

위 내용은 Java 개발: 실시간 스트림 처리 및 컴퓨팅을 위해 Apache Kafka Streams를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.