


Bagaimana untuk mendapatkan dan menggunakan mesej yang dihantar dan dipesan dengan Kafka
Untuk memastikan acara dihantar dan digunakan dalam susunan yang benar-benar konsisten dalam Apache Kafka, adalah penting untuk memahami cara pembahagian mesej dan tugasan pengguna berfungsi.
Menggunakan Partition dalam Kafka
-
Pembahagian Topik:
- Kafka menyusun mesej ke dalam sekatan dalam topik. Setiap partition mengekalkan susunan mesej yang diterima, bermakna mesej diproses mengikut tertib yang dihantar ke partition itu.
- Untuk memastikan pesanan, adalah penting bahawa semua mesej yang berkaitan dengan konteks yang sama (contohnya, ID pengguna atau ID transaksi) dihantar ke partition yang sama. Ini dicapai dengan menggunakan kunci partition semasa menghantar mesej. Kafka menggunakan kekunci ini untuk menentukan partition yang hendak dihantar mesej menggunakan fungsi cincang[1][5].
-
Kunci Mesej:
- Apabila menghantar mesej, kunci boleh ditentukan. Semua mesej dengan kunci yang sama akan dihantar ke partition yang sama, yang memastikan bahawa ia digunakan dalam susunan yang sama di mana ia dihasilkan. Contohnya, jika ID pengguna digunakan sebagai kunci, semua acara yang berkaitan dengan pengguna tersebut akan pergi ke partition yang sama.
Kumpulan Pengguna
-
Tugasan Pengguna:
- Pengguna di Kafka dikumpulkan ke dalam kumpulan pengguna. Setiap kumpulan boleh mempunyai berbilang pengguna, tetapi setiap partition hanya boleh dibaca oleh seorang pengguna dalam kumpulan pada satu masa.
- Ini bermakna jika anda mempunyai lebih ramai pengguna daripada partition, sesetengah pengguna akan menjadi tidak aktif. Untuk mengekalkan ketertiban dan memaksimumkan kecekapan, adalah dinasihatkan untuk mempunyai sekurang-kurangnya partition sebanyak mana terdapat pengguna dalam kumpulan.
-
Pengurusan Offset:
- Kafka menyimpan keadaan baca setiap pengguna menggunakan offset, yang merupakan pengecam angka tambahan untuk setiap mesej dalam partition. Ini membolehkan pengguna menyambung semula tempat mereka berhenti sekiranya berlaku kegagalan.
Strategi Tambahan
- Elakkan Lebihan: Apabila memilih kunci partition, adalah penting untuk mempertimbangkan pengedaran trafik untuk mengelakkan sesetengah partition terlebih muatan manakala yang lain kurang digunakan.
- Replikasi dan Toleransi Kesalahan: Pastikan untuk mengkonfigurasi replikasi yang mencukupi (lebih daripada 1) untuk partition, yang bukan sahaja meningkatkan ketersediaan tetapi juga daya tahan sistem terhadap kegagalan.
Untuk melaksanakan sistem pengeluaran dan penggunaan mesej dalam Kafka menggunakan Avro, memastikan mesej diproses dengan teratur dan mengendalikan kemungkinan kegagalan, berikut ialah contoh lengkap. Ini termasuk takrifan skema Avro, kod pengeluar dan pengguna, serta strategi untuk mengendalikan ralat.
Skim Avro
Pertama, kami mentakrifkan skema Avro untuk muatan kami. Kami akan mencipta fail bernama user_signed_up.avsc yang menerangkan struktur mesej.
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
Generasi Utama
Untuk memastikan ketertiban dalam pengeluaran dan penggunaan mesej, kami akan menggunakan kunci yang distrukturkan sebagai tarikh jenis mesej, sebagai contoh: pengguna-mendaftar-2024-11-04.
Penerbit Kafka
Berikut ialah kod untuk pengeluar yang menghantar mesej kepada Kafka menggunakan skema Avro:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<string byte> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<string byte> producerRecord = new ProducerRecord(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<string byte> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<genericrecord> datumWriter = new GenericDatumWriter(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } } </genericrecord></string></string></string></string>
Cuba Semula Pertimbangan
**Adalah penting untuk ambil perhatian bahawa apabila mendayakan percubaan semula, mungkin terdapat risiko penyusunan semula mesej jika tidak dikendalikan dengan betul.
Untuk mengelakkan perkara ini:
**max.in.flight.requests.per.connection: Anda boleh menetapkan sifat ini kepada 1 untuk memastikan mesej dihantar satu demi satu dan diproses mengikut susunan. Walau bagaimanapun, ini mungkin menjejaskan prestasi.
Dengan konfigurasi ini dan pengendalian ralat yang betul, anda boleh memastikan pengeluar Kafka anda lebih mantap dan mampu menangani kegagalan dalam pengeluaran mesej sambil mengekalkan susunan yang diperlukan.
**Pengguna Kafka
**Pengguna yang membaca dan memproses mesej:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<string byte> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<string byte> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string byte> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<genericrecord> reader = new GenericDatumReader(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<string byte> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } } </string></string></genericrecord></string></string></string>
Contoh Kunci yang Realistik
Dalam persekitaran dengan banyak acara berbeza dan banyak partition berbeza, kunci realistik mungkin seperti:
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
Ini membenarkan semua acara yang berkaitan dengan jenis tertentu pada tarikh tertentu dihantar ke partition yang sama dan diproses mengikut susunan. Selain itu, anda boleh mempelbagaikan kunci dengan memasukkan butiran lanjut jika perlu (seperti sesi atau ID transaksi).
Dengan pelaksanaan dan strategi ini untuk mengendalikan kegagalan dan memastikan pesanan mesej dalam Kafka menggunakan Avro, anda boleh membina sistem yang mantap dan cekap untuk mengurus acara.
Kini Pengeluar dan pengguna Kafka yang agak berkebolehan.
Pengeluar Kafka dengan Pemutus Litar, Kegigihan Tempatan dan DLQ.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<string byte> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<string byte> producerRecord = new ProducerRecord(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<string byte> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<genericrecord> datumWriter = new GenericDatumWriter(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } } </genericrecord></string></string></string></string>
Pengguna Kafka dengan Pengurusan DLQ.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<string byte> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<string byte> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string byte> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<genericrecord> reader = new GenericDatumReader(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<string byte> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } } </string></string></genericrecord></string></string></string>
user-signed-up-2024-11-04 order-created-2024-11-04 payment-processed-2024-11-04
Penjelasan Kod
Pemutus Litar:
Resilience4j digunakan untuk menguruskan litar pemutus pengeluar. Ambang kadar kegagalan dan masa menunggu dalam keadaan terbuka dikonfigurasikan.
Kegigihan dan DLQ Tempatan:
Mesej yang gagal disimpan pada kedua-dua fail setempat (failed_messages.log) dan baris gilir ralat (dead_letter_queue.log).
Pengendalian Ralat:
Dalam pengeluar dan pengguna, ralat dikendalikan dengan sewajarnya dan direkodkan.
Pemprosesan DLQ:
Pengguna juga memproses mesej yang disimpan dalam DLQ selepas menggunakan mesej daripada topik utama.
Melog:
Mesej System.err dan System.out digunakan untuk log ralat dan peristiwa penting.
Pertimbangan Akhir
Dengan pelaksanaan ini:
Ia memastikan mesej dihantar dan diproses dengan cara yang berdaya tahan.
Pengendalian yang betul disediakan untuk ralat sementara atau berterusan.
Logik membolehkan pemulihan yang berkesan dengan menggunakan Baris Gilir Surat Mati.
Litar pemutus membantu menghalang sistem daripada menjadi tepu sekiranya berlaku kegagalan yang berpanjangan.
Pendekatan ini mewujudkan sistem teguh yang boleh menangani masalah fizikal dan logik sambil mengekalkan penyampaian mesej yang teratur dalam Kafka.
Atas ialah kandungan terperinci Bagaimana untuk mendapatkan dan menggunakan mesej yang dihantar dan dipesan dengan Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

MinGW - GNU Minimalis untuk Windows
Projek ini dalam proses untuk dipindahkan ke osdn.net/projects/mingw, anda boleh terus mengikuti kami di sana. MinGW: Port Windows asli bagi GNU Compiler Collection (GCC), perpustakaan import yang boleh diedarkan secara bebas dan fail pengepala untuk membina aplikasi Windows asli termasuk sambungan kepada masa jalan MSVC untuk menyokong fungsi C99. Semua perisian MinGW boleh dijalankan pada platform Windows 64-bit.

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

Versi Mac WebStorm
Alat pembangunan JavaScript yang berguna

Dreamweaver Mac版
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)