Maison  >  Article  >  Java  >  Comment recevoir et consommer des messages livrés et commandés avec Kafka

Comment recevoir et consommer des messages livrés et commandés avec Kafka

Linda Hamilton
Linda Hamiltonoriginal
2024-11-06 21:08:02202parcourir

Como conseguir y la consumición entregada y ordenada de mensajes con Kafka

Pour garantir que les événements sont envoyés et consommés dans un ordre totalement cohérent dans Apache Kafka, il est essentiel de comprendre comment fonctionnent le partitionnement des messages et l'affectation des consommateurs.

Utiliser des partitions dans Kafka

  1. Partitionnement des sujets :

    • Kafka organise les messages en partitions au sein d'un sujet. Chaque partition conserve l'ordre des messages qu'elle reçoit, ce qui signifie que les messages sont traités dans l'ordre dans lequel ils ont été envoyés à cette partition.
    • Pour assurer l'ordre, il est crucial que tous les messages liés au même contexte (par exemple, un identifiant utilisateur ou un identifiant de transaction) soient envoyés vers la même partition. Ceci est réalisé en utilisant une clé de partition lors de l'envoi de messages. Kafka utilise cette clé pour déterminer à quelle partition envoyer le message à l'aide d'une fonction de hachage[1][5].
  2. Clés de message :

    • Lors de l'envoi d'un message, une clé peut être spécifiée. Tous les messages avec la même clé seront envoyés à la même partition, ce qui garantit qu'ils seront consommés dans le même ordre dans lequel ils ont été produits. Par exemple, si l'ID utilisateur est utilisé comme clé, tous les événements liés à cet utilisateur iront vers la même partition.

Groupes de consommateurs

  1. Affectation du consommateur :

    • Les consommateurs de Kafka sont regroupés en groupes de consommateurs. Chaque groupe peut avoir plusieurs consommateurs, mais chaque partition ne peut être lue que par un seul consommateur du groupe à la fois.
    • Cela signifie que si vous avez plus de consommateurs que de partitions, certains consommateurs seront inactifs. Pour maintenir l'ordre et maximiser l'efficacité, il est conseillé d'avoir au moins autant de cloisons qu'il y a de consommateurs dans le groupe.
  2. Gestion des compensations :

    • Kafka stocke l'état de lecture de chaque consommateur à l'aide de offsets, qui sont des identifiants numériques incrémentiels pour chaque message au sein d'une partition. Cela permet aux consommateurs de reprendre là où ils s’étaient arrêtés en cas d’échec.

Stratégies supplémentaires

  • Éviter les surcharges : Lors du choix des clés de partition, il est important de prendre en compte la répartition du trafic pour éviter que certaines partitions ne soient surchargées tandis que d'autres sont sous-utilisées.
  • Réplication et tolérance aux pannes : Assurez-vous de configurer une réplication adéquate (supérieure à 1) pour les partitions, ce qui améliore non seulement la disponibilité mais également la résilience du système aux pannes.

Pour implémenter un système de production et de consommation de messages dans Kafka à l'aide d'Avro, garantissant que les messages sont traités dans l'ordre et gérant les éventuelles pannes, voici un exemple complet. Cela inclut la définition du schéma Avro, du code producteur et consommateur, ainsi que des stratégies de gestion des erreurs.
Schéma Avro
Tout d’abord, nous définissons le schéma Avro pour notre charge utile. Nous allons créer un fichier appelé user_signed_up.avsc qui décrit la structure du message.

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

Génération de clé
Pour assurer l'ordre dans la production et la consommation des messages, nous utiliserons une clé structurée en message-type-date, par exemple : user-signed-up-2024-11-04.
Producteur Kafka
Voici le code du producteur qui envoie des messages à Kafka en utilisant le schéma Avro :

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

Considérations relatives à la nouvelle tentative
**Il est important de noter que lors de l'activation des nouvelles tentatives, il peut y avoir un risque de réorganisation des messages s'ils ne sont pas traités correctement.
Pour éviter cela :
**max.in.flight.requests.per.connection
 : vous pouvez définir cette propriété sur 1 pour garantir que les messages sont envoyés un par un et traités dans l'ordre. Cependant, cela peut affecter les performances.
Avec cette configuration et une gestion appropriée des erreurs, vous pouvez vous assurer que votre producteur Kafka est plus robuste et capable de gérer les échecs dans la production de messages tout en maintenant l'ordre nécessaire.

**Consommateur Kafka
**Le consommateur qui lit et traite les messages :

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

Exemple réaliste de clés
Dans un environnement avec de nombreux événements différents et de nombreuses partitions différentes, une clé réaliste pourrait ressembler à :

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

Cela permet à tous les événements liés à un type spécifique à une date spécifique d'être envoyés vers la même partition et traités dans l'ordre. De plus, vous pouvez diversifier les clés en incluant plus de détails si nécessaire (comme un identifiant de session ou de transaction).
Avec cette implémentation et des stratégies pour gérer les échecs et garantir l'ordre des messages dans Kafka à l'aide d'Avro, vous pouvez créer un système robuste et efficace pour gérer les événements.

Maintenant, un producteur et consommateur Kafka un peu plus compétent.

Producteur Kafka avec disjoncteur, persistance locale et DLQ.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

Consommateur Kafka avec gestion DLQ.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

user-signed-up-2024-11-04
order-created-2024-11-04
payment-processed-2024-11-04

Explication du code
Disjoncteur :
Resilience4j est utilisé pour gérer le circuit disjoncteur du producteur. Un seuil de taux de panne et un temps d'attente à l'état ouvert sont configurés.
Persistance locale et DLQ :
Les messages ayant échoué sont enregistrés à la fois dans un fichier local (failed_messages.log) et dans une file d'attente d'erreurs (dead_letter_queue.log).
Gestion des erreurs :
Chez le producteur et le consommateur, les erreurs sont traitées de manière appropriée et enregistrées.
Traitement DLQ :
Le consommateur traite également les messages stockés dans le DLQ après avoir consommé les messages du sujet principal.
Journalisation :
Les messages System.err et System.out sont utilisés pour enregistrer les erreurs et les événements importants.
Considérations finales
Avec cette implémentation :
Il garantit que les messages sont envoyés et traités de manière résiliente.
Un traitement approprié est prévu pour les erreurs temporaires ou persistantes.
La logique permet une récupération efficace en utilisant une file d'attente de lettres mortes.
Le circuit disjoncteur permet d'éviter la saturation du système en cas de pannes prolongées.
Cette approche crée un système robuste capable de gérer les problèmes physiques et logiques tout en maintenant une livraison ordonnée des messages dans Kafka.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn