Apache Kafka에서 이벤트가 완전히 일관된 순서로 전송되고 소비되도록 하려면 메시지 분할 및 소비자 할당이 작동하는 방식을 이해하는 것이 중요합니다.
Kafka에서 파티션 사용
-
주제 분할:
- Kafka는 메시지를 주제 내의 파티션으로 구성합니다. 각 파티션은 수신되는 메시지의 순서를 유지합니다. 즉, 메시지는 해당 파티션으로 전송된 순서대로 처리됩니다.
- 순서를 보장하려면 동일한 컨텍스트(예: 사용자 ID 또는 트랜잭션 ID)와 관련된 모든 메시지가 동일한 파티션으로 전송되는 것이 중요합니다. 이는 메시지를 보낼 때 파티션 키를 사용하여 수행됩니다. Kafka는 이 키를 사용하여 해시 함수[1][5]를 사용하여 메시지를 보낼 파티션을 결정합니다.
-
메시지 키:
- 메시지를 보낼 때 키를 지정할 수 있습니다. 동일한 키를 가진 모든 메시지는 동일한 파티션으로 전송되므로 생성된 순서와 동일한 순서로 사용됩니다. 예를 들어 사용자 ID를 키로 사용하면 해당 사용자와 관련된 모든 이벤트가 동일한 파티션으로 이동합니다.
소비자 그룹
-
소비자 할당:
- Kafka의 소비자는 소비자 그룹으로 그룹화됩니다. 각 그룹에는 여러 소비자가 있을 수 있지만 각 파티션은 그룹 내에서 한 번에 한 소비자만 읽을 수 있습니다.
- 이는 파티션보다 소비자가 많으면 일부 소비자가 비활성 상태가 된다는 의미입니다. 질서를 유지하고 효율성을 극대화하려면 최소한 그룹 내 소비자 수만큼 파티션을 두는 것이 좋습니다.
-
오프셋 관리:
- Kafka는 파티션 내 각 메시지에 대한 증분 숫자 식별자인 오프셋을 사용하여 각 소비자의 읽기 상태를 저장합니다. 이를 통해 소비자는 장애 발생 시 중단한 부분부터 다시 시작할 수 있습니다.
추가 전략
- 오버로드 방지: 파티션 키를 선택할 때 일부 파티션은 과부하되고 다른 파티션은 충분히 활용되지 않도록 트래픽 분산을 고려하는 것이 중요합니다.
- 복제 및 내결함성: 파티션에 대해 적절한 복제(1보다 큼)를 구성해야 가용성은 물론 장애에 대한 시스템의 복원력도 향상됩니다.
Avro를 사용하여 Kafka에서 메시지 생성 및 소비 시스템을 구현하여 메시지가 순서대로 처리되도록 하고 가능한 오류를 처리하기 위한 전체 예는 다음과 같습니다. 여기에는 Avro 스키마 정의, 생산자 및 소비자 코드, 오류 처리 전략이 포함됩니다.
아브로 계획
먼저 페이로드에 대한 Avro 스키마를 정의합니다. 메시지 구조를 설명하는 user_signed_up.avsc라는 파일을 생성하겠습니다.
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
키 생성
메시지 생성 및 소비의 순서를 보장하기 위해 메시지 유형-날짜로 구성된 키를 사용합니다(예: user-signed-up-2024-11-04).
프로듀서 카프카
Avro 구성표를 사용하여 Kafka에 메시지를 보내는 Producer의 코드는 다음과 같습니다.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<string byte> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<string byte> producerRecord = new ProducerRecord(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<string byte> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<genericrecord> datumWriter = new GenericDatumWriter(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } } </genericrecord></string></string></string></string>
재시도 고려사항
**재시도를 활성화할 때 제대로 처리되지 않으면 메시지 순서가 바뀔 위험이 있다는 점에 유의하는 것이 중요합니다.
이를 방지하려면:
**max.in.flight.requests.per.connection: 이 속성을 1로 설정하면 메시지가 한 번에 하나씩 전송되고 순서대로 처리되도록 할 수 있습니다. 그러나 이는 성능에 영향을 미칠 수 있습니다.
이러한 구성과 적절한 오류 처리를 통해 Kafka 생산자가 더욱 강력해지고 필요한 순서를 유지하면서 메시지 생성 오류를 처리할 수 있는지 확인할 수 있습니다.
**카프카 소비자
**메시지를 읽고 처리하는 소비자:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<string byte> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<string byte> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string byte> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<genericrecord> reader = new GenericDatumReader(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<string byte> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } } </string></string></genericrecord></string></string></string>
키의 실제 예
다양한 이벤트와 다양한 파티션이 있는 환경에서 현실적인 키는 다음과 같습니다.
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
이를 통해 특정 날짜, 특정 유형과 관련된 모든 이벤트를 동일한 파티션으로 전송하여 순서대로 처리할 수 있습니다. 또한 필요한 경우 추가 세부정보(예: 세션 또는 거래 ID)를 포함하여 키를 다양화할 수 있습니다.
Avro를 사용하여 Kafka에서 오류를 처리하고 메시지 순서를 보장하기 위한 이러한 구현 및 전략을 통해 강력하고 효율적인 이벤트 관리 시스템을 구축할 수 있습니다.
이제 좀 더 유능한 Kafka 생산자이자 소비자가 되었습니다.
회로 차단기, 로컬 지속성 및 DLQ를 갖춘 Kafka 프로듀서.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<string byte> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<string byte> producerRecord = new ProducerRecord(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<string byte> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<genericrecord> datumWriter = new GenericDatumWriter(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } } </genericrecord></string></string></string></string>
DLQ 관리 기능을 갖춘 Kafka Consumer.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<string byte> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<string byte> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string byte> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<genericrecord> reader = new GenericDatumReader(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<string byte> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } } </string></string></genericrecord></string></string></string>
user-signed-up-2024-11-04 order-created-2024-11-04 payment-processed-2024-11-04
코드 설명
회로 차단기:
Resilience4j는 생산자의 차단기 회로를 관리하는 데 사용됩니다. 실패율 임계값과 열린 상태에서의 대기 시간이 구성됩니다.
로컬 지속성 및 DLQ:
실패한 메시지는 로컬 파일(failed_messages.log)과 오류 대기열(dead_letter_queue.log)에 모두 저장됩니다.
오류 처리:
생산자와 소비자에서는 오류가 적절하게 처리되고 기록됩니다.
DLQ 처리:
소비자는 기본 주제의 메시지를 소비한 후 DLQ에 저장된 메시지도 처리합니다.
로그 기록:
System.err 및 System.out 메시지는 오류 및 중요한 이벤트를 기록하는 데 사용됩니다.
최종 고려사항
이 구현을 통해:
이는 탄력적인 방식으로 메시지가 전송되고 처리되도록 보장합니다.
일시적이거나 지속적인 오류에 대해서는 적절한 처리가 제공됩니다.
로직에서는 데드 레터 큐(Dead Letter Queue)를 사용하여 효과적인 복구가 가능합니다.
차단기 회로는 장기간 장애 발생 시 시스템이 포화되는 것을 방지하는 데 도움이 됩니다.
이 접근 방식은 Kafka에서 메시지의 질서 있는 전달을 유지하면서 물리적, 논리적 문제를 처리할 수 있는 강력한 시스템을 만듭니다.
위 내용은 Kafka를 사용하여 전달되고 주문된 메시지를 가져오고 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

