Kafka プロトコル実践ガイド

Susan Sarandon
Susan Sarandonオリジナル
2024-12-28 17:11:14396ブラウズ

私は Apache Kafka プロトコルを低レベルでかなり使いました。公式ガイドだけに従ってこれを始めるのは簡単ではなかったので、コードを何度も読みました。この投稿では、原始的な値から意味のある要求まで段階的にガイドして、有利なスタートを切りたいと思います。

この投稿の内容:

  1. Kafka プロトコル コードと、Wireshark で動作するプロトコルを調べます。
  2. プリミティブ値の読み取りと書き込み方法を学びます。
  3. プリミティブを組み合わせて意味のあるリクエストを実行します。

プログラミング言語として Python を使用します。ただし、コードは依存関係がなく、選択した言語に簡単に移植できます。

イントロ

Apache Kafka には、さまざまなデータ型、オプションのフィールドなどを備えた、バージョン管理されたカスタム バイナリ プロトコルがあります。残念ながら、Protobuf のようなよく知られたシリアル化形式は使用されません。プロトコルメッセージのスキーマはJSONで記述されます。シリアル化と逆シリアル化を行う実際の Java コードは、この記述から生成されます。

Java の世界では、公式のクライアント ライブラリを使用できます。ただし、別のプラットフォームを使用している場合は、サードパーティの実装に依存することになります。それらは存在しますが、主にプロデューサーとコンシューマーに焦点を当てており、管理クライアントの一部の側面にはほとんど焦点を当てていません。他に何かする必要がある場合は、ご自身で行ってください。

この投稿は、Kafka プロトコルのハッキングを開始するのに役立ちます。 (Kafka プロトコル用の準備が整った Python (逆) シリアル化ライブラリを探している場合は、Kio1 をチェックしてください。Rust については、私が取り組んでいるライブラリをご覧ください。)

この投稿のコードと、Github のこのリポジトリで同様のテストをいくつか見つけることができます。

プロトコルの概要

このページで公式のプロトコルの説明を見つけることができます。少なくとも「準備編」と「プロトコル」のセクションを読んで、よく理解しておくことをお勧めします。

ここにいくつかのハイライトがあります。 Kafka プロトコルは、TCP ベースのバイナリ要求/応答プロトコルです:

  • TCP ベース: Kafka ブローカーは TCP スタック上のポートでリッスンします (これにより、順序付けの保証などの利点が得られます)。
  • バイナリ: メッセージはバイナリ形式でエンコードされ、事前定義されたスキーマに従って特別なシリアル化と逆シリアル化が必要です。
  • リクエスト-レスポンス: 交換はクライアントによって開始され、サーバーは受動的でリクエストにのみ応答します。

各 API メッセージ タイプはリクエストとレスポンスのペアで構成され、API キーと呼ばれる数値によって識別されます。たとえば、最も特徴的な Kafka RPC である Produce と Fetch には、API キー 0 と 1 が対応しています。現在、90 近くの API メッセージ タイプがあります (クライアント ブローカーではなく、ブローカー間メッセージもあります)。

リクエストとレスポンスは、バージョン管理されたスキーマによって記述されます。バージョン管理により、フィールドの追加や削除、データ型の変更など、プロトコルの進化が可能になります。

最初のステップ

Kafka プロトコルの使用を開始するためにできることがいくつかあります。

Kafka プロトコル コードを学習する

Kafka コードは、プロトコルに関する (実質的に) 真実の情報源です。 Github から Kafka コードをチェックアウトし、興味のあるリリース (例: 3.8.0) に切り替えます。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

API メッセージ定義は、clients/src/main/resources/common/message に JSON であります。各 JSON ファイルには、1 つのメッセージ2 タイプの定義とそのすべてのバージョンが含まれています。 client/src/main/resources/common/message/README.md には、スキーマ定義形式の概要が記載されています。デフォルト値、柔軟なバージョン、タグ付きフィールドなどに注意してください。

