搜索
首页后端开发Python教程Kafka协议实用指南

我在低级别上使用过很多 Apache Kafka 协议。仅按照官方指南开始执行此操作并不容易,而且我阅读了很多代码。通过这篇文章,我想一步步指导您从原始值到有意义的请求,为您提供一个良好的开端。

在这篇文章中:

  1. 探索 Kafka 协议代码以及使用 Wireshark 运行的协议。
  2. 学习如何读取和写入原始值。
  3. 组合基元来执行有意义的请求。

我们将使用Python作为编程语言。但是,代码将是零依赖性的,并且可以轻松移植到您选择的语言。

简介

Apache Kafka 有一个自定义的二进制协议,该协议是有版本的,具有各种数据类型、可选字段等。不幸的是,它没有使用像 Protobuf 这样众所周知的序列化格式。协议消息架构以 JSON 格式描述。执行序列化和反序列化的实际 Java 代码是根据此描述生成的。

当你身处Java世界时,你可以使用官方的客户端库。但如果您使用其他平台,则需要依赖第三方实现。它们存在,但主要关注生产者和消费者,很少关注管理客户端的某些方面。如果您需要做其他事情,您就得靠自己了。

这篇文章将帮助您开始破解 Kafka 协议。 (如果您正在为 Kafka 协议寻找现成的 Python(反)序列化库,请查看 Kio1。对于 Rust,请查看我正在开发的库。)

您可以在 Github 上的这个存储库中找到这篇文章中的代码以及更多类似的测试。

协议概述

您可以在此页面找到官方协议说明。我鼓励您熟悉它,至少阅读“预备知识”和“协议”部分。

以下是一些亮点。 Kafka 协议是基于 TCP 的二进制请求-响应协议:

  • 基于 TCP:Kafka 代理侦听 TCP 堆栈上的端口(这提供了一些好处,例如排序保证)。
  • 二进制:消息以二进制形式编码,需要根据预定义的模式进行特殊的序列化和反序列化。
  • 请求-响应:交换由客户端发起,服务器端是被动的,只回复请求。

每种 API 消息类型都由请求和响应对组成,并由称为 API 密钥的数值进行标识。例如,Kafka 最具特色的 RPC 的 Produce 和 Fetch 对应的 API 密钥为 0 和 1。如今,API 消息类型接近 90 种(其中一些是经纪商间的消息类型,而不是客户经纪商间的消息类型)。

请求和响应由版本化模式描述。版本控制允许协议演变,例如添加或删除字段或更改其数据类型。

第一步

您可以执行以下一些操作来开始使用 Kafka 协议。

学习Kafka协议代码

Kafka 代码是该协议(实际上)的真相来源。从 Github 查看 Kafka 代码并切换到您感兴趣的版本(例如 3.8.0):

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

您可以在clients/src/main/resources/common/message中找到JSON格式的API消息定义。每个 JSON 文件包含一条消息2 类型及其所有版本的定义。 client/src/main/resources/common/message/README.md 很好地概述了模式定义格式。注意默认值、灵活版本和标记字段等内容。

除了您感兴趣的具体API消息类型之外,还可以查看clients/src/main/resources/common/message/RequestHeader.json和ResponseHeader.json,它们描述了每个请求-响应交换中使用的标头.

让我们运行代码生成器:

./gradlew processMessages

现在你可以在clients/src/ generated/java/org/apache/kafka/common/message中找到生成的类。

看看clients/src/ generated/java/org/apache/kafka/common/message/ApiMessageType.java。该实用程序:

  • 描述了整套现有的 API 消息类型及其架构和版本;
  • 将API消息版本映射到requestHeaderVersion和responseHeaderVersion函数中的请求和响应标头版本。

其他文件是从相应的架构 JSON 一对一生成的(有时带有数据后缀,这是一个兼容性问题)。在这些文件中,您会发现:

  1. 版本化模式定义 SCHEMA_0、SCHEMA_1 等。有时模式在版本之间保持相同。这是正常的,意味着只有请求-响应对应部分发生了变化。
  2. 读取和写入方法,您可以在其中找到协议序列化和反序列化的基本事实。

注意内部类,它们代表了消息的复杂结构。

在 Docker 中运行 Kafka

在 Docker 中运行 Kafka 是一种让代理运行来测试协议或捕获网络交换的便捷方法。从 3.7.0 版本开始,Kafka 团队构建了官方 Docker 镜像,您可以通过以下方式运行:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

如果您对旧版本感兴趣,请在 Docker Hub 中搜索其他镜像。然而,考虑到 Kafka 协议是向后和向前兼容的,这可能是不需要的:新的代理将很好地识别旧的协议版本,并且旧的客户端可以与新的代理进行通信。

如果您阅读本文,您的计算机上可能已经安装了 Kafka 命令行工具,但为了以防万一,您也可以在 Docker 中运行它们。例如,运行此命令来创建一个主题:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

使用 Wireshark 检查协议

熟悉了 Kafka 代码后,让我们看看实际的协议。 Wireshark 是此类检查广泛使用的工具。它可以剖析 Kafka 协议(如果您的版本足够新,则支持最新版本)。

我从4.5.0版本的源代码构建了Wireshark,因为我的操作系统包很旧,无法用新版本解析Kafka协议。 Wireshark 4.5.0 应该主要支持 Kafka 3.7 协议版本。不过,您可以尝试可用的版本,看看它如何适合您。

让我们在环回接口上运行 Wireshark,使用端口 9092 捕获过滤器 (1) 和 kafka 显示过滤器 (2):

Kafka protocol practical guide

创建一个主题,看看 Wireshark 向我们展示了什么:

./gradlew processMessages

Kafka protocol practical guide

显示过滤器删除所有不相关的内容,只留下 Kafka 请求和响应。由于 Wireshark 可以理解协议中的大多数消息版本(当然取决于 Wireshark 版本),因此您可以方便地查看每个消息的结构。 Wireshark 也会显示相应的字节。

Wireshark 是一个很棒的调试工具,可以帮助您了解协议在特定情况下如何工作以及您的实现存在什么问题。

读取和写入原始值

该协议定义了许多原始类型,您可以在此处找到完整的描述。让我们为它们实现读写代码。你可以在这个文件中找到所有的函数,也可以查看相应的测试文件。

固定长度整数值:INT8、INT16、INT32、INT64 和 UINT16

这些是已知固定长度的整数:1、2、4 或 8 字节。当然,您可以在整个协议中找到很多这样的字段。在本课程中,您可能会(简单地)看到他们的读写是如何在 Kafka 中实现的。

我们首先定义从缓冲区读取确切字节数的函数3:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

Python 中的 BinaryIO 类型提示表示一个可以从中读取字节并可以写入字节的对象。它有 read、write、tell(用于获取当前位置)、seek(用于更改位置)等方法。

现在我们可以实现读取INT8了:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

Kafka 使用 big-endian(又名网络)字节排序,因此 byteorder="big"。

现在写:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

对于 INT16、INT32 和 INT64,我不会重复这一点:唯一显着的区别是字节数(分别为 2、4 和 8)和检查范围 ([-(2**15), 2* *15 - 1]、[-(2**31)、2**31 - 1] 和 [-(2**63)、2**63 - 1]相应地)。

UINT16 与 INT16 类似:

./gradlew processMessages

注意这里的signed=False。

布尔值

BOOLEAN 本质上是带有额外逻辑的 INT8:== 0 表示 false,!= 0 表示 true。

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

您可以在MetadataRequestData生成的类的allowAutoTopicCreation字段中看到BOOLEAN的示例。

浮点数64

FLOAT64 是双精度 64 位 IEEE 754 值。 Python 不像 int 那样有用于 float 的 to_bytes 和 from_bytes 。因此,我们将使用标准库中的 struct 模块。

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

>d 表示“大端字节顺序中的双精度值”。

UNSIGNED_VARINT:可变长度整数值

可变长度整数是一种允许在值较小时每个值使用较少位数的方法。 Kafka 使用 Protocol Buffers 的 Varint 方法。这个想法很简单:

varint 中的每个字节都有一个连续位,指示其后面的字节是否是 varint 的一部分。这是字节的最高有效位 (MSB)(有时也称为符号位)。低7位是有效负载;生成的整数是通过将其组成字节的 7 位有效负载附加在一起而构建的。

详细信息可以查看Protobuf规范和Kafka实现(读、写)。

此类型本身不用于协议字段,但它用于下面描述的紧凑集合。

让我们实现它。为了信心起见,我们直接从事实来源,Kafka 的 ByteUtils 类中获取一些示例:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

运行这个,我们会得到:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value

让我们以可能不是最高效但最简单的方式来实现它:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)

通用唯一识别码

UUID 是用于唯一标识实体的 128 位值。例如,它们用于在 CreateTopicsResponse 中传递主题 ID。

你可以看到它们在Kafka代码中是如何读写的。重现起来很简单:

def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) 



<p>请注意,Kafka 将 null/None 视为零 UUID,因此我们在这里也这样做。</p>

<h3>
  
  
  弦乐
</h3>

<p>Kafka协议有4种类型的字符串:</p>

<div><table>
<thead>
<tr>
<th></th>
<th><strong>compact</strong></th>
<th><strong>non-compact</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>nullable</strong></td>
<td>COMPACT_NULLABLE_STRING</td>
<td>NULLABLE_STRING</td>
</tr>
<tr>
<td><strong>non-nullable</strong></td>
<td>COMPACT_STRING</td>
<td>STRING</td>
</tr>
</tbody>
</table></div>

<p>紧凑性指示字符串长度是使用 INT16 还是使用 UNSIGNED_VARINT 编码。这取决于消息版本(2017年左右推出)。可空性是指该值是否可以为空。这也取决于消息的目的和版本(有时字符串字段在协议演变过程中变得可选)。</p>

<p>字符串在协议中无处不在。例如,查看生成的类MetadataRequestData.MetadataRequestTopic中的字段名称。</p>

<p>字符串的编码非常简单:首先是长度,然后是 UTF-8 编码的正文。允许的最大长度为 32767 字节。空字符串的长度为 -1 并且显然没有正文。</p>

<p>由于紧凑型和非紧凑型之间的唯一区别在于字符串长度的编码方式,因此我们可以为两种模式使用一个函数。</p>

<p>让我们从读取和写入可为空字符串开始:<br>
</p>

<pre class="brush:php;toolbar:false">git clone git@github.com:apache/kafka.git
git checkout 3.8.0

不可为 null 的字符串函数可以构建在这些函数之上:

./gradlew processMessages

字节数组

字节数组与字符串非常相似。它们具有相同的潜在可空性和紧凑性:

compact non-compact
nullable COMPACT_NULLABLE_BYTES NULLABLE_BYTES
non-nullable COMPACT_BYTES BYTES

它们也以相同的方式编码:长度主体。当然,主体不会被视为 UTF-8 字符串,而是被视为不透明的字节数组。字节数组的最大长度为 2147483647;

您可以在生成的类JoinGroupRequestData.JoinGroupRequestProtocol中找到字段元数据中字节的示例。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

如您所见,这些函数与字符串对应的函数之间的差异很小。

其他阵列

该协议支持字节以外类型的数组:字符串、数字、结构(但不包括嵌套数组):ARRAY 和 COMPACT_ARRAY。 紧凑性与字节数组和字符串相同。

出于某种原因,协议规范中没有明确提及可空性。但是,数组可以为空。这是由架构定义中的 nullableVersions 控制的,如下所示。

考虑到我们已经实现了 read_array_length 和 write_array_length,让我们实现 reader 和 writer 函数:

./gradlew processMessages

记录

RECORDS 对 Kafka 记录进行编码。该结构非常复杂,我不会在本指南中描述它(但是,如果您想要它,请在评论中告诉我?️。)为了简单起见,我们可以将记录视为 NULLABLE_BYTES 或 COMPACT_NULLABLE_BYTES (取决于消息版本)。

标记字段

标记字段是 Kafka 协议的扩展,它允许将可选数据附加到消息中。这个想法是双重的:

  1. 如果服务客户端不理解标记的字段,它会将其保存为未知并忽略它。
  2. 如果某个字段很少使用,可以跳过其默认值传输。

例如,看看这个字段。它有 taggedVersions,它表示从哪个版本开始标记该字段(在大多数情况下,它与添加该字段时的版本相同)。

标记字段包含:

  1. UNSIGNED_VARINT 类型的标签。
  2. COMPACT_BYTES类型的数据。

您可以在 KIP-482 中找到有关标记字段的更多详细信息。

让我们实现:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

这里它们的标题是“未知”。已知字段需要在其结构内进行构建。

消息结构

高级消息结构非常简单。根据规范:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

也就是说,它是一条消息本身,前面有其大小(以字节为单位)。请求和响应消息均由紧随其后的标头组成。由于某种原因,这没有明确记录4,但你可以相信我吗?或查看代码。

请求和响应头

请求头存在三个版本:0、1、2。它们在协议中指定为:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

TAG_BUFFER 就是前面提到的标记字段。

让我们将它们实现为 Python 数据类:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

如您所见,版本 2 中有一些标记字段,没有预期的已知字段。如果某些标记字段被错误地发送到代理,它将被忽略。

响应头存在两个版本:0和1。它们在协议中指定为:

./gradlew processMessages

让我们也实现它们:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0

我们没有实现请求标头的读取和响应标头的写入。这是为了简洁起见:在我们的示例中,我们不会发送响应标头并接收请求标头,因为我们不会对服务器端进行编程。但是,如果您也对服务器端感兴趣,则需要实现这两个功能(这应该很简单)。

相关ID

特别注意请求和响应标头中的correlation_id 字段。该协议支持管道:客户端每个连接可以有多个未完成的请求。相关 ID 允许其将响应与请求进行匹配。

标头版本选择

必须使用哪个版本是API密钥和消息版本的函数。目前协议指南中尚未明确记录5.
参考生成的类ApiMessageType中的requestHeaderVersion和responseHeaderVersion函数。

发送请求和接收响应

现在,掌握了所有这些知识和代码,让我们最终发送 ApiVersions 请求并接收和读取响应。 ApiVersions 通常是客户端发送的第一个请求。其目的是查找代理支持的 API 版本和功能。我们实施了最新版本 3。

