Maison >développement back-end >Tutoriel Python >Guide pratique du protocole Kafka
J'ai pas mal travaillé avec le protocole Apache Kafka au bas niveau. Ce n'était pas facile de commencer à faire cela en suivant uniquement le guide officiel et j'ai beaucoup lu le code. Avec cet article, je souhaite vous donner une longueur d'avance en vous guidant pas à pas depuis des valeurs primitives jusqu'à des demandes significatives.
Dans cet article :
Nous utiliserons Python comme langage de programmation. Cependant, le code sera sans dépendance et facilement portable dans la langue de votre choix.
Apache Kafka dispose d'un protocole binaire personnalisé versionné, avec différents types de données, champs facultatifs, etc. Malheureusement, il n'utilise pas de format de sérialisation bien connu comme Protobuf. Le schéma des messages de protocole est décrit en JSON. Le code Java réel qui effectue la sérialisation et la désérialisation est généré à partir de cette description.
Lorsque vous êtes dans le monde Java, vous pouvez utiliser la bibliothèque client officielle. Mais si vous utilisez une autre plateforme, vous comptez sur des implémentations tierces. Ils existent, mais ils se concentrent principalement sur le producteur et le consommateur, rarement sur certains aspects du client administrateur. Si vous devez faire autre chose, vous êtes seul.
Cet article vous aidera à commencer à pirater le protocole Kafka. (Si vous recherchez une bibliothèque de (dé)sérialisation Python prête pour le protocole Kafka, consultez Kio1. Pour Rust, jetez un œil à la bibliothèque sur laquelle je travaille.)
Vous pouvez trouver le code de cet article et d'autres tests similaires dans ce référentiel sur Github.
Vous pouvez retrouver la description officielle du protocole sur cette page. Je vous encourage à vous en familiariser, lisez au moins les sections "Préliminaires" et "Le Protocole".
Voici quelques faits saillants. Le protocole Kafka est un protocole de requête-réponse binaire basé sur TCP :
Chaque type de message API se compose de la paire demande et réponse et est identifié par une valeur numérique appelée clé API. Par exemple, Produce et Fetch, les RPC Kafka les plus caractéristiques, ont respectivement les clés API 0 et 1. Il existe aujourd'hui près de 90 types de messages API (certains d'entre eux sont inter-courtiers et non client-courtier).
Les requêtes et les réponses sont décrites par des schémas versionnés. Le versioning permet l'évolution du protocole, par exemple, l'ajout ou la suppression de champs ou la modification de leur type de données.
Voici quelques choses que vous pouvez faire pour commencer à travailler avec le protocole Kafka.
Le code Kafka est la source de vérité (pratiquement) sur le protocole. Consultez le code Kafka sur Github et passez à la version qui vous intéresse (par exemple 3.8.0) :
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Vous pouvez trouver les définitions des messages API en JSON dans clients/src/main/resources/common/message. Chaque fichier JSON contient la définition d'un type de message2 avec toutes ses versions. clients/src/main/resources/common/message/README.md donne un bon aperçu du format de définition de schéma. Faites attention à des éléments tels que les valeurs par défaut, les versions flexibles et les champs balisés.
Outre les types de messages API concrets qui vous intéressent, jetez un œil à clients/src/main/resources/common/message/RequestHeader.json et ResponseHeader.json, qui décrivent les en-têtes utilisés dans chaque échange requête-réponse. .
Exécutons le générateur de code :
./gradlew processMessages
Vous pouvez maintenant trouver les classes générées dans clients/src/generated/java/org/apache/kafka/common/message.
Jetez un œil à clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java. Cet utilitaire :
Les autres fichiers sont générés un à un à partir des schémas JSON correspondants (parfois avec le suffixe Data, c'est une question de compatibilité). Dans ces fichiers vous trouverez :
Faites attention aux classes internes car elles représentent la structure complexe du message.
Exécuter Kafka dans Docker est un moyen pratique de faire fonctionner un courtier pour tester le protocole ou capturer l'échange réseau. Depuis la version 3.7.0, l'équipe Kafka crée des images Docker officielles, que vous pouvez exécuter en tant que :
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Si vous êtes intéressé par les anciennes versions, recherchez d'autres images dans Docker Hub. Cependant, cela n'est peut-être pas nécessaire étant donné que le protocole Kafka est compatible en amont et en aval : les nouveaux courtiers reconnaîtront très bien les anciennes versions du protocole et les anciens clients pourront communiquer avec les nouveaux courtiers.
Si vous lisez ceci, vous disposez probablement déjà des outils de ligne de commande Kafka sur votre machine, mais juste au cas où, vous pouvez également les exécuter dans Docker. Par exemple, exécutez ceci pour créer un sujet :
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Après nous être familiarisés avec le code Kafka, regardons le protocole en action. Wireshark est un outil largement utilisé pour de telles inspections. Il peut décortiquer le protocole Kafka (et prend en charge les dernières versions si votre version est suffisamment récente).
J'ai construit Wireshark à partir des sources de la version 4.5.0, car mon package de système d'exploitation est ancien et incapable de disséquer le protocole Kafka avec les nouvelles versions. Wireshark 4.5.0 devrait principalement prendre en charge les versions du protocole Kafka 3.7. Cependant, vous pouvez essayer la version disponible et voir comment elle fonctionne pour vous.
Exécutons Wireshark sur l'interface de bouclage avec le filtre de capture du port 9092 (1) et le filtre d'affichage kafka (2) :
Créez un sujet et voyez ce que Wireshark nous montre :
./gradlew processMessages
Le filtre d'affichage supprime tout ce qui n'est pas pertinent et ne laisse que les requêtes et réponses Kafka. Comme Wireshark comprend la plupart des versions de messages dans le protocole (en fonction de la version de Wireshark, bien sûr), vous pouvez facilement examiner la structure de chaque message. Wireshark affichera également les octets correspondants.
Wireshark est un excellent outil de débogage qui pourrait vous aider à comprendre comment le protocole fonctionne dans un cas particulier et quel est le problème avec votre implémentation.
Le protocole définit un certain nombre de types primitifs, dont vous pouvez trouver la description complète ici. Implémentons le code de lecture et d'écriture pour eux. Vous pouvez retrouver toutes les fonctions dans ce fichier, consultez également le fichier de test correspondant.
Ce sont des nombres entiers de longueur fixe connue : 1, 2, 4 ou 8 octets. Naturellement, vous pouvez trouver de tels champs dans tout le protocole. Dans ce cours, vous verrez comment (trivialement) leur lecture et leur écriture sont implémentées dans Kafka.
Définissons d'abord la fonction permettant de lire un nombre exact d'octets à partir d'un tampon3 :
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
L'indice de type BinaryIO en Python représente un objet à partir duquel les octets peuvent être lus et dans lequel ils peuvent être écrits. Il a des méthodes comme lire, écrire, dire (pour obtenir la position actuelle), chercher (pour changer la position).
Nous pouvons maintenant implémenter la lecture INT8 :
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Kafka utilise l'ordre des octets big-endian (réseau AKA), d'où byteorder="big".
Maintenant j'écris :
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Je ne répéterai pas cela pour INT16, INT32 et INT64 : les seules différences significatives sont le nombre d'octets (2, 4 et 8 respectivement) et les plages vérifiées ([-(2**15), 2* *15 - 1], [-(2**31), 2**31 - 1] et [-(2**63), 2**63 - 1] en conséquence).
UINT16 est similaire à INT16 :
./gradlew processMessages
Notez le signé=False ici.
BOOLEAN est essentiellement INT8 avec une logique supplémentaire : == 0 signifie faux, != 0 signifie vrai.
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Vous pouvez voir un exemple de BOOLEAN dans le champ allowAutoTopicCreation de la classe générée MetadataRequestData.
FLOAT64 est une valeur IEEE 754 64 bits double précision. Python n'a pas to_bytes et from_bytes pour float comme il l'a pour int. Nous utiliserons donc à la place le module struct de la bibliothèque standard.
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
>d signifie "une valeur double dans l'ordre des octets big-endian".
Les entiers de longueur variable sont une approche qui permet d'utiliser moins de bits par valeur lorsque les valeurs sont petites. Kafka utilise l'approche Varint de Protocol Buffers. L'idée est simple :
Chaque octet du varint a un bit de continuation qui indique si l'octet qui le suit fait partie du varint. Il s'agit du bit de poids fort (MSB) de l'octet (parfois également appelé bit de signe). Les 7 bits inférieurs sont une charge utile ; l'entier résultant est construit en ajoutant ensemble les charges utiles de 7 bits de ses octets constitutifs.
Vous pouvez consulter la spécification Protobuf et l'implémentation de Kafka (lecture, écriture) pour plus de détails.
Ce type n'est pas utilisé dans les champs de protocole en soi, mais il est utilisé pour les collections compactes décrites ci-dessous.
Mettez-le en œuvre. Pour plus de confiance, nous obtenons quelques exemples directement de la source de vérité, la classe ByteUtils de Kafka :
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
En exécutant ceci, nous obtiendrons :
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
Implémentons cela de la manière probablement pas la plus performante, mais simple :
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Les UUID sont des valeurs de 128 bits utilisées pour identifier les entités de manière unique. Par exemple, ils sont utilisés pour transmettre des identifiants de sujet dans CreateTopicsResponse.
Vous pouvez voir comment ils sont lus et écrits dans le code Kafka. C'est simple à reproduire :
def write_int8(value: int, buffer: BinaryIO) -> None: if -(2**7) <= value <= 2**7 - 1: buffer.write(value.to_bytes(1, byteorder="big", signed=True)) else: raise ValueError(f"Value {value} is out of range for INT8")
Notez que Kafka traite null/None comme un UUID nul, nous faisons donc la même chose ici.
Le protocole Kafka comporte 4 types de chaînes :
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_STRING | NULLABLE_STRING |
non-nullable | COMPACT_STRING | STRING |
La compacité indique si la longueur de la chaîne est codée avec INT16 ou avec UNSIGNED_VARINT. Cela dépend de la version du message (il a été introduit vers 2017). La nullité indique si la valeur peut être nulle. Cela dépend de l'objectif du message et de la version également (parfois les champs de chaîne deviennent facultatifs lors de l'évolution du protocole).
Les chaînes sont omniprésentes dans le protocole. Par exemple, consultez le nom du champ dans la classe générée MetadataRequestData.MetadataRequestTopic.
Les chaînes sont codées assez simplement : il y a d'abord la longueur, puis vient le corps codé en UTF-8. La longueur maximale autorisée est de 32 767 octets. Les chaînes nulles ont une longueur de -1 et évidemment pas de corps.
Comme la seule différence entre compact et non compact réside dans la manière dont la longueur de la chaîne est codée, nous pouvons avoir une fonction pour les deux modes.
Commençons par lire et écrire des chaînes nullables :
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Des fonctions de chaîne non nullables peuvent être construites sur celles-ci :
./gradlew processMessages
Les tableaux d'octets sont très similaires aux chaînes. Ils ont la même nullité potentielle et la même compacité :
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_BYTES | NULLABLE_BYTES |
non-nullable | COMPACT_BYTES | BYTES |
Ils sont également codés de la même manière : longueur corps. Naturellement, le corps n’est pas traité comme une chaîne UTF-8, mais comme un tableau d’octets opaque. La longueur maximale d'un tableau d'octets est de 2147483647 ;
Vous pouvez trouver un exemple d'octets dans les métadonnées du champ dans la classe générée JoinGroupRequestData.JoinGroupRequestProtocol.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Comme vous pouvez le constater, la différence entre ces fonctions et les fonctions correspondantes pour les chaînes est faible.
Le protocole prend en charge les tableaux de types autres que les octets : chaînes, nombres, structures (mais pas les tableaux imbriqués) : ARRAY et COMPACT_ARRAY. La compacité est la même que dans les tableaux d'octets et les chaînes.
La nullité n'est pas explicitement mentionnée dans la spécification du protocole pour une raison quelconque. Cependant, les tableaux peuvent être nullables. Ceci est contrôlé par nullableVersions dans les définitions de schéma, comme ici.
Considérant que nous avons déjà implémenté read_array_length et write_array_length, implémentons les fonctions de lecture et d'écriture :
./gradlew processMessages
ENREGISTREMENTS encodent les enregistrements Kafka. La structure est assez complexe et je ne vais pas la décrire dans ce guide (cependant, merci de me le faire savoir dans les commentaires ?️ si vous souhaitez l'avoir.) Pour plus de simplicité, nous pouvons traiter les enregistrements comme NULLABLE_BYTES ou COMPACT_NULLABLE_BYTES ( selon la version du message).
Les champs balisés sont une extension du protocole Kafka qui permet de joindre des données facultatives aux messages. L'idée est double :
Jetez par exemple un oeil à ce champ. Il a taggedVersions, qui indique depuis quelle version ce champ est balisé (dans la plupart des cas, c'est la même version lorsque le champ a été ajouté).
Un champ balisé se compose de :
Vous pouvez trouver plus de détails sur les champs balisés dans KIP-482.
Mettez en œuvre :
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Ici, ils sont intitulés « inconnus ». Les champs connus doivent l'être à l'intérieur de leurs structures.
La structure du message de haut niveau est très simple. Selon le cahier des charges :
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
C'est-à-dire qu'il s'agit d'un message lui-même précédé de sa taille en octets. Les messages de demande et de réponse sont constitués de l’en-tête immédiatement suivi du corps. Pour une raison quelconque, cela n'est pas explicitement documenté4, mais vous pouvez me faire confiance ? ou vérifiez le code.
L'en-tête de requête existe en trois versions : 0, 1 et 2. Elles sont spécifiées dans le protocole comme :
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
TAG_BUFFER correspond aux champs balisés mentionnés précédemment.
Implémentons-les en tant que classes de données Python :
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Comme vous pouvez le voir, il y a des champs balisés dans la version 2, il n'y a aucun champ connu attendu. Si un champ balisé est envoyé par erreur au courtier, il sera ignoré.
L'en-tête de réponse existe en deux versions : 0 et 1. Elles sont spécifiées dans le protocole comme suit :
./gradlew processMessages
Mettons-les également en œuvre :
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Nous n'implémentons pas de lecture pour les en-têtes de requête et d'écriture pour ceux de réponse. Par souci de concision : nous n'allons pas envoyer d'en-têtes de réponse et recevoir ceux de requête dans nos exemples car nous ne programmons pas le côté serveur. Cependant, si vous êtes également intéressé par le côté serveur, vous devez implémenter les deux fonctions (ce qui devrait être simple).
Notez particulièrement le champ corrélation_id dans les en-têtes de requête et de réponse. Le protocole prend en charge le pipeline : le client peut avoir plus d'une requête en attente par connexion. L'ID de corrélation lui permet de faire correspondre les réponses aux demandes.
La version à utiliser dépend de la clé API et de la version du message. Ce n'est actuellement pas documenté explicitement dans le guide du protocole5.
Utilisez les fonctions requestHeaderVersion et ResponseHeaderVersion dans la classe générée ApiMessageType comme référence.
Maintenant, ayant toutes ces connaissances et ce code, envoyons enfin une requête ApiVersions et recevons et lisons une réponse. ApiVersions est normalement la première requête envoyée par le client. Son objectif est de trouver les versions et fonctionnalités de l'API prises en charge par le courtier. Nous implémentons la dernière version 3.
Dans la spécification du protocole, il est défini comme :
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Créons la classe de données :
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Et la réponse :
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
[api_keys] signifie "un tableau d'api_keys", où api_keys est la structure définie deux lignes ci-dessous.
Conversion en classes de données Python :
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Quand nous parlons de tableaux, nous devons savoir si nous avons besoin de tableaux compacts ou non compacts. Pour le savoir, jetons un œil à la définition du schéma dans ApiVersionsRequest.json. Vous pouvez voir "flexibleVersions": "3", ce qui signifie que les tableaux compacts sont utilisés à partir de la version 3 (plus d'informations à ce sujet dans README.md dans le répertoire du schéma). Puisque nous travaillons ici avec la version 3, nous utilisons des tableaux compacts.
Une fois les classes de requêtes et de réponses implémentées, nous pouvons envoyer et recevoir ces requêtes. Pour cette ApiVersions v3, nous avons besoin de l'en-tête de requête v2 et de l'en-tête de réponse v0 (vérifiez l'ApiMessageType.java généré). La clé API (18) que vous pouvez trouver dans ApiVersionsRequest.json ou dans la spécification du protocole.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Si vous exécutez ce code, vous verrez l'en-tête de réponse et le message imprimés dans la console. Félicitations, vous avez effectué un échange réseau correct avec le courtier Kafka !
Vous remarquerez trois champs balisés placés dans _unknownTaggedFields. Les méthodes de lecture et d'écriture de la classe ApiVersionsResponseData générée ainsi que la définition du message dans ApiVersionsResponse.json vous aideront à les interpréter. Considérez ce devoir ?
Dans mon travail quotidien, nous avons développé une bibliothèque open source Kio. Cela nous permet d'effectuer facilement des appels arbitraires à l'API Kafka depuis Python. Le code de sérialisation/désérialisation, comme dans Kafka lui-même, est généré à partir des définitions du protocole JSON. Le code généré est rigoureusement testé, y compris des tests de propriétés par rapport au vrai code Java Kafka. ↩
Ou "message" si vous préférez : certains schémas ne sont pas destinés à l'API, mais par ex. pour les données sur disque. ↩
La fonction read_exact présente l'inconvénient de dupliquer les données lorsque le tampon sous-jacent est déjà en mémoire. Cependant, c'est plus pratique à des fins éducatives. ↩
J'ai fait un PR pour résoudre ce problème. ↩
Encore une fois, j'ai fait un PR pour résoudre ce problème. ↩
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!