>  기사  >  Java  >  Java를 사용하여 Apache Kafka 및 KSQL 기반 스트림 처리 애플리케이션을 개발하는 방법

Java를 사용하여 Apache Kafka 및 KSQL 기반 스트림 처리 애플리케이션을 개발하는 방법

WBOY
WBOY원래의
2023-09-21 08:23:04965검색

如何使用Java开发一个基于Apache Kafka和KSQL的流处理应用

Java를 사용하여 Apache Kafka 및 KSQL 기반의 스트림 처리 애플리케이션을 개발하는 방법

스트림 처리는 데이터가 도착하는 즉시 분석하고 처리할 수 있도록 실시간 데이터 스트림을 처리하는 기술입니다. Apache Kafka는 확장 가능한 스트림 처리 애플리케이션을 효율적으로 구축하는 데 사용할 수 있는 분산 스트림 처리 플랫폼입니다. KSQL은 실시간 스트림 데이터의 SQL 쿼리 및 변환에 사용할 수 있는 오픈 소스 스트림 데이터 처리 엔진입니다. 이 기사에서는 Java를 사용하여 Apache Kafka 및 KSQL 기반의 스트림 처리 애플리케이션을 개발하는 방법을 소개합니다.

1. 환경 설정
시작하기 전에 로컬 Kafka 및 KSQL 환경을 설정해야 합니다. 먼저 Java JDK, Apache Kafka 및 Confluent Platform을 다운로드하여 설치해야 합니다. 그런 다음 다음 명령을 사용하여 Kafka 및 KSQL을 시작할 수 있습니다.

  1. ZooKeeper 시작:
    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Kafka 브로커 시작:
    bin/kafka-server-start.sh config /server.properties
  3. KSQL 서버 시작:
    bin/ksql-server-start.sh config/ksql-server.properties

2. Kafka 주제 및 KSQL 테이블 만들기
Java 코드 작성을 시작하기 전에 먼저 해야 할 일이 있습니다. Kafka 주제를 생성하고 여기에 실시간 데이터를 씁니다. 다음 명령을 사용하여 "example-topic"이라는 주제를 생성할 수 있습니다:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication - 요소 1

다음으로 실시간 데이터를 쿼리하고 변환하기 위해 KSQL에서 테이블을 생성해야 합니다. 다음 명령을 사용하여 KSQL 터미널에서 "example-table"이라는 테이블을 생성할 수 있습니다:

CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json', key= 'key');

3. Java 코드 구현
Java 코드 작성을 시작하기 전에 Kafka 및 KSQL에 대한 종속성을 추가해야 합니다. Maven 또는 Gradle 구성 파일에 다음 종속성을 추가할 수 있습니다.

Maven:

f91694a7232a35a7fec4f6bb68aa9390

<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>

b470ebe8eba2e92148f0dcc59db521d6
f91694a7232a35a7fec4f6bb68aa9390

<groupId>io.confluent</groupId>
<artifactId>ksql-serde</artifactId>
<version>0.10.0</version>

b470ebe8eba2e92148f0dcc59db521d6

Gradle:

implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'io.confluent:ksql-serde:0.10.0'

다음으로 스트림 처리 애플리케이션을 구현하는 Java 코드를 작성할 수 있습니다. 다음은 간단한 예제 코드입니다.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream . *;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.*;
import java.util.*;
import java.util.concurrent.*;

public class StreamProcessingApp {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    StreamsBuilder builder = new StreamsBuilder();

    // Step 1: Read from Kafka topic
    KStream<String, String> stream = builder.stream("example-topic");

    // Step 2: Transform and process the data
    stream.mapValues(value -> value.toUpperCase())
          .filter((key, value) -> value.startsWith("A"))
          .to("processed-topic");

    // Step 3: Create a Kafka producer to send data to another topic
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    // Step 4: Consume and process the data from the processed topic
    KStream<String, String> processedStream = builder.stream("processed-topic");
    processedStream.foreach((key, value) -> {
        // Process the data here
        System.out.println("Key: " + key + ", Value: " + value);
    });

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
}

}

위 코드는 "example-topic" 주제의 실시간 데이터를 읽고 데이터를 대문자로 변환한 후 문자 "A"로 쓰는 간단한 스트림 처리 애플리케이션을 구현합니다. 처음의 데이터는 "processed-topic" 주제에 기록됩니다. 동시에 "처리된 주제" 주제의 데이터도 소비하고 처리합니다.

4. 애플리케이션 실행
Java 코드를 작성한 후 다음 명령을 사용하여 애플리케이션을 컴파일하고 실행할 수 있습니다.

javac StreamProcessingApp.java
java StreamProcessingApp

이제 Apache Kafka 기반 스트림을 성공적으로 개발했습니다. KSQL 애플리케이션을 처리하고 Java 코드를 통해 데이터 읽기, 변환, 처리 및 쓰기를 구현합니다. 비즈니스 요구 사항을 충족하기 위해 실제 요구 사항에 따라 코드를 수정하고 확장할 수 있습니다. 이 기사가 도움이 되기를 바랍니다!

위 내용은 Java를 사용하여 Apache Kafka 및 KSQL 기반 스트림 처리 애플리케이션을 개발하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.