>  기사  >  Java  >  Java를 사용하여 Apache Kafka Streams 기반 스트림 처리 애플리케이션을 개발하는 방법

Java를 사용하여 Apache Kafka Streams 기반 스트림 처리 애플리케이션을 개발하는 방법

PHPz
PHPz원래의
2023-09-21 13:42:211355검색

如何使用Java开发一个基于Apache Kafka Streams的流处理应用

Java를 사용하여 Apache Kafka Streams 기반 스트림 처리 애플리케이션을 개발하는 방법

소개:
Apache Kafka Streams는 고성능, 확장 가능, 내결함성을 갖춘 실제 스트림 처리 프레임워크를 개발하는 데 사용할 수 있는 강력한 스트림 처리 프레임워크입니다. 시간 흐름 처리 애플리케이션. Apache Kafka를 기반으로 구축되었으며 입력 및 출력 Kafka 항목을 연결하여 원시 데이터 스트림을 처리할 수 있는 간단하고 강력한 API를 제공합니다. 이 기사에서는 Java를 사용하여 Apache Kafka Streams 기반 스트림 처리 애플리케이션을 개발하는 방법을 소개하고 몇 가지 코드 예제를 제공합니다.

1. 준비 작업:
Apache Kafka Streams를 사용하기 전에 몇 가지 준비 작업을 완료해야 합니다. 먼저 Apache Kafka가 설치되어 실행 중인지 확인하세요. Kafka 클러스터에서는 두 개의 토픽(입력 데이터용 토픽과 출력 결과용 토픽)을 생성해야 합니다. 다음 명령을 사용하여 이러한 주제를 생성할 수 있습니다.

bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

동시에 Java 프로젝트에 다음 종속성을 추가해야 합니다.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.0</version>
</dependency>

2. 스트림 처리 애플리케이션 작성:
다음으로 간단한 코드를 작성하겠습니다. 스트림 처리 앱. 이 예에서는 입력 주제에서 데이터를 읽고 데이터를 변환한 다음 결과를 출력 주제에 씁니다. 다음은 간단한 구현 예입니다.

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class StreamProcessingApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> inputStream = builder.stream("input-topic");

        KStream<String, String> outputStream = inputStream
                .mapValues(value -> value.toUpperCase());

        outputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

위 코드에서는 먼저 애플리케이션 ID 및 부트스트랩 서버와 같은 일부 구성 속성을 정의합니다. 그런 다음 StreamsBuilder 인스턴스를 생성하고 입력 주제에서 스트림을 얻었습니다. 다음으로 스트림의 각 값을 대문자로 변환하고 결과를 출력 주제에 썼습니다. 마지막으로 KafkaStreams 인스턴스를 생성하고 스트림 처리 애플리케이션을 시작했습니다.

3. 애플리케이션 실행:
스트림 처리 애플리케이션을 작성한 후 다음 명령을 사용하여 애플리케이션을 실행할 수 있습니다.

java -cp your-project.jar StreamProcessingApp

your-project.jar을 실제 프로젝트 jar 파일 이름으로 바꾸십시오. 애플리케이션을 실행한 후 입력 주제의 데이터 처리를 시작하고 변환된 결과를 출력 주제에 씁니다.

결론:
Java를 사용하여 Apache Kafka Streams 기반 스트림 처리 애플리케이션을 개발하는 것은 매우 간단합니다. 입력 및 출력 Kafka 주제를 연결하고 강력한 Kafka Streams API를 사용하여 고성능, 확장 가능, 내결함성 실시간 스트림 처리 애플리케이션을 쉽게 구축할 수 있습니다. 이 기사가 Kafka Streams를 시작하고 실제 프로젝트에서 사용하는 데 도움이 되기를 바랍니다.

위 내용은 Java를 사용하여 Apache Kafka Streams 기반 스트림 처리 애플리케이션을 개발하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.