Heim  >  Artikel  >  Java  >  So erhalten und konsumieren Sie Nachrichten, die mit Kafka zugestellt und bestellt werden

So erhalten und konsumieren Sie Nachrichten, die mit Kafka zugestellt und bestellt werden

Linda Hamilton
Linda HamiltonOriginal
2024-11-06 21:08:02281Durchsuche

Como conseguir y la consumición entregada y ordenada de mensajes con Kafka

Um sicherzustellen, dass Ereignisse in Apache Kafka in einer vollständig konsistenten Reihenfolge gesendet und konsumiert werden, ist es wichtig zu verstehen, wie Nachrichtenpartitionierung und Verbraucherzuweisung funktionieren.

Verwenden von Partitionen in Kafka

  1. Themenpartitionierung:

    • Kafka organisiert Nachrichten in Partitionen innerhalb eines Themas. Jede Partition behält die Reihenfolge der Nachrichten bei, die sie empfängt, was bedeutet, dass Nachrichten in der Reihenfolge verarbeitet werden, in der sie an diese Partition gesendet wurden.
    • Um die Ordnung sicherzustellen, ist es wichtig, dass alle Nachrichten, die sich auf denselben Kontext beziehen (z. B. eine Benutzer-ID oder eine Transaktions-ID), an dieselbe Partition gesendet werden. Dies wird durch die Verwendung eines Partitionsschlüssels beim Senden von Nachrichten erreicht. Kafka verwendet diesen Schlüssel, um mithilfe einer Hash-Funktion[1][5] zu bestimmen, an welche Partition die Nachricht gesendet werden soll.
  2. Nachrichtenschlüssel:

    • Beim Versenden einer Nachricht kann ein Schlüssel angegeben werden. Alle Nachrichten mit demselben Schlüssel werden an dieselbe Partition gesendet, wodurch sichergestellt wird, dass sie in derselben Reihenfolge verarbeitet werden, in der sie erstellt wurden. Wenn beispielsweise die Benutzer-ID als Schlüssel verwendet wird, werden alle mit diesem Benutzer verbundenen Ereignisse in dieselbe Partition verschoben.

Verbrauchergruppen

  1. Verbraucherzuweisung:

    • Verbraucher werden in Kafka in Verbrauchergruppen eingeteilt. Jede Gruppe kann mehrere Verbraucher haben, aber jede Partition kann jeweils nur von einem Verbraucher innerhalb der Gruppe gelesen werden.
    • Das bedeutet, dass einige Verbraucher inaktiv sind, wenn Sie mehr Verbraucher als Partitionen haben. Um die Ordnung aufrechtzuerhalten und die Effizienz zu maximieren, ist es ratsam, mindestens so viele Partitionen zu haben, wie es Verbraucher in der Gruppe gibt.
  2. Offset-Management:

    • Kafka speichert den Lesestatus jedes Verbrauchers mithilfe von Offsets, bei denen es sich um inkrementelle numerische Kennungen für jede Nachricht innerhalb einer Partition handelt. Dadurch können Verbraucher bei Ausfällen dort weitermachen, wo sie aufgehört haben.

Zusätzliche Strategien

  • Überlastungen vermeiden: Bei der Auswahl von Partitionsschlüsseln ist es wichtig, die Verkehrsverteilung zu berücksichtigen, um zu vermeiden, dass einige Partitionen überlastet werden, während andere nicht ausreichend genutzt werden.
  • Replikation und Fehlertoleranz: Stellen Sie sicher, dass Sie eine angemessene Replikation (größer als 1) für die Partitionen konfigurieren, was nicht nur die Verfügbarkeit, sondern auch die Ausfallsicherheit des Systems verbessert.

Um ein Nachrichtenproduktions- und -verbrauchssystem in Kafka mit Avro zu implementieren, um sicherzustellen, dass Nachrichten in der richtigen Reihenfolge verarbeitet werden und mögliche Fehler behandelt werden, finden Sie hier ein vollständiges Beispiel. Dazu gehört die Definition des Avro-Schemas, des Producer- und Consumer-Codes sowie Strategien zur Fehlerbehandlung.
Avro-Schema
Zuerst definieren wir das Avro-Schema für unsere Nutzlast. Wir erstellen eine Datei mit dem Namen user_signed_up.avsc, die die Struktur der Nachricht beschreibt.

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

Schlüsselgenerierung
Um Ordnung bei der Erstellung und Nutzung von Nachrichten zu gewährleisten, verwenden wir einen Schlüssel, der als Nachrichtentyp-Datum strukturiert ist, zum Beispiel: user-signed-up-2024-11-04.
Produzent Kafka
Hier ist der Code für den Produzenten, der mithilfe des Avro-Schemas Nachrichten an Kafka sendet:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

Überlegungen zum erneuten Versuch
**Es ist wichtig zu beachten, dass bei der Aktivierung von Wiederholungsversuchen das Risiko einer Neuordnung der Nachrichten bestehen kann, wenn diese nicht ordnungsgemäß gehandhabt werden.
Um dies zu vermeiden:
**max.in.flight.requests.per.connection
: Sie können diese Eigenschaft auf 1 setzen, um sicherzustellen, dass Nachrichten einzeln gesendet und der Reihe nach verarbeitet werden. Dies kann jedoch die Leistung beeinträchtigen.
Mit dieser Konfiguration und der richtigen Fehlerbehandlung können Sie sicherstellen, dass Ihr Kafka-Produzent robuster und in der Lage ist, Fehler bei der Nachrichtenproduktion zu verarbeiten und gleichzeitig die erforderliche Reihenfolge aufrechtzuerhalten.

**Kafka-Verbraucher
**Der Verbraucher, der die Nachrichten liest und verarbeitet:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

Realistisches Beispiel für Schlüssel
In einer Umgebung mit vielen verschiedenen Ereignissen und vielen verschiedenen Partitionen könnte ein realistischer Schlüssel etwa so aussehen:

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

Dadurch können alle Ereignisse, die sich auf einen bestimmten Typ beziehen, an einem bestimmten Datum an dieselbe Partition gesendet und der Reihe nach verarbeitet werden. Darüber hinaus können Sie die Schlüssel diversifizieren, indem Sie bei Bedarf weitere Details hinzufügen (z. B. eine Sitzungs- oder Transaktions-ID).
Mit dieser Implementierung und Strategien zur Fehlerbehandlung und Sicherstellung der Nachrichtenreihenfolge in Kafka mithilfe von Avro können Sie ein robustes und effizientes System zur Verwaltung von Ereignissen aufbauen.

Jetzt ein etwas leistungsfähigerer Kafka-Produzent und -Konsumer.

Kafka-Produzent mit Leistungsschalter, lokaler Persistenz und DLQ.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

Kafka Consumer mit DLQ Management.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

user-signed-up-2024-11-04
order-created-2024-11-04
payment-processed-2024-11-04

Code-Erklärung
Leistungsschalter:
Resilience4j wird zur Verwaltung des Leistungsschalterkreises des Herstellers verwendet. Es sind ein Schwellenwert für die Ausfallrate und eine Wartezeit im geöffneten Zustand konfiguriert.
Lokale Persistenz und DLQ:
Fehlgeschlagene Nachrichten werden sowohl in einer lokalen Datei (failed_messages.log) als auch in einer Fehlerwarteschlange (dead_letter_queue.log) gespeichert.
Fehlerbehandlung:
Im Producer und Consumer werden Fehler angemessen behandelt und protokolliert.
DLQ-Verarbeitung:
Der Verbraucher verarbeitet auch im DLQ gespeicherte Nachrichten, nachdem er Nachrichten aus dem Hauptthema konsumiert hat.
Protokollierung:
System.err- und System.out-Nachrichten werden zum Protokollieren von Fehlern und wichtigen Ereignissen verwendet.
Abschließende Überlegungen
Mit dieser Implementierung:
Es stellt sicher, dass Nachrichten zuverlässig gesendet und verarbeitet werden.
Für vorübergehende oder anhaltende Fehler ist eine ordnungsgemäße Behandlung vorgesehen.
Logic ermöglicht eine effektive Wiederherstellung durch die Verwendung einer Warteschlange für unzustellbare Nachrichten.
Der Leistungsschalter verhindert, dass das System bei längeren Ausfällen überlastet wird.
Dieser Ansatz schafft ein robustes System, das physische und logische Probleme bewältigen kann und gleichzeitig die ordnungsgemäße Zustellung von Nachrichten in Kafka aufrechterhält.

Das obige ist der detaillierte Inhalt vonSo erhalten und konsumieren Sie Nachrichten, die mit Kafka zugestellt und bestellt werden. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn