Um sicherzustellen, dass Ereignisse in Apache Kafka in einer vollständig konsistenten Reihenfolge gesendet und konsumiert werden, ist es wichtig zu verstehen, wie Nachrichtenpartitionierung und Verbraucherzuweisung funktionieren.
Themenpartitionierung:
Nachrichtenschlüssel:
Verbraucherzuweisung:
Offset-Management:
Um ein Nachrichtenproduktions- und -verbrauchssystem in Kafka mit Avro zu implementieren, um sicherzustellen, dass Nachrichten in der richtigen Reihenfolge verarbeitet werden und mögliche Fehler behandelt werden, finden Sie hier ein vollständiges Beispiel. Dazu gehört die Definition des Avro-Schemas, des Producer- und Consumer-Codes sowie Strategien zur Fehlerbehandlung.
Avro-Schema
Zuerst definieren wir das Avro-Schema für unsere Nutzlast. Wir erstellen eine Datei mit dem Namen user_signed_up.avsc, die die Struktur der Nachricht beschreibt.
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
Schlüsselgenerierung
Um Ordnung bei der Erstellung und Nutzung von Nachrichten zu gewährleisten, verwenden wir einen Schlüssel, der als Nachrichtentyp-Datum strukturiert ist, zum Beispiel: user-signed-up-2024-11-04.
Produzent Kafka
Hier ist der Code für den Produzenten, der mithilfe des Avro-Schemas Nachrichten an Kafka sendet:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<String, byte[]> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } }
Überlegungen zum erneuten Versuch
**Es ist wichtig zu beachten, dass bei der Aktivierung von Wiederholungsversuchen das Risiko einer Neuordnung der Nachrichten bestehen kann, wenn diese nicht ordnungsgemäß gehandhabt werden.
Um dies zu vermeiden:
**max.in.flight.requests.per.connection: Sie können diese Eigenschaft auf 1 setzen, um sicherzustellen, dass Nachrichten einzeln gesendet und der Reihe nach verarbeitet werden. Dies kann jedoch die Leistung beeinträchtigen.
Mit dieser Konfiguration und der richtigen Fehlerbehandlung können Sie sicherstellen, dass Ihr Kafka-Produzent robuster und in der Lage ist, Fehler bei der Nachrichtenproduktion zu verarbeiten und gleichzeitig die erforderliche Reihenfolge aufrechtzuerhalten.
**Kafka-Verbraucher
**Der Verbraucher, der die Nachrichten liest und verarbeitet:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<String, byte[]> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } }
Realistisches Beispiel für Schlüssel
In einer Umgebung mit vielen verschiedenen Ereignissen und vielen verschiedenen Partitionen könnte ein realistischer Schlüssel etwa so aussehen:
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
Dadurch können alle Ereignisse, die sich auf einen bestimmten Typ beziehen, an einem bestimmten Datum an dieselbe Partition gesendet und der Reihe nach verarbeitet werden. Darüber hinaus können Sie die Schlüssel diversifizieren, indem Sie bei Bedarf weitere Details hinzufügen (z. B. eine Sitzungs- oder Transaktions-ID).
Mit dieser Implementierung und Strategien zur Fehlerbehandlung und Sicherstellung der Nachrichtenreihenfolge in Kafka mithilfe von Avro können Sie ein robustes und effizientes System zur Verwaltung von Ereignissen aufbauen.
Jetzt ein etwas leistungsfähigerer Kafka-Produzent und -Konsumer.
Kafka-Produzent mit Leistungsschalter, lokaler Persistenz und DLQ.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<String, byte[]> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } }
Kafka Consumer mit DLQ Management.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<String, byte[]> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } }
user-signed-up-2024-11-04 order-created-2024-11-04 payment-processed-2024-11-04
Code-Erklärung
Leistungsschalter:
Resilience4j wird zur Verwaltung des Leistungsschalterkreises des Herstellers verwendet. Es sind ein Schwellenwert für die Ausfallrate und eine Wartezeit im geöffneten Zustand konfiguriert.
Lokale Persistenz und DLQ:
Fehlgeschlagene Nachrichten werden sowohl in einer lokalen Datei (failed_messages.log) als auch in einer Fehlerwarteschlange (dead_letter_queue.log) gespeichert.
Fehlerbehandlung:
Im Producer und Consumer werden Fehler angemessen behandelt und protokolliert.
DLQ-Verarbeitung:
Der Verbraucher verarbeitet auch im DLQ gespeicherte Nachrichten, nachdem er Nachrichten aus dem Hauptthema konsumiert hat.
Protokollierung:
System.err- und System.out-Nachrichten werden zum Protokollieren von Fehlern und wichtigen Ereignissen verwendet.
Abschließende Überlegungen
Mit dieser Implementierung:
Es stellt sicher, dass Nachrichten zuverlässig gesendet und verarbeitet werden.
Für vorübergehende oder anhaltende Fehler ist eine ordnungsgemäße Behandlung vorgesehen.
Logic ermöglicht eine effektive Wiederherstellung durch die Verwendung einer Warteschlange für unzustellbare Nachrichten.
Der Leistungsschalter verhindert, dass das System bei längeren Ausfällen überlastet wird.
Dieser Ansatz schafft ein robustes System, das physische und logische Probleme bewältigen kann und gleichzeitig die ordnungsgemäße Zustellung von Nachrichten in Kafka aufrechterhält.
Das obige ist der detaillierte Inhalt vonSo erhalten und konsumieren Sie Nachrichten, die mit Kafka zugestellt und bestellt werden. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!