ホームページ >バックエンド開発 >Python チュートリアル >Kafka プロトコル実践ガイド
私は Apache Kafka プロトコルを低レベルでかなり使いました。公式ガイドだけに従ってこれを始めるのは簡単ではなかったので、コードを何度も読みました。この投稿では、原始的な値から意味のある要求まで段階的にガイドして、有利なスタートを切りたいと思います。
この投稿の内容:
プログラミング言語として Python を使用します。ただし、コードは依存関係がなく、選択した言語に簡単に移植できます。
Apache Kafka には、さまざまなデータ型、オプションのフィールドなどを備えた、バージョン管理されたカスタム バイナリ プロトコルがあります。残念ながら、Protobuf のようなよく知られたシリアル化形式は使用されません。プロトコルメッセージのスキーマはJSONで記述されます。シリアル化と逆シリアル化を行う実際の Java コードは、この記述から生成されます。
Java の世界では、公式のクライアント ライブラリを使用できます。ただし、別のプラットフォームを使用している場合は、サードパーティの実装に依存することになります。それらは存在しますが、主にプロデューサーとコンシューマーに焦点を当てており、管理クライアントの一部の側面にはほとんど焦点を当てていません。他に何かする必要がある場合は、ご自身で行ってください。
この投稿は、Kafka プロトコルのハッキングを開始するのに役立ちます。 (Kafka プロトコル用の準備が整った Python (逆) シリアル化ライブラリを探している場合は、Kio1 をチェックしてください。Rust については、私が取り組んでいるライブラリをご覧ください。)
この投稿のコードと、Github のこのリポジトリで同様のテストをいくつか見つけることができます。
このページで公式のプロトコルの説明を見つけることができます。少なくとも「準備編」と「プロトコル」のセクションを読んで、よく理解しておくことをお勧めします。
ここにいくつかのハイライトがあります。 Kafka プロトコルは、TCP ベースのバイナリ要求/応答プロトコルです:
各 API メッセージ タイプはリクエストとレスポンスのペアで構成され、API キーと呼ばれる数値によって識別されます。たとえば、最も特徴的な Kafka RPC である Produce と Fetch には、API キー 0 と 1 が対応しています。現在、90 近くの API メッセージ タイプがあります (クライアント ブローカーではなく、ブローカー間メッセージもあります)。
リクエストとレスポンスは、バージョン管理されたスキーマによって記述されます。バージョン管理により、フィールドの追加や削除、データ型の変更など、プロトコルの進化が可能になります。
Kafka プロトコルの使用を開始するためにできることがいくつかあります。
Kafka コードは、プロトコルに関する (実質的に) 真実の情報源です。 Github から Kafka コードをチェックアウトし、興味のあるリリース (例: 3.8.0) に切り替えます。
git clone git@github.com:apache/kafka.git git checkout 3.8.0
API メッセージ定義は、clients/src/main/resources/common/message に JSON であります。各 JSON ファイルには、1 つのメッセージ2 タイプの定義とそのすべてのバージョンが含まれています。 client/src/main/resources/common/message/README.md には、スキーマ定義形式の概要が記載されています。デフォルト値、柔軟なバージョン、タグ付きフィールドなどに注意してください。
興味のある具体的な API メッセージ タイプとは別に、clients/src/main/resources/common/message/RequestHeader.json および ResponseHeader.json を見てください。これらは、各リクエストとレスポンスの交換で使用されるヘッダーについて説明しています。 .
コードジェネレーターを実行しましょう:
./gradlew processMessages
これで、clients/src/generated/java/org/apache/kafka/common/message で生成されたクラスを見つけることができます。
clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java を見てください。このユーティリティ:
他のファイルは、対応するスキーマ JSON から 1 対 1 で生成されます (互換性のため、データ接尾辞が付いている場合もあります)。これらのファイルには次のものが含まれます:
内部クラスはメッセージの複雑な構造を表すだけでなく、内部クラスにも注意してください。
Docker で Kafka を実行すると、ブローカーを実行してプロトコルをテストしたり、ネットワーク交換をキャプチャしたりするのに便利な方法です。バージョン 3.7.0 以降、Kafka チームは公式の Docker イメージを構築し、次のように実行できます。
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
古いバージョンに興味がある場合は、Docker Hub で他のイメージを検索してください。ただし、Kafka プロトコルには下位互換性と上位互換性があることを考慮すると、これは必要ない可能性があります。新しいブローカーは古いプロトコル バージョンを問題なく認識し、古いクライアントは新しいブローカーと通信できます。
これを読んでいる方は、おそらくお使いのマシンに Kafka コマンド ライン ツールがすでにインストールされていると思いますが、念のため、これらのツールを Docker で実行することもできます。たとえば、これを実行してトピックを作成します:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Kafka コードについて理解したところで、実際のプロトコルを見てみましょう。 Wireshark は、このような検査に広く使用されているツールです。 Kafka プロトコルを分析できます (バージョンが十分に新しい場合は、最新バージョンもサポートします)。
オペレーティング システム パッケージが古く、新しいバージョンの Kafka プロトコルを分析できないため、バージョン 4.5.0 のソースから Wireshark をビルドしました。 Wireshark 4.5.0 は主に Kafka 3.7 プロトコル バージョンをサポートする必要があります。ただし、利用可能なバージョンを試して、それがどのように機能するかを確認することはできます。
ポート 9092 キャプチャ フィルター (1) と Kafka 表示フィルター (2) を使用して、ループバック インターフェイスで Wireshark を実行してみましょう:
トピックを作成し、Wireshark が何を表示するかを確認してください:
./gradlew processMessages
表示フィルターは無関係なものをすべて削除し、Kafka のリクエストと応答のみを残します。 Wireshark はプロトコル内のほとんどのメッセージ バージョンを理解するため (もちろん Wireshark のバージョンによって異なります)、各メッセージの構造を簡単に調べることができます。 Wireshark は、対応するバイトも表示します。
Wireshark は、特定のケースでプロトコルがどのように動作するのか、実装の何が問題なのかを理解するのに役立つ優れたデバッグ ツールです。
このプロトコルでは多数のプリミティブ型が定義されており、詳しい説明はここでご覧いただけます。それらの読み取りおよび書き込みコードを実装しましょう。このファイルにはすべての関数が含まれており、対応するテスト ファイルも確認してください。
これらは既知の固定長の整数です: 1、2、4、または 8 バイト。当然のことながら、このようなフィールドはプロトコル全体でたくさん見つかります。このクラスでは、それらの読み取りと書き込みが Kafka でどのように (自明に) 実装されているかを見ることができます。
最初に、バッファから正確なバイト数を読み取る関数を定義しましょう3:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Python の BinaryIO 型ヒントは、バイトの読み取りと書き込みが可能なオブジェクトを表します。 read、write、tell (現在の位置を取得するため)、seek (位置を変更するため) などのメソッドがあります。
これで、INT8 の読み取りを実装できます:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Kafka はビッグエンディアン (別名ネットワーク) バイト順序を使用するため、byteorder="big" となります。
今書いています:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
INT16、INT32、および INT64 についてはこれを繰り返しません。唯一の大きな違いは、バイト数 (対応する 2、4、および 8) とチェックされる範囲 ([-(2**15)、2* *15 - 1]、[-(2**31)、2**31 - 1]、[-(2**63)、2**63 - 1] に相当します)。
UINT16 は INT16 に似ています:
./gradlew processMessages
ここで signed=False であることに注意してください。
BOOLEAN は本質的に INT8 に追加のロジックを加えたものです: == 0 は false を意味し、!= 0 は true を意味します。
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
MetadataRequestData 生成クラスのallowAutoTopicCreation フィールドで BOOLEAN の例を確認できます。
FLOAT64 は、倍精度 64 ビット IEEE 754 値です。 Python には、int のような to_bytes と from_bytes が float にはありません。そのため、代わりに標準ライブラリの struct モジュールを使用します。
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
>d は、「ビッグエンディアン バイト オーダーの double 値」を意味します。
可変長整数は、値が小さい場合に、値ごとに使用できるビット数を減らすアプローチです。 Kafka は、プロトコル バッファーからの varint アプローチを使用します。アイデアはシンプルです:
バリアント内の各バイトには、それに続くバイトがバリアントの一部であるかどうかを示す継続ビットがあります。これはバイトの最上位ビット (MSB) です (符号ビットとも呼ばれます)。下位 7 ビットはペイロードです。結果の整数は、その構成バイトの 7 ビット ペイロードを追加することによって構築されます。
詳細については、Protobuf 仕様と Kafka 実装 (読み取り、書き込み) を確認できます。
この型はプロトコル フィールド自体では使用されませんが、以下で説明するコンパクトなコレクションに使用されます。
実装してみましょう。自信を持っていただくために、真実のソースである Kafka の ByteUtils クラスから直接いくつかの例を取得します。
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
これを実行すると、次の結果が得られます:
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
これを、おそらく最もパフォーマンスが高いわけではありませんが、簡単な方法で実装してみましょう:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
UUID は、エンティティを一意に識別するために使用される 128 ビットの値です。たとえば、CreateTopicsResponse.
でトピック ID を渡すために使用されます。Kafka コードでこれらがどのように読み取られ、書き込まれるかを確認できます。再現は簡単です:
def write_int8(value: int, buffer: BinaryIO) -> None: if -(2**7) <= value <= 2**7 - 1: buffer.write(value.to_bytes(1, byteorder="big", signed=True)) else: raise ValueError(f"Value {value} is out of range for INT8")
Kafka は null/None をゼロ UUID として扱うので、ここでも同じことを行っていることに注意してください。
Kafka プロトコルには 4 種類の文字列があります:
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_STRING | NULLABLE_STRING |
non-nullable | COMPACT_STRING | STRING |
Compactness は、文字列の長さが INT16 でエンコードされるか UNSIGNED_VARINT でエンコードされるかを示します。メッセージのバージョンによって異なります(2017年頃に導入されました)。 Nullability は、値が null になれるかどうかです。それはメッセージの目的とバージョンにも依存します (プロトコルの進化中に文字列フィールドがオプションになる場合があります)。
文字列はプロトコル内で広く使用されています。たとえば、生成されたクラス MetadataRequestData.MetadataRequestTopic.
のフィールド名を確認してください。文字列は非常に単純にエンコードされます。最初に長さが指定され、次に UTF-8 でエンコードされた本体が続きます。許可される最大長は 32767 バイトです。 Null 文字列の長さは -1 であり、明らかに本文はありません。
コンパクトと非コンパクトの唯一の違いは文字列の長さがエンコードされる方法であるため、両方のモードに対して 1 つの関数を使用できます。
NULL 許容文字列の読み取りと書き込みから始めましょう:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
null 非許容の文字列関数は、次のものに基づいて構築できます。
./gradlew processMessages
バイト配列は文字列と非常によく似ています。これらは同じ潜在的な null 可能性とコンパクト性を持っています:
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_BYTES | NULLABLE_BYTES |
non-nullable | COMPACT_BYTES | BYTES |
これらも同じ方法でエンコードされます: length body。当然のことながら、本文は UTF-8 文字列としてではなく、不透明なバイト配列として扱われます。バイト配列の最大長は 2147483647;
生成されたクラス JoinGroupRequestData.JoinGroupRequestProtocol のフィールド メタデータ内のバイトの例を見つけることができます。
git clone git@github.com:apache/kafka.git git checkout 3.8.0
ご覧のとおり、これらの関数と文字列に対応する関数との違いはわずかです。
このプロトコルは、バイト以外の型の配列、文字列、数値、構造体 (ただし、ネストされた配列は除く)、ARRAY および COMPACT_ARRAY をサポートします。 コンパクトさはバイト配列や文字列と同じです。
Null 可能性は、何らかの理由でプロトコル仕様に明示的に記載されていません。ただし、配列は null 値を許容することができます。これは、ここにあるように、スキーマ定義の nullableVersions によって制御されます。
read_array_length と write_array_length がすでに実装されていることを考慮して、リーダー関数とライター関数を実装しましょう。
./gradlew processMessages
RECORDS は Kafka レコードをエンコードします。この構造はかなり複雑なので、このガイドでは説明しません (ただし、ご希望の場合はコメントでお知らせください?️)。簡単にするために、レコードを NULLABLE_BYTES または COMPACT_NULLABLE_BYTES として扱うことができます (メッセージのバージョンによって異なります)。
タグ付きフィールドは、オプションのデータをメッセージに添付できるようにする Kafka プロトコルの拡張機能です。このアイデアには 2 つの要素があります:
たとえば、このフィールドを見てください。これには、このフィールドがどのバージョン以降にタグ付けされているかを示す taggedVersions があります (ほとんどの場合、フィールドが追加されたときと同じバージョンです)。
タグ付きフィールドは次のもので構成されます:
タグ付きフィールドの詳細については、KIP-482 を参照してください。
実装しましょう:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
ここでは「不明」というタイトルが付けられています。既知のフィールドはその構造内に作成する必要があります。
高レベルのメッセージ構造は非常に単純です。仕様によると:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
つまり、メッセージ自体の前にバイト単位のサイズが付加されます。要求メッセージと応答メッセージはどちらも、ヘッダーとその直後に本文が続きます。何らかの理由で、これは明示的に文書化されていません4が、信じていただけますか?またはコードを確認してください。
リクエスト ヘッダーには 0、1、2 の 3 つのバージョンが存在します。これらはプロトコルで次のように指定されます。
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
TAG_BUFFER は、前述したタグ付きフィールドです。
これらを Python データ クラスとして実装しましょう:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
ご覧のとおり、バージョン 2 にはタグ付きフィールドがいくつかありますが、予期される既知のフィールドはありません。タグ付きフィールドが誤ってブローカーに送信された場合、それは無視されます。
応答ヘッダーには 0 と 1 の 2 つのバージョンが存在します。これらはプロトコルで次のように指定されます。
./gradlew processMessages
これらも実装しましょう:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
リクエストヘッダーの読み取りとレスポンスヘッダーの書き込みは実装しません。これは簡潔にするためです。サーバー側をプログラミングしていないため、この例では応答ヘッダーの送信と要求ヘッダーの受信は行いません。ただし、サーバー側にも興味がある場合は、両方の関数を実装する必要があります (これは簡単なはずです)。
リクエストヘッダーとレスポンスヘッダーの correlation_id フィールドに特に注意してください。このプロトコルはパイプライン処理をサポートしています。クライアントは接続ごとに複数の未処理のリクエストを持つことができます。相関 ID を使用すると、リクエストに対する応答を照合できます。
どのバージョンを使用する必要があるかは、API キーとメッセージのバージョンによって決まります。現在、プロトコル ガイドには明示的に記載されていません5.
生成されたクラス ApiMessageType の requestHeaderVersion 関数と responseHeaderVersion 関数を参照として使用します。
ここまでの知識とコードをすべて理解したので、最後に ApiVersions リクエストを送信し、レスポンスを受信して読み取りましょう。 ApiVersions は通常、クライアントが送信する最初のリクエストです。その目的は、ブローカーによってサポートされている API バージョンと機能を見つけることです。最新バージョン 3 を実装します。
プロトコル仕様では、次のように定義されています。
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
データクラスを作成しましょう:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
そして応答:
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
[api_keys] は「api_keys の配列」を意味します。ここで、api_keys は以下の 2 行で定義された構造です。
これを Python データ クラスに変換します:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
配列について話すとき、コンパクトな配列が必要か、それとも非コンパクトな配列が必要かを知る必要があります。これを確認するために、ApiVersionsRequest.json のスキーマ定義を見てみましょう。 "flexibleVersions": "3 " と表示されます。これは、コンパクト配列がバージョン 3 から使用されることを意味します (これについては、スキーマ ディレクトリの README.md を参照してください)。ここではバージョン 3 を使用しているため、コンパクトな配列を使用します。
リクエスト クラスとレスポンス クラスを実装すると、これらのリクエストを送受信できるようになります。この ApiVersions v3 の場合、v2 要求ヘッダーと v0 応答ヘッダーが必要です (生成された ApiMessageType.java を確認してください)。 API キー (18) は、ApiVersionsRequest.json またはプロトコル仕様で見つかります。
git clone git@github.com:apache/kafka.git git checkout 3.8.0
このコードを実行すると、コンソールに応答ヘッダーとメッセージが表示されます。おめでとうございます。Kafka ブローカーとの正しいネットワーク交換が実行されました!
_unknownTaggedFields に 3 つのタグ付きフィールドが配置されていることがわかります。生成された ApiVersionsResponseData クラスの読み取りメソッドと書き込みメソッド、および ApiVersionsResponse.json 内のメッセージ定義は、それらを解釈するのに役立ちます。この宿題を検討してみませんか?
私の本業では、オープンソース ライブラリ Kio を開発しました。これにより、Python から任意の Kafka API 呼び出しを簡単に実行できるようになります。シリアル化/逆シリアル化コードは、Kafka 自体と同様に、JSON プロトコル定義から生成されます。生成されたコードは、実際の Java Kafka コードに対するプロパティ テストなど、厳密にテストされます。 ↩
または、必要に応じて「メッセージ」: 一部のスキーマは API 用ではありませんが、例:ディスク上のデータの場合。 ↩
read_exact 関数には、基礎となるバッファがすでにメモリ内にあるときにデータが複製されるという欠点があります。ただし、教育目的にはその方が便利です。 ↩
これを修正するために PR を作成しました。 ↩
繰り返しになりますが、これを修正するために PR を作成しました。 ↩
以上がKafka プロトコル実践ガイドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。