Apache Kafka でイベントが完全に一貫した順序で送信および消費されるようにするには、メッセージの分割とコンシューマの割り当てがどのように機能するかを理解することが不可欠です。
トピックの分割:
メッセージキー:
消費者割り当て:
オフセット管理:
Avro を使用して Kafka でメッセージの生成と消費のシステムを実装し、メッセージが順序どおりに処理され、起こり得る失敗を処理できるようにするための完全な例を次に示します。これには、Avro スキーマの定義、プロデューサー コードとコンシューマー コード、エラー処理の戦略が含まれます。
アブロスキーム
まず、ペイロードの Avro スキーマを定義します。メッセージの構造を記述する user_signed_up.avsc というファイルを作成します。
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
鍵の生成
メッセージの生成と消費の順序を確保するために、message-type-date として構造化されたキーを使用します (例: user-signed-up-2024-11-04)。
プロデューサー カフカ
以下は、Avro スキームを使用して Kafka にメッセージを送信する プロデューサー のコードです。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<String, byte[]> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } }
再試行に関する考慮事項
**再試行を有効にする場合、適切に処理しないとメッセージの順序が変更されるリスクがあることに注意することが重要です。
これを回避するには:
**max.in.flight.requests.per.connection: このプロパティを 1 に設定すると、メッセージが一度に 1 つずつ送信され、順番に処理されるようになります。ただし、これはパフォーマンスに影響を与える可能性があります。
この構成と適切なエラー処理により、Kafka プロデューサがより堅牢になり、必要な順序を維持しながらメッセージ生成の障害を処理できるようになります。
**Kafka コンシューマー
**メッセージを読んで処理するコンシューマ:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<String, byte[]> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } }
キーの現実的な例
多くの異なるイベントと多くの異なるパーティションがある環境では、現実的なキーは次のようになります。
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
これにより、特定の日付の特定のタイプに関連するすべてのイベントを同じパーティションに送信し、順番に処理できます。さらに、必要に応じて詳細 (セッション ID やトランザクション ID など) を含めることでキーを多様化できます。
Avro を使用して Kafka で障害を処理し、メッセージの順序を確保するためのこの実装と戦略により、イベントを管理するための堅牢で効率的なシステムを構築できます。
現在は、より有能な Kafka プロデューサー兼コンシューマーです。
サーキット ブレーカー、ローカル永続性、DLQ を備えた Kafka プロデューサー。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<String, byte[]> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } }
DLQ 管理を備えた Kafka コンシューマ。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<String, byte[]> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } }
user-signed-up-2024-11-04 order-created-2024-11-04 payment-processed-2024-11-04
コードの説明
サーキットブレーカー:
Resilience4j は、プロデューサーのブレーカー回路を管理するために使用されます。故障率の閾値とオープン状態での待ち時間を設定します。
ローカル永続性と DLQ:
失敗したメッセージは、ローカル ファイル (failed_messages.log) とエラー キュー (dead_letter_queue.log) の両方に保存されます。
エラー処理:
プロデューサーとコンシューマーでは、エラーが適切に処理され、ログに記録されます。
DLQ 処理:
コンシューマーは、メイン トピックからのメッセージを消費した後、DLQ に保存されたメッセージも処理します。
ロギング:
System.err および System.out メッセージは、エラーと重要なイベントを記録するために使用されます。
最終的な考慮事項
この実装では:
これにより、メッセージが復元力のある方法で送信および処理されることが保証されます。
一時的または永続的なエラーに対して適切な処理が提供されます。
ロジックにより、Dead Letter Queue を使用することで効果的な回復が可能になります。
ブレーカー回路は、長期にわたる障害が発生した場合にシステムが飽和状態になるのを防ぎます。
このアプローチにより、Kafka でのメッセージの秩序ある配信を維持しながら、物理的および論理的問題を処理できる堅牢なシステムが作成されます。
以上がKafka で配信および注文されたメッセージを取得して消費する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。