이 기사는 카페인 및 구아바 캐시를 사용하여 자바에서 다단계 캐싱을 구현하여 응용 프로그램 성능을 향상시키는 것에 대해 설명합니다. 구성 및 퇴거 정책 관리 Best Pra와 함께 설정, 통합 및 성능 이점을 다룹니다.

Java의 클래스 로딩에는 부트 스트랩, 확장 및 응용 프로그램 클래스 로더가있는 계층 적 시스템을 사용하여 클래스로드, 링크 및 초기화 클래스가 포함됩니다. 학부모 위임 모델은 핵심 클래스가 먼저로드되어 사용자 정의 클래스 LOA에 영향을 미치도록합니다.

이 기사는 Lambda 표현식, 스트림 API, 메소드 참조 및 선택 사항을 사용하여 기능 프로그래밍을 Java에 통합합니다. 간결함과 불변성을 통한 개선 된 코드 가독성 및 유지 관리 가능성과 같은 이점을 강조합니다.

이 기사는 캐싱 및 게으른 하중과 같은 고급 기능을 사용하여 객체 관계 매핑에 JPA를 사용하는 것에 대해 설명합니다. 잠재적 인 함정을 강조하면서 성능을 최적화하기위한 설정, 엔티티 매핑 및 모범 사례를 다룹니다. [159 문자]

이 기사에서는 Java 프로젝트 관리, 구축 자동화 및 종속성 해상도에 Maven 및 Gradle을 사용하여 접근 방식과 최적화 전략을 비교합니다.

이 기사에서는 선택기와 채널을 사용하여 단일 스레드와 효율적으로 처리하기 위해 선택기 및 채널을 사용하여 Java의 NIO API를 설명합니다. 프로세스, 이점 (확장 성, 성능) 및 잠재적 인 함정 (복잡성,

이 기사에서는 Maven 및 Gradle과 같은 도구를 사용하여 적절한 버전 및 종속성 관리로 사용자 정의 Java 라이브러리 (JAR Files)를 작성하고 사용하는 것에 대해 설명합니다.

이 기사는 네트워크 통신을위한 Java의 소켓 API, 클라이언트 서버 설정, 데이터 처리 및 리소스 관리, 오류 처리 및 보안과 같은 중요한 고려 사항에 대해 자세히 설명합니다. 또한 성능 최적화 기술, i


핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

안전한 시험 브라우저
안전한 시험 브라우저는 온라인 시험을 안전하게 치르기 위한 보안 브라우저 환경입니다. 이 소프트웨어는 모든 컴퓨터를 안전한 워크스테이션으로 바꿔줍니다. 이는 모든 유틸리티에 대한 액세스를 제어하고 학생들이 승인되지 않은 리소스를 사용하는 것을 방지합니다.

ZendStudio 13.5.1 맥
강력한 PHP 통합 개발 환경

MinGW - Windows용 미니멀리스트 GNU
이 프로젝트는 osdn.net/projects/mingw로 마이그레이션되는 중입니다. 계속해서 그곳에서 우리를 팔로우할 수 있습니다. MinGW: GCC(GNU Compiler Collection)의 기본 Windows 포트로, 기본 Windows 애플리케이션을 구축하기 위한 무료 배포 가능 가져오기 라이브러리 및 헤더 파일로 C99 기능을 지원하는 MSVC 런타임에 대한 확장이 포함되어 있습니다. 모든 MinGW 소프트웨어는 64비트 Windows 플랫폼에서 실행될 수 있습니다.

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

에디트플러스 중국어 크랙 버전
작은 크기, 구문 강조, 코드 프롬프트 기능을 지원하지 않음

뜨거운 주제



