>  기사  >  Java  >  Kafka를 사용하여 전달되고 주문된 메시지를 가져오고 사용하는 방법

Kafka를 사용하여 전달되고 주문된 메시지를 가져오고 사용하는 방법

Linda Hamilton
Linda Hamilton원래의
2024-11-06 21:08:02202검색

Como conseguir y la consumición entregada y ordenada de mensajes con Kafka

Apache Kafka에서 이벤트가 완전히 일관된 순서로 전송되고 소비되도록 하려면 메시지 분할 및 소비자 할당이 작동하는 방식을 이해하는 것이 중요합니다.

Kafka에서 파티션 사용

  1. 주제 분할:

    • Kafka는 메시지를 주제 내의 파티션으로 구성합니다. 각 파티션은 수신되는 메시지의 순서를 유지합니다. 즉, 메시지는 해당 파티션으로 전송된 순서대로 처리됩니다.
    • 순서를 보장하려면 동일한 컨텍스트(예: 사용자 ID 또는 트랜잭션 ID)와 관련된 모든 메시지가 동일한 파티션으로 전송되는 것이 중요합니다. 이는 메시지를 보낼 때 파티션 키를 사용하여 수행됩니다. Kafka는 이 키를 사용하여 해시 함수[1][5]를 사용하여 메시지를 보낼 파티션을 결정합니다.
  2. 메시지 키:

    • 메시지를 보낼 때 를 지정할 수 있습니다. 동일한 키를 가진 모든 메시지는 동일한 파티션으로 전송되므로 생성된 순서와 동일한 순서로 사용됩니다. 예를 들어 사용자 ID를 키로 사용하면 해당 사용자와 관련된 모든 이벤트가 동일한 파티션으로 이동합니다.

소비자 그룹

  1. 소비자 할당:

    • Kafka의 소비자는 소비자 그룹으로 그룹화됩니다. 각 그룹에는 여러 소비자가 있을 수 있지만 각 파티션은 그룹 내에서 한 번에 한 소비자만 읽을 수 있습니다.
    • 이는 파티션보다 소비자가 많으면 일부 소비자가 비활성 상태가 된다는 의미입니다. 질서를 유지하고 효율성을 극대화하려면 최소한 그룹 내 소비자 수만큼 파티션을 두는 것이 좋습니다.
  2. 오프셋 관리:

    • Kafka는 파티션 내 각 메시지에 대한 증분 숫자 식별자인 오프셋을 사용하여 각 소비자의 읽기 상태를 저장합니다. 이를 통해 소비자는 장애 발생 시 중단한 부분부터 다시 시작할 수 있습니다.

추가 전략

  • 오버로드 방지: 파티션 키를 선택할 때 일부 파티션은 과부하되고 다른 파티션은 충분히 활용되지 않도록 트래픽 분산을 고려하는 것이 중요합니다.
  • 복제 및 내결함성: 파티션에 대해 적절한 복제(1보다 큼)를 구성해야 가용성은 물론 장애에 대한 시스템의 복원력도 향상됩니다.

Avro를 사용하여 Kafka에서 메시지 생성 및 소비 시스템을 구현하여 메시지가 순서대로 처리되도록 하고 가능한 오류를 처리하기 위한 전체 예는 다음과 같습니다. 여기에는 Avro 스키마 정의, 생산자 및 소비자 코드, 오류 처리 전략이 포함됩니다.
아브로 계획
먼저 페이로드에 대한 Avro 스키마를 정의합니다. 메시지 구조를 설명하는 user_signed_up.avsc라는 파일을 생성하겠습니다.

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

키 생성
메시지 생성 및 소비의 순서를 보장하기 위해 메시지 유형-날짜로 구성된 키를 사용합니다(예: user-signed-up-2024-11-04).
프로듀서 카프카
Avro 구성표를 사용하여 Kafka에 메시지를 보내는 Producer의 코드는 다음과 같습니다.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

재시도 고려사항
**재시도를 활성화할 때 제대로 처리되지 않으면 메시지 순서가 바뀔 위험이 있다는 점에 유의하는 것이 중요합니다.
이를 방지하려면:
**max.in.flight.requests.per.connection
: 이 속성을 1로 설정하면 메시지가 한 번에 하나씩 전송되고 순서대로 처리되도록 할 수 있습니다. 그러나 이는 성능에 영향을 미칠 수 있습니다.
이러한 구성과 적절한 오류 처리를 통해 Kafka 생산자가 더욱 강력해지고 필요한 순서를 유지하면서 메시지 생성 오류를 처리할 수 있는지 확인할 수 있습니다.

**카프카 소비자
**메시지를 읽고 처리하는 소비자:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

키의 실제 예
다양한 이벤트와 다양한 파티션이 있는 환경에서 현실적인 키는 다음과 같습니다.

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

이를 통해 특정 날짜, 특정 유형과 관련된 모든 이벤트를 동일한 파티션으로 전송하여 순서대로 처리할 수 있습니다. 또한 필요한 경우 추가 세부정보(예: 세션 또는 거래 ID)를 포함하여 키를 다양화할 수 있습니다.
Avro를 사용하여 Kafka에서 오류를 처리하고 메시지 순서를 보장하기 위한 이러한 구현 및 전략을 통해 강력하고 효율적인 이벤트 관리 시스템을 구축할 수 있습니다.

이제 좀 더 유능한 Kafka 생산자이자 소비자가 되었습니다.

회로 차단기, 로컬 지속성 및 DLQ를 갖춘 Kafka 프로듀서.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

DLQ 관리 기능을 갖춘 Kafka Consumer.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

user-signed-up-2024-11-04
order-created-2024-11-04
payment-processed-2024-11-04

코드 설명
회로 차단기:
Resilience4j는 생산자의 차단기 회로를 관리하는 데 사용됩니다. 실패율 임계값과 열린 상태에서의 대기 시간이 구성됩니다.
로컬 지속성 및 DLQ:
실패한 메시지는 로컬 파일(failed_messages.log)과 오류 대기열(dead_letter_queue.log)에 모두 저장됩니다.
오류 처리:
생산자와 소비자에서는 오류가 적절하게 처리되고 기록됩니다.
DLQ 처리:
소비자는 기본 주제의 메시지를 소비한 후 DLQ에 저장된 메시지도 처리합니다.
로그 기록:
System.err 및 System.out 메시지는 오류 및 중요한 이벤트를 기록하는 데 사용됩니다.
최종 고려사항
이 구현을 통해:
이는 탄력적인 방식으로 메시지가 전송되고 처리되도록 보장합니다.
일시적이거나 지속적인 오류에 대해서는 적절한 처리가 제공됩니다.
로직에서는 데드 레터 큐(Dead Letter Queue)를 사용하여 효과적인 복구가 가능합니다.
차단기 회로는 장기간 장애 발생 시 시스템이 포화되는 것을 방지하는 데 도움이 됩니다.
이 접근 방식은 Kafka에서 메시지의 질서 있는 전달을 유지하면서 물리적, 논리적 문제를 처리할 수 있는 강력한 시스템을 만듭니다.

위 내용은 Kafka를 사용하여 전달되고 주문된 메시지를 가져오고 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.