Rumah  >  Artikel  >  Java  >  Bagaimana untuk mendapatkan dan menggunakan mesej yang dihantar dan dipesan dengan Kafka

Bagaimana untuk mendapatkan dan menggunakan mesej yang dihantar dan dipesan dengan Kafka

Linda Hamilton
Linda Hamiltonasal
2024-11-06 21:08:02202semak imbas

Como conseguir y la consumición entregada y ordenada de mensajes con Kafka

Untuk memastikan acara dihantar dan digunakan dalam susunan yang benar-benar konsisten dalam Apache Kafka, adalah penting untuk memahami cara pembahagian mesej dan tugasan pengguna berfungsi.

Menggunakan Partition dalam Kafka

  1. Pembahagian Topik:

    • Kafka menyusun mesej ke dalam sekatan dalam topik. Setiap partition mengekalkan susunan mesej yang diterima, bermakna mesej diproses mengikut tertib yang dihantar ke partition itu.
    • Untuk memastikan pesanan, adalah penting bahawa semua mesej yang berkaitan dengan konteks yang sama (contohnya, ID pengguna atau ID transaksi) dihantar ke partition yang sama. Ini dicapai dengan menggunakan kunci partition semasa menghantar mesej. Kafka menggunakan kekunci ini untuk menentukan partition yang hendak dihantar mesej menggunakan fungsi cincang[1][5].
  2. Kunci Mesej:

    • Apabila menghantar mesej, kunci boleh ditentukan. Semua mesej dengan kunci yang sama akan dihantar ke partition yang sama, yang memastikan bahawa ia digunakan dalam susunan yang sama di mana ia dihasilkan. Contohnya, jika ID pengguna digunakan sebagai kunci, semua acara yang berkaitan dengan pengguna tersebut akan pergi ke partition yang sama.

Kumpulan Pengguna

  1. Tugasan Pengguna:

    • Pengguna di Kafka dikumpulkan ke dalam kumpulan pengguna. Setiap kumpulan boleh mempunyai berbilang pengguna, tetapi setiap partition hanya boleh dibaca oleh seorang pengguna dalam kumpulan pada satu masa.
    • Ini bermakna jika anda mempunyai lebih ramai pengguna daripada partition, sesetengah pengguna akan menjadi tidak aktif. Untuk mengekalkan ketertiban dan memaksimumkan kecekapan, adalah dinasihatkan untuk mempunyai sekurang-kurangnya partition sebanyak mana terdapat pengguna dalam kumpulan.
  2. Pengurusan Offset:

    • Kafka menyimpan keadaan baca setiap pengguna menggunakan offset, yang merupakan pengecam angka tambahan untuk setiap mesej dalam partition. Ini membolehkan pengguna menyambung semula tempat mereka berhenti sekiranya berlaku kegagalan.

Strategi Tambahan

  • Elakkan Lebihan: Apabila memilih kunci partition, adalah penting untuk mempertimbangkan pengedaran trafik untuk mengelakkan sesetengah partition terlebih muatan manakala yang lain kurang digunakan.
  • Replikasi dan Toleransi Kesalahan: Pastikan untuk mengkonfigurasi replikasi yang mencukupi (lebih daripada 1) untuk partition, yang bukan sahaja meningkatkan ketersediaan tetapi juga daya tahan sistem terhadap kegagalan.

Untuk melaksanakan sistem pengeluaran dan penggunaan mesej dalam Kafka menggunakan Avro, memastikan mesej diproses dengan teratur dan mengendalikan kemungkinan kegagalan, berikut ialah contoh lengkap. Ini termasuk takrifan skema Avro, kod pengeluar dan pengguna, serta strategi untuk mengendalikan ralat.
Skim Avro
Pertama, kami mentakrifkan skema Avro untuk muatan kami. Kami akan mencipta fail bernama user_signed_up.avsc yang menerangkan struktur mesej.

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

Generasi Utama
Untuk memastikan ketertiban dalam pengeluaran dan penggunaan mesej, kami akan menggunakan kunci yang distrukturkan sebagai tarikh jenis mesej, sebagai contoh: pengguna-mendaftar-2024-11-04.
Penerbit Kafka
Berikut ialah kod untuk pengeluar yang menghantar mesej kepada Kafka menggunakan skema Avro:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