在协议规范中,定义为:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

让我们创建数据类:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2

回复:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value

[api_keys] 表示“api_keys 数组”,其中 api_keys 是下面两行定义的结构体。

将其转换为 Python 数据类:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)

当我们谈论数组时,我们需要知道我们是否需要紧凑数组或非紧凑数组。为了找到答案,我们来看看 ApiVersionsRequest.json 中的架构定义。可以看到 "flexibleVersions": "3 ",这意味着从版本 3 开始使用紧凑数组(更多信息请参见 schema 目录中的 README.md)。由于我们在这里使用版本 3,因此我们使用紧凑数组。

实现请求和响应类后,我们可以发送和接收这些请求。对于此 ApiVersions v3,我们需要 v2 请求标头和 v0 响应标头(检查生成的 ApiMessageType.java)。您可以在 ApiVersionsRequest.json 或协议规范中找到 API 密钥 (18)。

git clone git@github.com:apache/kafka.git
git checkout 3.8.0

如果运行此代码,您将看到控制台中打印的响应标头和消息。恭喜,您已经与 Kafka Broker 进行了正确的网络交换!

您会注意到 _unknownTaggedFields 中放入了三个标记字段。生成的 ApiVersionsResponseData 类的读写方法以及 ApiVersionsResponse.json 中的消息定义将帮助您解释它们。考虑一下这个作业吗?


  1. 在我的日常工作中,我们开发了一个开源库 Kio。它允许我们轻松地从 Python 进行任意 Kafka API 调用。序列化/反序列化代码,就像 Kafka 本身一样,是根据 JSON 协议定义生成的。生成的代码经过严格测试,包括针对真实 Java Kafka 代码的属性测试。 ↩

  2. 如果您愿意,也可以使用“消息”:某些模式不适用于 API,但例如对于磁盘上的数据。 ↩

  3. read_exact 函数有一个缺点,即当底层缓冲区已在内存中时,它会复制数据。然而,它对于教育目的来说更方便。 ↩

  4. 我做了一个 PR 来解决这个问题。 ↩

  5. 我再次发布了 PR 来解决这个问题。 ↩

以上是Kafka协议实用指南的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
什么是Python Switch语句?什么是Python Switch语句?Apr 30, 2025 pm 02:08 PM

本文讨论了Python版本3.10中介绍的新“匹配”语句,该语句与其他语言相同。它增强了代码的可读性,并为传统的if-elif-el提供了性能优势

Python中有什么例外组?Python中有什么例外组?Apr 30, 2025 pm 02:07 PM

Python 3.11中的异常组允许同时处理多个异常,从而改善了并发场景和复杂操作中的错误管理。

Python中的功能注释是什么?Python中的功能注释是什么?Apr 30, 2025 pm 02:06 PM

Python中的功能注释将元数据添加到函数中,以进行类型检查,文档和IDE支持。它们增强了代码的可读性,维护,并且在API开发,数据科学和图书馆创建中至关重要。

Python的单位测试是什么?Python的单位测试是什么?Apr 30, 2025 pm 02:05 PM

本文讨论了Python中的单位测试,其好处以及如何有效编写它们。它突出显示了诸如UNITSEST和PYTEST等工具进行测试。

Python中的访问说明符是什么?Python中的访问说明符是什么?Apr 30, 2025 pm 02:03 PM

文章讨论了Python中的访问说明符,这些说明符使用命名惯例表明班级成员的可见性,而不是严格的执法。

Python中的__Init __()是什么?自我如何在其中发挥作用?Python中的__Init __()是什么?自我如何在其中发挥作用?Apr 30, 2025 pm 02:02 PM

文章讨论了Python的\ _ \ _ Init \ _ \ _()方法和Self在初始化对象属性中的作用。还涵盖了其他类方法和继承对\ _ \ _ Init \ _ \ _()的影响。

python中的@classmethod,@staticmethod和实例方法有什么区别?python中的@classmethod,@staticmethod和实例方法有什么区别?Apr 30, 2025 pm 02:01 PM

本文讨论了python中@classmethod,@staticmethod和实例方法之间的差异,详细介绍了它们的属性,用例和好处。它说明了如何根据所需功能选择正确的方法类型和DA

您如何将元素附加到Python数组?您如何将元素附加到Python数组?Apr 30, 2025 am 12:19 AM

Inpython,YouAppendElementStoAlistusingTheAppend()方法。1)useappend()forsingleelements:my_list.append(4).2)useextend()orextend()或= formultiplelements:my_list.extend.extend(emote_list)ormy_list = [4,5,6] .3)useInsert()forspefificpositions:my_list.insert(1,5).beaware

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境

螳螂BT

螳螂BT

Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中