興味のある具体的な API メッセージ タイプとは別に、clients/src/main/resources/common/message/RequestHeader.json および ResponseHeader.json を見てください。これらは、各リクエストとレスポンスの交換で使用されるヘッダーについて説明しています。 .

コードジェネレーターを実行しましょう:

./gradlew processMessages

これで、clients/src/generated/java/org/apache/kafka/common/message で生成されたクラスを見つけることができます。

clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java を見てください。このユーティリティ:

  • 既存の API メッセージ タイプのセット全体とそのスキーマおよびバージョンについて説明します。
  • requestHeaderVersion関数とresponseHeaderVersion関数で、APIメッセージのバージョンをリクエストヘッダーとレスポンスヘッダーのバージョンにマッピングします。

他のファイルは、対応するスキーマ JSON から 1 対 1 で生成されます (互換性のため、データ接尾辞が付いている場合もあります)。これらのファイルには次のものが含まれます:

  1. バージョン管理されたスキーマ定義 SCHEMA_0、SCHEMA_1 など。スキーマがバージョン間で同じままである場合があります。これは正常であり、要求と応答の相手のみが変更されたことを意味します。
  2. プロトコルのシリアル化と逆シリアル化のグラウンド トゥルースを見つけることができる読み取りおよび書き込みメソッド。

内部クラスはメッセージの複雑な構造を表すだけでなく、内部クラスにも注意してください。

Docker で Kafka を実行する

Docker で Kafka を実行すると、ブローカーを実行してプロトコルをテストしたり、ネットワーク交換をキャプチャしたりするのに便利な方法です。バージョン 3.7.0 以降、Kafka チームは公式の Docker イメージを構築し、次のように実行できます。

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

古いバージョンに興味がある場合は、Docker Hub で他のイメージを検索してください。ただし、Kafka プロトコルには下位互換性と上位互換性があることを考慮すると、これは必要ない可能性があります。新しいブローカーは古いプロトコル バージョンを問題なく認識し、古いクライアントは新しいブローカーと通信できます。

これを読んでいる方は、おそらくお使いのマシンに Kafka コマンド ライン ツールがすでにインストールされていると思いますが、念のため、これらのツールを Docker で実行することもできます。たとえば、これを実行してトピックを作成します:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

Wireshark でプロトコルを検査する

Kafka コードについて理解したところで、実際のプロトコルを見てみましょう。 Wireshark は、このような検査に広く使用されているツールです。 Kafka プロトコルを分析できます (バージョンが十分に新しい場合は、最新バージョンもサポートします)。

オペレーティング システム パッケージが古く、新しいバージョンの Kafka プロトコルを分析できないため、バージョン 4.5.0 のソースから Wireshark をビルドしました。 Wireshark 4.5.0 は主に Kafka 3.7 プロトコル バージョンをサポートする必要があります。ただし、利用可能なバージョンを試して、それがどのように機能するかを確認することはできます。

ポート 9092 キャプチャ フィルター (1) と Kafka 表示フィルター (2) を使用して、ループバック インターフェイスで Wireshark を実行してみましょう:

Kafka protocol practical guide

トピックを作成し、Wireshark が何を表示するかを確認してください:

./gradlew processMessages

Kafka protocol practical guide

表示フィルターは無関係なものをすべて削除し、Kafka のリクエストと応答のみを残します。 Wireshark はプロトコル内のほとんどのメッセージ バージョンを理解するため (もちろん Wireshark のバージョンによって異なります)、各メッセージの構造を簡単に調べることができます。 Wireshark は、対応するバイトも表示します。

Wireshark は、特定のケースでプロトコルがどのように動作するのか、実装の何が問題なのかを理解するのに役立つ優れたデバッグ ツールです。

プリミティブ値の読み取りと書き込み

このプロトコルでは多数のプリミティブ型が定義されており、詳しい説明はここでご覧いただけます。それらの読み取りおよび書き込みコードを実装しましょう。このファイルにはすべての関数が含まれており、対応するテスト ファイルも確認してください。

固定長整数値: INT8、INT16、INT32、INT64、および UINT16

これらは既知の固定長の整数です: 1、2、4、または 8 バイト。当然のことながら、このようなフィールドはプロトコル全体でたくさん見つかります。このクラスでは、それらの読み取りと書き込みが Kafka でどのように (自明に) 実装されているかを見ることができます。

最初に、バッファから正確なバイト数を読み取る関数を定義しましょう3:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

Python の BinaryIO 型ヒントは、バイトの読み取りと書き込みが可能なオブジェクトを表します。 read、write、tell (現在の位置を取得するため)、seek (位置を変更するため) などのメソッドがあります。

これで、INT8 の読み取りを実装できます:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

Kafka はビッグエンディアン (別名ネットワーク) バイト順序を使用するため、byteorder="big" となります。

今書いています:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

INT16、INT32、および INT64 についてはこれを繰り返しません。唯一の大きな違いは、バイト数 (対応する 2、4、および 8) とチェックされる範囲 ([-(2**15)、2* *15 - 1]、[-(2**31)、2**31 - 1]、[-(2**63)、2**63 - 1] に相当します)。

UINT16 は INT16 に似ています:

./gradlew processMessages

ここで signed=False であることに注意してください。

ブール値

BOOLEAN は本質的に INT8 に追加のロジックを加えたものです: == 0 は false を意味し、!= 0 は true を意味します。

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

MetadataRequestData 生成クラスのallowAutoTopicCreation フィールドで BOOLEAN の例を確認できます。

FLOAT64

FLOAT64 は、倍精度 64 ビット IEEE 754 値です。 Python には、int のような to_bytes と from_bytes が float にはありません。そのため、代わりに標準ライブラリの struct モジュールを使用します。

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

>d は、「ビッグエンディアン バイト オーダーの double 値」を意味します。

UNSIGNED_VARINT: 可変長整数値

可変長整数は、値が小さい場合に、値ごとに使用できるビット数を減らすアプローチです。 Kafka は、プロトコル バッファーからの varint アプローチを使用します。アイデアはシンプルです:

バリアント内の各バイトには、それに続くバイトがバリアントの一部であるかどうかを示す継続ビットがあります。これはバイトの最上位ビット (MSB) です (符号ビットとも呼ばれます)。下位 7 ビットはペイロードです。結果の整数は、その構成バイトの 7 ビット ペイロードを追加することによって構築されます。

詳細については、Protobuf 仕様と Kafka 実装 (読み取り、書き込み) を確認できます。

この型はプロトコル フィールド自体では使用されませんが、以下で説明するコンパクトなコレクションに使用されます。

実装してみましょう。自信を持っていただくために、真実のソースである Kafka の ByteUtils クラスから直接いくつかの例を取得します。

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

これを実行すると、次の結果が得られます:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value

これを、おそらく最もパフォーマンスが高いわけではありませんが、簡単な方法で実装してみましょう:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)

UUID

UUID は、エンティティを一意に識別するために使用される 128 ビットの値です。たとえば、CreateTopicsResponse.

でトピック ID を渡すために使用されます。

Kafka コードでこれらがどのように読み取られ、書き込まれるかを確認できます。再現は簡単です:

def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")

Kafka は null/None をゼロ UUID として扱うので、ここでも同じことを行っていることに注意してください。

文字列

Kafka プロトコルには 4 種類の文字列があります:

compact non-compact
nullable COMPACT_NULLABLE_STRING NULLABLE_STRING
non-nullable COMPACT_STRING STRING

Compactness は、文字列の長さが INT16 でエンコードされるか UNSIGNED_VARINT でエンコードされるかを示します。メッセージのバージョンによって異なります(2017年頃に導入されました)。 Nullability は、値が null になれるかどうかです。それはメッセージの目的とバージョンにも依存します (プロトコルの進化中に文字列フィールドがオプションになる場合があります)。

文字列はプロトコル内で広く使用されています。たとえば、生成されたクラス MetadataRequestData.MetadataRequestTopic.

のフィールド名を確認してください。

文字列は非常に単純にエンコードされます。最初に長さが指定され、次に UTF-8 でエンコードされた本体が続きます。許可される最大長は 32767 バイトです。 Null 文字列の長さは -1 であり、明らかに本文はありません。

コンパクトと非コンパクトの唯一の違いは文字列の長さがエンコードされる方法であるため、両方のモードに対して 1 つの関数を使用できます。

NULL 許容文字列の読み取りと書き込みから始めましょう:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

null 非許容の文字列関数は、次のものに基づいて構築できます。

./gradlew processMessages

バイト配列

バイト配列は文字列と非常によく似ています。これらは同じ潜在的な null 可能性とコンパクト性を持っています:

compact non-compact
nullable COMPACT_NULLABLE_BYTES NULLABLE_BYTES
non-nullable COMPACT_BYTES BYTES

これらも同じ方法でエンコードされます: length body。当然のことながら、本文は UTF-8 文字列としてではなく、不透明なバイト配列として扱われます。バイト配列の最大長は 2147483647;

生成されたクラス JoinGroupRequestData.JoinGroupRequestProtocol のフィールド メタデータ内のバイトの例を見つけることができます。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

ご覧のとおり、これらの関数と文字列に対応する関数との違いはわずかです。

その他のアレイ

このプロトコルは、バイト以外の型の配列、文字列、数値、構造体 (ただし、ネストされた配列は除く)、ARRAY および COMPACT_ARRAY をサポートします。 コンパクトさはバイト配列や文字列と同じです。

Null 可能性は、何らかの理由でプロトコル仕様に明示的に記載されていません。ただし、配列は null 値を許容することができます。これは、ここにあるように、スキーマ定義の nullableVersions によって制御されます。

read_array_length と write_array_length がすでに実装されていることを考慮して、リーダー関数とライター関数を実装しましょう。

./gradlew processMessages

記録

RECORDS は Kafka レコードをエンコードします。この構造はかなり複雑なので、このガイドでは説明しません (ただし、ご希望の場合はコメントでお知らせください?️)。簡単にするために、レコードを NULLABLE_BYTES または COMPACT_NULLABLE_BYTES として扱うことができます (メッセージのバージョンによって異なります)。

タグ付きフィールド

タグ付きフィールドは、オプションのデータをメッセージに添付できるようにする Kafka プロトコルの拡張機能です。このアイデアには 2 つの要素があります:

  1. サービスのクライアントがタグ付きフィールドを理解できない場合、それを不明として保存し、無視します。
  2. フィールドがほとんど使用されない場合、そのデフォルト値は転送からスキップできます。

たとえば、このフィールドを見てください。これには、このフィールドがどのバージョン以降にタグ付けされているかを示す taggedVersions があります (ほとんどの場合、フィールドが追加されたときと同じバージョンです)。

タグ付きフィールドは次のもので構成されます:

  1. UNSIGNED_VARINT 型のタグ。
  2. COMPACT_BYTES 型のデータ。

タグ付きフィールドの詳細については、KIP-482 を参照してください。

実装しましょう:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

ここでは「不明」というタイトルが付けられています。既知のフィールドはその構造内に作成する必要があります。

メッセージ構造

高レベルのメッセージ構造は非常に単純です。仕様によると:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

つまり、メッセージ自体の前にバイト単位のサイズが付加されます。要求メッセージと応答メッセージはどちらも、ヘッダーとその直後に本文が続きます。何らかの理由で、これは明示的に文書化されていません4が、信じていただけますか?またはコードを確認してください。

リクエストヘッダーとレスポンスヘッダー

リクエスト ヘッダーには 0、1、2 の 3 つのバージョンが存在します。これらはプロトコルで次のように指定されます。

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

TAG_BUFFER は、前述したタグ付きフィールドです。

これらを Python データ クラスとして実装しましょう:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

ご覧のとおり、バージョン 2 にはタグ付きフィールドがいくつかありますが、予期される既知のフィールドはありません。タグ付きフィールドが誤ってブローカーに送信された場合、それは無視されます。

応答ヘッダーには 0 と 1 の 2 つのバージョンが存在します。これらはプロトコルで次のように指定されます。

./gradlew processMessages

これらも実装しましょう:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

リクエストヘッダーの読み取りとレスポンスヘッダーの書き込みは実装しません。これは簡潔にするためです。サーバー側をプログラミングしていないため、この例では応答ヘッダーの送信と要求ヘッダーの受信は行いません。ただし、サーバー側にも興味がある場合は、両方の関数を実装する必要があります (これは簡単なはずです)。

相関ID

リクエストヘッダーとレスポンスヘッダーの correlation_id フィールドに特に注意してください。このプロトコルはパイプライン処理をサポートしています。クライアントは接続ごとに複数の未処理のリクエストを持つことができます。相関 ID を使用すると、リクエストに対する応答を照合できます。

ヘッダーのバージョンの選択

どのバージョンを使用する必要があるかは、API キーとメッセージのバージョンによって決まります。現在、プロトコル ガイドには明示的に記載されていません5.
生成されたクラス ApiMessageType の requestHeaderVersion 関数と responseHeaderVersion 関数を参照として使用します。

リクエストの送信とレスポンスの受信

ここまでの知識とコードをすべて理解したので、最後に ApiVersions リクエストを送信し、レスポンスを受信して​​読み取りましょう。 ApiVersions は通常、クライアントが送信する最初のリクエストです。その目的は、ブローカーによってサポートされている API バージョンと機能を見つけることです。最新バージョン 3 を実装します。

プロトコル仕様では、次のように定義されています。

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

データクラスを作成しましょう:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

そして応答:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value

[api_keys] は「api_keys の配列」を意味します。ここで、api_keys は以下の 2 行で定義された構造です。

これを Python データ クラスに変換します:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)

配列について話すとき、コンパクトな配列が必要か、それとも非コンパクトな配列が必要かを知る必要があります。これを確認するために、ApiVersionsRequest.json のスキーマ定義を見てみましょう。 "flexibleVersions": "3 " と表示されます。これは、コンパクト配列がバージョン 3 から使用されることを意味します (これについては、スキーマ ディレクトリの README.md を参照してください)。ここではバージョン 3 を使用しているため、コンパクトな配列を使用します。

リクエスト クラスとレスポンス クラスを実装すると、これらのリクエストを送受信できるようになります。この ApiVersions v3 の場合、v2 要求ヘッダーと v0 応答ヘッダーが必要です (生成された ApiMessageType.java を確認してください)。 API キー (18) は、ApiVersionsRequest.json またはプロトコル仕様で見つかります。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

このコードを実行すると、コンソールに応答ヘッダーとメッセージが表示されます。おめでとうございます。Kafka ブローカーとの正しいネットワーク交換が実行されました!

_unknownTaggedFields に 3 つのタグ付きフィールドが配置されていることがわかります。生成された ApiVersionsResponseData クラスの読み取りメソッドと書き込みメソッド、および ApiVersionsResponse.json 内のメッセージ定義は、それらを解釈するのに役立ちます。この宿題を検討してみませんか?


  1. 私の本業では、オープンソース ライブラリ Kio を開発しました。これにより、Python から任意の Kafka API 呼び出しを簡単に実行できるようになります。シリアル化/逆シリアル化コードは、Kafka 自体と同様に、JSON プロトコル定義から生成されます。生成されたコードは、実際の Java Kafka コードに対するプロパティ テストなど、厳密にテストされます。 ↩

  2. または、必要に応じて「メッセージ」: 一部のスキーマは API 用ではありませんが、例:ディスク上のデータの場合。 ↩

  3. read_exact 関数には、基礎となるバッファがすでにメモリ内にあるときにデータが複製されるという欠点があります。ただし、教育目的にはその方が便利です。 ↩

  4. これを修正するために PR を作成しました。 ↩

  5. 繰り返しになりますが、これを修正するために PR を作成しました。 ↩

以上がKafka プロトコル実践ガイドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。