Apache Kafka でイベントが完全に一貫した順序で送信および消費されるようにするには、メッセージの分割とコンシューマの割り当てがどのように機能するかを理解することが不可欠です。
Kafka でのパーティションの使用
-
トピックの分割:
- Kafka はメッセージをトピック内の パーティション に編成します。各パーティションは、受信したメッセージの順序を維持します。つまり、メッセージはそのパーティションに送信された順序で処理されます。
- 順序を確保するには、同じコンテキスト (ユーザー ID やトランザクション ID など) に関連するすべてのメッセージが同じパーティションに送信されることが重要です。これは、メッセージの送信時にパーティション キーを使用することで実現されます。 Kafka はこのキーを使用して、ハッシュ関数を使用してメッセージを送信するパーティションを決定します[1][5]。
-
メッセージキー:
- メッセージを送信するときに、キーを指定できます。同じキーを持つすべてのメッセージは同じパーティションに送信されるため、メッセージは生成されたのと同じ順序で消費されます。たとえば、ユーザー ID がキーとして使用される場合、そのユーザーに関連するすべてのイベントは同じパーティションに移動します。
消費者団体
-
消費者割り当て:
- Kafka のコンシューマは、コンシューマ グループ にグループ化されます。各グループには複数のコンシューマを含めることができますが、各パーティションを一度に読み取ることができるのは、グループ内の 1 人のコンシューマのみです。
- これは、パーティションより多くのコンシューマがある場合、一部のコンシューマが非アクティブになることを意味します。順序を維持し、効率を最大化するには、少なくともグループ内のコンシューマーの数と同じ数のパーティションを作成することをお勧めします。
-
オフセット管理:
- Kafka は、オフセット を使用して各コンシューマーの読み取り状態を保存します。これは、パーティション内の各メッセージの増分数値識別子です。これにより、消費者は障害が発生した場合に中断したところから再開できます。
追加の戦略
- 過負荷の回避: パーティション キーを選択するときは、一部のパーティションが過負荷になり、他のパーティションが十分に活用されていないことを避けるためにトラフィック分散を考慮することが重要です。
- レプリケーションとフォールト トレランス: パーティションに対して適切なレプリケーション (1 より大きい) を構成してください。これにより、可用性が向上するだけでなく、障害に対するシステムの回復力も向上します。
Avro を使用して Kafka でメッセージの生成と消費のシステムを実装し、メッセージが順序どおりに処理され、起こり得る失敗を処理できるようにするための完全な例を次に示します。これには、Avro スキーマの定義、プロデューサー コードとコンシューマー コード、エラー処理の戦略が含まれます。
アブロスキーム
まず、ペイロードの Avro スキーマを定義します。メッセージの構造を記述する user_signed_up.avsc というファイルを作成します。
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
鍵の生成
メッセージの生成と消費の順序を確保するために、message-type-date として構造化されたキーを使用します (例: user-signed-up-2024-11-04)。
プロデューサー カフカ
以下は、Avro スキームを使用して Kafka にメッセージを送信する プロデューサー のコードです。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<string byte> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<string byte> producerRecord = new ProducerRecord(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<string byte> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<genericrecord> datumWriter = new GenericDatumWriter(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } } </genericrecord></string></string></string></string>
再試行に関する考慮事項
**再試行を有効にする場合、適切に処理しないとメッセージの順序が変更されるリスクがあることに注意することが重要です。
これを回避するには:
**max.in.flight.requests.per.connection: このプロパティを 1 に設定すると、メッセージが一度に 1 つずつ送信され、順番に処理されるようになります。ただし、これはパフォーマンスに影響を与える可能性があります。
この構成と適切なエラー処理により、Kafka プロデューサがより堅牢になり、必要な順序を維持しながらメッセージ生成の障害を処理できるようになります。
**Kafka コンシューマー
**メッセージを読んで処理するコンシューマ:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<string byte> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<string byte> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string byte> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<genericrecord> reader = new GenericDatumReader(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<string byte> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } } </string></string></genericrecord></string></string></string>
キーの現実的な例
多くの異なるイベントと多くの異なるパーティションがある環境では、現実的なキーは次のようになります。
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
これにより、特定の日付の特定のタイプに関連するすべてのイベントを同じパーティションに送信し、順番に処理できます。さらに、必要に応じて詳細 (セッション ID やトランザクション ID など) を含めることでキーを多様化できます。
Avro を使用して Kafka で障害を処理し、メッセージの順序を確保するためのこの実装と戦略により、イベントを管理するための堅牢で効率的なシステムを構築できます。
現在は、より有能な Kafka プロデューサー兼コンシューマーです。
サーキット ブレーカー、ローカル永続性、DLQ を備えた Kafka プロデューサー。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<string byte> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<string byte> producerRecord = new ProducerRecord(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<string byte> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<genericrecord> datumWriter = new GenericDatumWriter(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } } </genericrecord></string></string></string></string>
DLQ 管理を備えた Kafka コンシューマ。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<string byte> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<string byte> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string byte> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<genericrecord> reader = new GenericDatumReader(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<string byte> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } } </string></string></genericrecord></string></string></string>
user-signed-up-2024-11-04 order-created-2024-11-04 payment-processed-2024-11-04
コードの説明
サーキットブレーカー:
Resilience4j は、プロデューサーのブレーカー回路を管理するために使用されます。故障率の閾値とオープン状態での待ち時間を設定します。
ローカル永続性と DLQ:
失敗したメッセージは、ローカル ファイル (failed_messages.log) とエラー キュー (dead_letter_queue.log) の両方に保存されます。
エラー処理:
プロデューサーとコンシューマーでは、エラーが適切に処理され、ログに記録されます。
DLQ 処理:
コンシューマーは、メイン トピックからのメッセージを消費した後、DLQ に保存されたメッセージも処理します。
ロギング:
System.err および System.out メッセージは、エラーと重要なイベントを記録するために使用されます。
最終的な考慮事項
この実装では:
これにより、メッセージが復元力のある方法で送信および処理されることが保証されます。
一時的または永続的なエラーに対して適切な処理が提供されます。
ロジックにより、Dead Letter Queue を使用することで効果的な回復が可能になります。
ブレーカー回路は、長期にわたる障害が発生した場合にシステムが飽和状態になるのを防ぎます。
このアプローチにより、Kafka でのメッセージの秩序ある配信を維持しながら、物理的および論理的問題を処理できる堅牢なシステムが作成されます。
以上がKafka で配信および注文されたメッセージを取得して消費する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

javadevelopmentisnotentirelylylypratform-IndopentDuetoseveralfactors.1)jvmvariationsaffectperformanceandbehavioracrossdifferentos.2)nativeLibrariesviajniintroducePlatform-specificissues.3)giaiasystemsdifferbeTioneplateplatifflics.4)

Javaコードは、さまざまなプラットフォームで実行するときにパフォーマンスの違いがあります。 1)JVMの実装と最適化戦略は、OracleJDKやOpenJDKなどとは異なります。 2)メモリ管理やスレッドスケジューリングなどのオペレーティングシステムの特性もパフォーマンスに影響します。 3)適切なJVMを選択し、JVMパラメーターとコード最適化を調整することにより、パフォーマンスを改善できます。

java'splatformindepentedencehaslimitationsincludingporformanceoverhead、versioncompatibulisisues、changleSwithnativeLibraryIntegration、プラットフォーム固有の機能、およびjvminStallation/maintenation。

PlatformEndependEncealLowsProgramStorunonAnyPlatformWithOdification、whilecross-platformdevelopmentReadreessomeplatform-specificAdjustments.platformindependence、explifiedByjava、unableSiversAlexecutionButMayCompromperformance

jitcompalilationinjavaenhancesperformance whelemaintaining formindepence.1)itdynamicallyTrantesiNTODENATIVEMACHINECODEATRUNTIME、最適化されたコードを最適化すること、

javaispopularforsoss-platformdesktopapplicationsduetoits "writeonce、runaynay" philosophy.1)itusesbytecodatiTatrunnanyjvm-adipplatform.2)ライブラリリケンディンガンドジャヴァフククレアティック - ルルクリス

Javaでプラットフォーム固有のコードを作成する理由には、特定のオペレーティングシステム機能へのアクセス、特定のハードウェアとの対話、パフォーマンスの最適化が含まれます。 1)JNAまたはJNIを使用して、Windowsレジストリにアクセスします。 2)JNIを介してLinux固有のハードウェアドライバーと対話します。 3)金属を使用して、JNIを介してMacOSのゲームパフォーマンスを最適化します。それにもかかわらず、プラットフォーム固有のコードを書くことは、コードの移植性に影響を与え、複雑さを高め、パフォーマンスのオーバーヘッドとセキュリティのリスクをもたらす可能性があります。

Javaは、クラウドネイティブアプリケーション、マルチプラットフォームの展開、および言語間の相互運用性を通じて、プラットフォームの独立性をさらに強化します。 1)クラウドネイティブアプリケーションは、GraalvmとQuarkusを使用してスタートアップ速度を向上させます。 2)Javaは、埋め込みデバイス、モバイルデバイス、量子コンピューターに拡張されます。 3)Graalvmを通じて、JavaはPythonやJavaScriptなどの言語とシームレスに統合して、言語間の相互運用性を高めます。


ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

Video Face Swap
完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

人気の記事

ホットツール

MinGW - Minimalist GNU for Windows
このプロジェクトは osdn.net/projects/mingw に移行中です。引き続きそこでフォローしていただけます。 MinGW: GNU Compiler Collection (GCC) のネイティブ Windows ポートであり、ネイティブ Windows アプリケーションを構築するための自由に配布可能なインポート ライブラリとヘッダー ファイルであり、C99 機能をサポートする MSVC ランタイムの拡張機能が含まれています。すべての MinGW ソフトウェアは 64 ビット Windows プラットフォームで実行できます。

SAP NetWeaver Server Adapter for Eclipse
Eclipse を SAP NetWeaver アプリケーション サーバーと統合します。

Safe Exam Browser
Safe Exam Browser は、オンライン試験を安全に受験するための安全なブラウザ環境です。このソフトウェアは、あらゆるコンピュータを安全なワークステーションに変えます。あらゆるユーティリティへのアクセスを制御し、学生が無許可のリソースを使用するのを防ぎます。

mPDF
mPDF は、UTF-8 でエンコードされた HTML から PDF ファイルを生成できる PHP ライブラリです。オリジナルの作者である Ian Back は、Web サイトから「オンザフライ」で PDF ファイルを出力し、さまざまな言語を処理するために mPDF を作成しました。 HTML2FPDF などのオリジナルのスクリプトよりも遅く、Unicode フォントを使用すると生成されるファイルが大きくなりますが、CSS スタイルなどをサポートし、多くの機能強化が施されています。 RTL (アラビア語とヘブライ語) や CJK (中国語、日本語、韓国語) を含むほぼすべての言語をサポートします。ネストされたブロックレベル要素 (P、DIV など) をサポートします。

ドリームウィーバー CS6
ビジュアル Web 開発ツール

ホットトピック









