ホームページ  >  記事  >  Java  >  Kafka で配信および注文されたメッセージを取得して消費する方法

Kafka で配信および注文されたメッセージを取得して消費する方法

Linda Hamilton
Linda Hamiltonオリジナル
2024-11-06 21:08:02202ブラウズ

Como conseguir y la consumición entregada y ordenada de mensajes con Kafka

Apache Kafka でイベントが完全に一貫した順序で送信および消費されるようにするには、メッセージの分割とコンシューマの割り当てがどのように機能するかを理解することが不可欠です。

Kafka でのパーティションの使用

  1. トピックの分割:

    • Kafka はメッセージをトピック内の パーティション に編成します。各パーティションは、受信したメッセージの順序を維持します。つまり、メッセージはそのパーティションに送信された順序で処理されます。
    • 順序を確保するには、同じコンテキスト (ユーザー ID やトランザクション ID など) に関連するすべてのメッセージが同じパーティションに送信されることが重要です。これは、メッセージの送信時にパーティション キーを使用することで実現されます。 Kafka はこのキーを使用して、ハッシュ関数を使用してメッセージを送信するパーティションを決定します[1][5]。
  2. メッセージキー:

    • メッセージを送信するときに、キーを指​​定できます。同じキーを持つすべてのメッセージは同じパーティションに送信されるため、メッセージは生成されたのと同じ順序で消費されます。たとえば、ユーザー ID がキーとして使用される場合、そのユーザーに関連するすべてのイベントは同じパーティションに移動します。

消費者団体

  1. 消費者割り当て:

    • Kafka のコンシューマは、コンシューマ グループ にグループ化されます。各グループには複数のコンシューマを含めることができますが、各パーティションを一度に読み取ることができるのは、グループ内の 1 人のコンシューマのみです。
    • これは、パーティションより多くのコンシューマがある場合、一部のコンシューマが非アクティブになることを意味します。順序を維持し、効率を最大化するには、少なくともグループ内のコンシューマーの数と同じ数のパーティションを作成することをお勧めします。
  2. オフセット管理:

    • Kafka は、オフセット を使用して各コンシューマーの読み取り状態を保存します。これは、パーティション内の各メッセージの増分数値識別子です。これにより、消費者は障害が発生した場合に中断したところから再開できます。

追加の戦略

  • 過負荷の回避: パーティション キーを選択するときは、一部のパーティションが過負荷になり、他のパーティションが十分に活用されていないことを避けるためにトラフィック分散を考慮することが重要です。
  • レプリケーションとフォールト トレランス: パーティションに対して適切なレプリケーション (1 より大きい) を構成してください。これにより、可用性が向上するだけでなく、障害に対するシステムの回復力も向上します。

Avro を使用して Kafka でメッセージの生成と消費のシステムを実装し、メッセージが順序どおりに処理され、起こり得る失敗を処理できるようにするための完全な例を次に示します。これには、Avro スキーマの定義、プロデューサー コードとコンシューマー コード、エラー処理の戦略が含まれます。
アブロスキーム
まず、ペイロードの Avro スキーマを定義します。メッセージの構造を記述する user_signed_up.avsc というファイルを作成します。

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

鍵の生成
メッセージの生成と消費の順序を確保するために、message-type-date として構造化されたキーを使用します (例: user-signed-up-2024-11-04)。
プロデューサー カフカ
以下は、Avro スキームを使用して Kafka にメッセージを送信する プロデューサー のコードです。

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

再試行に関する考慮事項
**再試行を有効にする場合、適切に処理しないとメッセージの順序が変更されるリスクがあることに注意することが重要です。
これを回避するには:
**max.in.flight.requests.per.connection
: このプロパティを 1 に設定すると、メッセージが一度に 1 つずつ送信され、順番に処理されるようになります。ただし、これはパフォーマンスに影響を与える可能性があります。
この構成と適切なエラー処理により、Kafka プロデューサがより堅牢になり、必要な順序を維持しながらメッセージ生成の障害を処理できるようになります。

**Kafka コンシューマー
**メッセージを読んで処理するコンシューマ:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

キーの現実的な例
多くの異なるイベントと多くの異なるパーティションがある環境では、現実的なキーは次のようになります。

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

これにより、特定の日付の特定のタイプに関連するすべてのイベントを同じパーティションに送信し、順番に処理できます。さらに、必要に応じて詳細 (セッション ID やトランザクション ID など) を含めることでキーを多様化できます。
Avro を使用して Kafka で障害を処理し、メッセージの順序を確保するためのこの実装と戦略により、イベントを管理するための堅牢で効率的なシステムを構築できます。

現在は、より有能な Kafka プロデューサー兼コンシューマーです。

サーキット ブレーカー、ローカル永続性、DLQ を備えた Kafka プロデューサー。

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

DLQ 管理を備えた Kafka コンシューマ。

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

user-signed-up-2024-11-04
order-created-2024-11-04
payment-processed-2024-11-04

コードの説明
サーキットブレーカー:
Resilience4j は、プロデューサーのブレーカー回路を管理するために使用されます。故障率の閾値とオープン状態での待ち時間を設定します。
ローカル永続性と DLQ:
失敗したメッセージは、ローカル ファイル (failed_messages.log) とエラー キュー (dead_letter_queue.log) の両方に保存されます。
エラー処理:
プロデューサーとコンシューマーでは、エラーが適切に処理され、ログに記録されます。
DLQ 処理:
コンシューマーは、メイン トピックからのメッセージを消費した後、DLQ に保存されたメッセージも処理します。
ロギング:
System.err および System.out メッセージは、エラーと重要なイベントを記録するために使用されます。
最終的な考慮事項
この実装では:
これにより、メッセージが復元力のある方法で送信および処理されることが保証されます。
一時的または永続的なエラーに対して適切な処理が提供されます。
ロジックにより、Dead Letter Queue を使用することで効果的な回復が可能になります。
ブレーカー回路は、長期にわたる障害が発生した場合にシステムが飽和状態になるのを防ぎます。
このアプローチにより、Kafka でのメッセージの秩序ある配信を維持しながら、物理的および論理的問題を処理できる堅牢なシステムが作成されます。

以上がKafka で配信および注文されたメッセージを取得して消費する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。