我在低级别上使用过很多 Apache Kafka 协议。仅按照官方指南开始执行此操作并不容易,而且我阅读了很多代码。通过这篇文章,我想一步步指导您从原始值到有意义的请求,为您提供一个良好的开端。
在这篇文章中:
我们将使用Python作为编程语言。但是,代码将是零依赖性的,并且可以轻松移植到您选择的语言。
Apache Kafka 有一个自定义的二进制协议,该协议是有版本的,具有各种数据类型、可选字段等。不幸的是,它没有使用像 Protobuf 这样众所周知的序列化格式。协议消息架构以 JSON 格式描述。执行序列化和反序列化的实际 Java 代码是根据此描述生成的。
当你身处Java世界时,你可以使用官方的客户端库。但如果您使用其他平台,则需要依赖第三方实现。它们存在,但主要关注生产者和消费者,很少关注管理客户端的某些方面。如果您需要做其他事情,您就得靠自己了。
这篇文章将帮助您开始破解 Kafka 协议。 (如果您正在为 Kafka 协议寻找现成的 Python(反)序列化库,请查看 Kio1。对于 Rust,请查看我正在开发的库。)
您可以在 Github 上的这个存储库中找到这篇文章中的代码以及更多类似的测试。
您可以在此页面找到官方协议说明。我鼓励您熟悉它,至少阅读“预备知识”和“协议”部分。
以下是一些亮点。 Kafka 协议是基于 TCP 的二进制请求-响应协议:
每种 API 消息类型都由请求和响应对组成,并由称为 API 密钥的数值进行标识。例如,Kafka 最具特色的 RPC 的 Produce 和 Fetch 对应的 API 密钥为 0 和 1。如今,API 消息类型接近 90 种(其中一些是经纪商间的消息类型,而不是客户经纪商间的消息类型)。
请求和响应由版本化模式描述。版本控制允许协议演变,例如添加或删除字段或更改其数据类型。
您可以执行以下一些操作来开始使用 Kafka 协议。
Kafka 代码是该协议(实际上)的真相来源。从 Github 查看 Kafka 代码并切换到您感兴趣的版本(例如 3.8.0):
git clone git@github.com:apache/kafka.git git checkout 3.8.0
您可以在clients/src/main/resources/common/message中找到JSON格式的API消息定义。每个 JSON 文件包含一条消息2 类型及其所有版本的定义。 client/src/main/resources/common/message/README.md 很好地概述了模式定义格式。注意默认值、灵活版本和标记字段等内容。
除了您感兴趣的具体API消息类型之外,还可以查看clients/src/main/resources/common/message/RequestHeader.json和ResponseHeader.json,它们描述了每个请求-响应交换中使用的标头.
让我们运行代码生成器:
./gradlew processMessages
现在你可以在clients/src/ generated/java/org/apache/kafka/common/message中找到生成的类。
看看clients/src/ generated/java/org/apache/kafka/common/message/ApiMessageType.java。该实用程序:
其他文件是从相应的架构 JSON 一对一生成的(有时带有数据后缀,这是一个兼容性问题)。在这些文件中,您会发现:
注意内部类,它们代表了消息的复杂结构。
在 Docker 中运行 Kafka 是一种让代理运行来测试协议或捕获网络交换的便捷方法。从 3.7.0 版本开始,Kafka 团队构建了官方 Docker 镜像,您可以通过以下方式运行:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
如果您对旧版本感兴趣,请在 Docker Hub 中搜索其他镜像。然而,考虑到 Kafka 协议是向后和向前兼容的,这可能是不需要的:新的代理将很好地识别旧的协议版本,并且旧的客户端可以与新的代理进行通信。
如果您阅读本文,您的计算机上可能已经安装了 Kafka 命令行工具,但为了以防万一,您也可以在 Docker 中运行它们。例如,运行此命令来创建一个主题:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
熟悉了 Kafka 代码后,让我们看看实际的协议。 Wireshark 是此类检查广泛使用的工具。它可以剖析 Kafka 协议(如果您的版本足够新,则支持最新版本)。
我从4.5.0版本的源代码构建了Wireshark,因为我的操作系统包很旧,无法用新版本解析Kafka协议。 Wireshark 4.5.0 应该主要支持 Kafka 3.7 协议版本。不过,您可以尝试可用的版本,看看它如何适合您。
让我们在环回接口上运行 Wireshark,使用端口 9092 捕获过滤器 (1) 和 kafka 显示过滤器 (2):
创建一个主题,看看 Wireshark 向我们展示了什么:
./gradlew processMessages
显示过滤器删除所有不相关的内容,只留下 Kafka 请求和响应。由于 Wireshark 可以理解协议中的大多数消息版本(当然取决于 Wireshark 版本),因此您可以方便地查看每个消息的结构。 Wireshark 也会显示相应的字节。
Wireshark 是一个很棒的调试工具,可以帮助您了解协议在特定情况下如何工作以及您的实现存在什么问题。
该协议定义了许多原始类型,您可以在此处找到完整的描述。让我们为它们实现读写代码。你可以在这个文件中找到所有的函数,也可以查看相应的测试文件。
这些是已知固定长度的整数:1、2、4 或 8 字节。当然,您可以在整个协议中找到很多这样的字段。在本课程中,您可能会(简单地)看到他们的读写是如何在 Kafka 中实现的。
我们首先定义从缓冲区读取确切字节数的函数3:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Python 中的 BinaryIO 类型提示表示一个可以从中读取字节并可以写入字节的对象。它有 read、write、tell(用于获取当前位置)、seek(用于更改位置)等方法。
现在我们可以实现读取INT8了:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Kafka 使用 big-endian(又名网络)字节排序,因此 byteorder="big"。
现在写:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
对于 INT16、INT32 和 INT64,我不会重复这一点:唯一显着的区别是字节数(分别为 2、4 和 8)和检查范围 ([-(2**15), 2* *15 - 1]、[-(2**31)、2**31 - 1] 和 [-(2**63)、2**63 - 1]相应地)。
UINT16 与 INT16 类似:
./gradlew processMessages
注意这里的signed=False。
BOOLEAN 本质上是带有额外逻辑的 INT8:== 0 表示 false,!= 0 表示 true。
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
您可以在MetadataRequestData生成的类的allowAutoTopicCreation字段中看到BOOLEAN的示例。
FLOAT64 是双精度 64 位 IEEE 754 值。 Python 不像 int 那样有用于 float 的 to_bytes 和 from_bytes 。因此,我们将使用标准库中的 struct 模块。
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
>d 表示“大端字节顺序中的双精度值”。
可变长度整数是一种允许在值较小时每个值使用较少位数的方法。 Kafka 使用 Protocol Buffers 的 Varint 方法。这个想法很简单:
varint 中的每个字节都有一个连续位,指示其后面的字节是否是 varint 的一部分。这是字节的最高有效位 (MSB)(有时也称为符号位)。低7位是有效负载;生成的整数是通过将其组成字节的 7 位有效负载附加在一起而构建的。
详细信息可以查看Protobuf规范和Kafka实现(读、写)。
此类型本身不用于协议字段,但它用于下面描述的紧凑集合。
让我们实现它。为了信心起见,我们直接从事实来源,Kafka 的 ByteUtils 类中获取一些示例:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
运行这个,我们会得到:
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
让我们以可能不是最高效但最简单的方式来实现它:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
UUID 是用于唯一标识实体的 128 位值。例如,它们用于在 CreateTopicsResponse 中传递主题 ID。
你可以看到它们在Kafka代码中是如何读写的。重现起来很简单:
def write_int8(value: int, buffer: BinaryIO) -> None: if -(2**7) <= value <= 2**7 - 1: buffer.write(value.to_bytes(1, byteorder="big", signed=True)) else: raise ValueError(f"Value {value} is out of range for INT8")
请注意,Kafka 将 null/None 视为零 UUID,因此我们在这里也这样做。
Kafka协议有4种类型的字符串:
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_STRING | NULLABLE_STRING |
non-nullable | COMPACT_STRING | STRING |
紧凑性指示字符串长度是使用 INT16 还是使用 UNSIGNED_VARINT 编码。这取决于消息版本(2017年左右推出)。可空性是指该值是否可以为空。这也取决于消息的目的和版本(有时字符串字段在协议演变过程中变得可选)。
字符串在协议中无处不在。例如,查看生成的类MetadataRequestData.MetadataRequestTopic中的字段名称。
字符串的编码非常简单:首先是长度,然后是 UTF-8 编码的正文。允许的最大长度为 32767 字节。空字符串的长度为 -1 并且显然没有正文。
由于紧凑型和非紧凑型之间的唯一区别在于字符串长度的编码方式,因此我们可以为两种模式使用一个函数。
让我们从读取和写入可为空字符串开始:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
不可为 null 的字符串函数可以构建在这些函数之上:
./gradlew processMessages
字节数组与字符串非常相似。它们具有相同的潜在可空性和紧凑性:
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_BYTES | NULLABLE_BYTES |
non-nullable | COMPACT_BYTES | BYTES |
它们也以相同的方式编码:长度主体。当然,主体不会被视为 UTF-8 字符串,而是被视为不透明的字节数组。字节数组的最大长度为 2147483647;
您可以在生成的类JoinGroupRequestData.JoinGroupRequestProtocol中找到字段元数据中字节的示例。
git clone git@github.com:apache/kafka.git git checkout 3.8.0
如您所见,这些函数与字符串对应的函数之间的差异很小。
该协议支持字节以外类型的数组:字符串、数字、结构(但不包括嵌套数组):ARRAY 和 COMPACT_ARRAY。 紧凑性与字节数组和字符串相同。
出于某种原因,协议规范中没有明确提及可空性。但是,数组可以为空。这是由架构定义中的 nullableVersions 控制的,如下所示。
考虑到我们已经实现了 read_array_length 和 write_array_length,让我们实现 reader 和 writer 函数:
./gradlew processMessages
RECORDS 对 Kafka 记录进行编码。该结构非常复杂,我不会在本指南中描述它(但是,如果您想要它,请在评论中告诉我?️。)为了简单起见,我们可以将记录视为 NULLABLE_BYTES 或 COMPACT_NULLABLE_BYTES (取决于消息版本)。
标记字段是 Kafka 协议的扩展,它允许将可选数据附加到消息中。这个想法是双重的:
例如,看看这个字段。它有 taggedVersions,它表示从哪个版本开始标记该字段(在大多数情况下,它与添加该字段时的版本相同)。
标记字段包含:
您可以在 KIP-482 中找到有关标记字段的更多详细信息。
让我们实现:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
这里它们的标题是“未知”。已知字段需要在其结构内进行构建。
高级消息结构非常简单。根据规范:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
也就是说,它是一条消息本身,前面有其大小(以字节为单位)。请求和响应消息均由紧随其后的标头组成。由于某种原因,这没有明确记录4,但你可以相信我吗?或查看代码。
请求头存在三个版本:0、1、2。它们在协议中指定为:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
TAG_BUFFER 就是前面提到的标记字段。
让我们将它们实现为 Python 数据类:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
如您所见,版本 2 中有一些标记字段,没有预期的已知字段。如果某些标记字段被错误地发送到代理,它将被忽略。
响应头存在两个版本:0和1。它们在协议中指定为:
./gradlew processMessages
让我们也实现它们:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
我们没有实现请求标头的读取和响应标头的写入。这是为了简洁起见:在我们的示例中,我们不会发送响应标头并接收请求标头,因为我们不会对服务器端进行编程。但是,如果您也对服务器端感兴趣,则需要实现这两个功能(这应该很简单)。
特别注意请求和响应标头中的correlation_id 字段。该协议支持管道:客户端每个连接可以有多个未完成的请求。相关 ID 允许其将响应与请求进行匹配。
必须使用哪个版本是API密钥和消息版本的函数。目前协议指南中尚未明确记录5.
参考生成的类ApiMessageType中的requestHeaderVersion和responseHeaderVersion函数。
现在,掌握了所有这些知识和代码,让我们最终发送 ApiVersions 请求并接收和读取响应。 ApiVersions 通常是客户端发送的第一个请求。其目的是查找代理支持的 API 版本和功能。我们实施了最新版本 3。
在协议规范中,定义为:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
让我们创建数据类:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
回复:
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
[api_keys] 表示“api_keys 数组”,其中 api_keys 是下面两行定义的结构体。
将其转换为 Python 数据类:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
当我们谈论数组时,我们需要知道我们是否需要紧凑数组或非紧凑数组。为了找到答案,我们来看看 ApiVersionsRequest.json 中的架构定义。可以看到 "flexibleVersions": "3 ",这意味着从版本 3 开始使用紧凑数组(更多信息请参见 schema 目录中的 README.md)。由于我们在这里使用版本 3,因此我们使用紧凑数组。
实现请求和响应类后,我们可以发送和接收这些请求。对于此 ApiVersions v3,我们需要 v2 请求标头和 v0 响应标头(检查生成的 ApiMessageType.java)。您可以在 ApiVersionsRequest.json 或协议规范中找到 API 密钥 (18)。
git clone git@github.com:apache/kafka.git git checkout 3.8.0
如果运行此代码,您将看到控制台中打印的响应标头和消息。恭喜,您已经与 Kafka Broker 进行了正确的网络交换!
您会注意到 _unknownTaggedFields 中放入了三个标记字段。生成的 ApiVersionsResponseData 类的读写方法以及 ApiVersionsResponse.json 中的消息定义将帮助您解释它们。考虑一下这个作业吗?
在我的日常工作中,我们开发了一个开源库 Kio。它允许我们轻松地从 Python 进行任意 Kafka API 调用。序列化/反序列化代码,就像 Kafka 本身一样,是根据 JSON 协议定义生成的。生成的代码经过严格测试,包括针对真实 Java Kafka 代码的属性测试。 ↩
如果您愿意,也可以使用“消息”:某些模式不适用于 API,但例如对于磁盘上的数据。 ↩
read_exact 函数有一个缺点,即当底层缓冲区已在内存中时,它会复制数据。然而,它对于教育目的来说更方便。 ↩
我做了一个 PR 来解决这个问题。 ↩
我再次发布了 PR 来解决这个问题。 ↩
以上是Kafka协议实用指南的详细内容。更多信息请关注PHP中文网其他相关文章!