Cuba Semula Pertimbangan
**Adalah penting untuk ambil perhatian bahawa apabila mendayakan percubaan semula, mungkin terdapat risiko penyusunan semula mesej jika tidak dikendalikan dengan betul.
Untuk mengelakkan perkara ini:
**max.in.flight.requests.per.connection
: Anda boleh menetapkan sifat ini kepada 1 untuk memastikan mesej dihantar satu demi satu dan diproses mengikut susunan. Walau bagaimanapun, ini mungkin menjejaskan prestasi.
Dengan konfigurasi ini dan pengendalian ralat yang betul, anda boleh memastikan pengeluar Kafka anda lebih mantap dan mampu menangani kegagalan dalam pengeluaran mesej sambil mengekalkan susunan yang diperlukan.

**Pengguna Kafka
**Pengguna yang membaca dan memproses mesej:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

Contoh Kunci yang Realistik
Dalam persekitaran dengan banyak acara berbeza dan banyak partition berbeza, kunci realistik mungkin seperti:

{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string" } // Formato ISO 8601
  ]
}

Ini membenarkan semua acara yang berkaitan dengan jenis tertentu pada tarikh tertentu dihantar ke partition yang sama dan diproses mengikut susunan. Selain itu, anda boleh mempelbagaikan kunci dengan memasukkan butiran lanjut jika perlu (seperti sesi atau ID transaksi).
Dengan pelaksanaan dan strategi ini untuk mengendalikan kegagalan dan memastikan pesanan mesej dalam Kafka menggunakan Avro, anda boleh membina sistem yang mantap dan cekap untuk mengurus acara.

Kini Pengeluar dan pengguna Kafka yang agak berkebolehan.

Pengeluar Kafka dengan Pemutus Litar, Kegigihan Tempatan dan DLQ.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Establecer la propiedad de reintentos, Número de reintentos
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
// Asegura que todos los réplicas reconozcan la escritura,
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); 
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", java.time.Instant.now().toString());

        String key = String.format("user-signed-up-%s", java.time.LocalDate.now());

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
**handleFailure(exception, producerRecord);
**            } else {
                System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
            }
        });
    }

private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());

        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);

        // Optionally: Notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE, 
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Crear un ByteArrayOutputStream para almacenar los bytes serializados
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Crear un escritor de datos para el registro Avro
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());

    // Crear un encoder para escribir en el ByteArrayOutputStream
    Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        // Escribir el registro en el encoder
        datumWriter.write(record, encoder);
        // Finalizar la escritura
        encoder.flush();
    } catch (IOException e) {
        throw new AvroSerializationException("Error serializing Avro record", e);
    }

    // Devolver los bytes serializados
    return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}

Pengguna Kafka dengan Pengurusan DLQ.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        var decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);

        System.out.printf("Consumido mensaje: %s - %s - %s%n", 
            record.get("userId"), 
            record.get("userEmail"), 
            record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());

        // Implement logic to save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file or database for later processing
            Files.write(new File("failed_consumed_messages.log").toPath(), 
                         (record.key() + ": " + new String(record.value()) + "\n").getBytes(), 
                         StandardOpenOption.CREATE,
                         StandardOpenOption.APPEND);
            System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}

user-signed-up-2024-11-04
order-created-2024-11-04
payment-processed-2024-11-04

Penjelasan Kod
Pemutus Litar:
Resilience4j digunakan untuk menguruskan litar pemutus pengeluar. Ambang kadar kegagalan dan masa menunggu dalam keadaan terbuka dikonfigurasikan.
Kegigihan dan DLQ Tempatan:
Mesej yang gagal disimpan pada kedua-dua fail setempat (failed_messages.log) dan baris gilir ralat (dead_letter_queue.log).
Pengendalian Ralat:
Dalam pengeluar dan pengguna, ralat dikendalikan dengan sewajarnya dan direkodkan.
Pemprosesan DLQ:
Pengguna juga memproses mesej yang disimpan dalam DLQ selepas menggunakan mesej daripada topik utama.
Melog:
Mesej System.err dan System.out digunakan untuk log ralat dan peristiwa penting.
Pertimbangan Akhir
Dengan pelaksanaan ini:
Ia memastikan mesej dihantar dan diproses dengan cara yang berdaya tahan.
Pengendalian yang betul disediakan untuk ralat sementara atau berterusan.
Logik membolehkan pemulihan yang berkesan dengan menggunakan Baris Gilir Surat Mati.
Litar pemutus membantu menghalang sistem daripada menjadi tepu sekiranya berlaku kegagalan yang berpanjangan.
Pendekatan ini mewujudkan sistem teguh yang boleh menangani masalah fizikal dan logik sambil mengekalkan penyampaian mesej yang teratur dalam Kafka.

Atas ialah kandungan terperinci Bagaimana untuk mendapatkan dan menggunakan mesej yang dihantar dan dipesan dengan Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn