Kafka メッセージ キューの基本的な実装原則
概要
Kafka は分散型です。大量のデータを高スループットかつ低遅延で処理できる、スケーラブルなメッセージ キュー システム。 Kafka は元々 LinkedIn によって開発され、現在は Apache Software Foundation のトップレベル プロジェクトです。
アーキテクチャ
Kafka は、複数のサーバーで構成される分散システムです。各サーバーはノードと呼ばれ、各ノードは独立したプロセスです。ノードはネットワークを介して接続され、クラスターを形成します。
Kafka クラスター内のデータはパーティションに保存され、各パーティションは順序付けされた不変のログ ファイルです。パーティションは、Kafka データ ストレージの基本単位であり、データ レプリケーションとフェイルオーバーのための Kafka の基本単位です。
Kafka クラスター内のデータは、プロデューサーとコンシューマーによってアクセスされます。プロデューサーは Kafka クラスターにデータを書き込み、コンシューマーは Kafka クラスターからデータを読み取ります。
データ ストレージ
Kafka のデータはパーティションに保存され、各パーティションは順序付けされた不変のログ ファイルです。パーティションは、Kafka データ ストレージの基本単位であり、データ レプリケーションとフェイルオーバーのための Kafka の基本単位です。
各パーティションには一意の ID があり、リーダー ノードと複数のレプリカ ノードで構成されます。リーダー ノードはパーティションへのデータの書き込みを担当し、レプリカ ノードはリーダー ノードからのデータのコピーを担当します。
プロデューサーが Kafka クラスターにデータを書き込むと、データはリーダー ノードに書き込まれます。リーダー ノードはデータをレプリカ ノードに複製します。コンシューマーが Kafka クラスターからデータを読み取る場合、データはレプリカ ノードから読み取られます。
データ レプリケーション
Kafka のデータ レプリケーションは、コピー メカニズムを通じて実現されます。各パーティションには、リーダー ノードと複数のレプリカ ノードがあります。リーダー ノードはパーティションへのデータの書き込みを担当し、レプリカ ノードはリーダー ノードからのデータのコピーを担当します。
リーダー ノードに障害が発生すると、レプリカ ノードの 1 つが新しいリーダー ノードになります。新しいリーダー ノードは引き続きパーティションにデータを書き込み、他のレプリカ ノードからデータをコピーします。
Kafka のデータ レプリケーション メカニズムにより、データの信頼性と可用性が保証されます。リーダー ノードに障害が発生した場合でも、データは失われず、コンシューマーは引き続き Kafka クラスターからデータを読み取ることができます。
フェイルオーバー
Kafka のフェイルオーバーは、レプリカ メカニズムを通じて実装されます。リーダー ノードに障害が発生すると、レプリカ ノードの 1 つが新しいリーダー ノードになります。新しいリーダー ノードは引き続きパーティションにデータを書き込み、他のレプリカ ノードからデータをコピーします。
Kafka のフェイルオーバー メカニズムにより、データの信頼性と可用性が確保されます。リーダー ノードに障害が発生した場合でも、データは失われず、コンシューマーは引き続き Kafka クラスターからデータを読み取ることができます。
プロデューサー
プロデューサーは、Kafka クラスターにデータを書き込むクライアントです。プロデューサは、Java アプリケーション、Python アプリケーション、C アプリケーションなど、HTTP リクエストを送信できる任意のクライアントにすることができます。
プロデューサーが Kafka クラスターにデータを書き込むときは、書き込むパーティションを指定する必要があります。プロデューサーは、特定のパーティションにデータを書き込むか、ランダムなパーティションにデータを書き込むかを選択できます。
プロデューサーは、データのメッセージ キーとメッセージ値を指定することもできます。メッセージ キーはメッセージを一意に識別するために使用され、メッセージ値はメッセージの実際の内容です。
コンシューマ
コンシューマは、Kafka クラスターからデータを読み取るクライアントです。コンシューマには、Java アプリケーション、Python アプリケーション、C アプリケーションなど、HTTP リクエストを受信できる任意のクライアントを指定できます。
コンシューマーが Kafka クラスターからデータを読み取るときは、読み取るパーティションを指定する必要があります。コンシューマーは、特定のパーティションからデータを読み取るか、すべてのパーティションからデータを読み取るかを選択できます。
消費者は、読み取るオフセットを指定することもできます。オフセットは、パーティション内のメッセージを一意に識別するために使用されます。コンシューマは、特定のオフセットからデータの読み取りを開始するか、最新のオフセットからデータの読み取りを開始するかを選択できます。
アプリケーション シナリオ
Kafka は、次のようなさまざまなアプリケーション シナリオで使用できます。
コード例
次は、Java で書かれた Kafka プロデューサの例です。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { // Create a Kafka producer Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // Create a Kafka record ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "hello, world"); // Send the record to Kafka producer.send(record); // Close the producer producer.close(); } }
次は、Java で書かれた例です。 Java Kafka コンシューマの例:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // Create a Kafka consumer Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // Subscribe to a topic consumer.subscribe(Collections.singletonList("my-topic")); // Poll for new records while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key() + ": " + record.value()); } } // Close the consumer consumer.close(); } }
以上がKafka メッセージ キューの基礎となる実装メカニズムについての深い理解の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。