저는 낮은 수준에서 Apache Kafka 프로토콜을 꽤 많이 사용했습니다. 공식 가이드만 따라 이 작업을 시작하는 것은 쉽지 않았고 코드를 많이 읽었습니다. 이 게시물을 통해 원시적인 가치부터 의미 있는 요청까지 단계별로 안내하여 여러분에게 유리한 출발을 제공하고 싶습니다.
이 게시물의 내용:
프로그래밍 언어로 Python을 사용하겠습니다. 그러나 코드는 종속성이 없으며 선택한 언어로 쉽게 이식할 수 있습니다.
Apache Kafka에는 다양한 데이터 유형, 선택적 필드 등으로 버전이 관리되는 사용자 정의 바이너리 프로토콜이 있습니다. 불행하게도 Protobuf와 같이 잘 알려진 직렬화 형식을 사용하지 않습니다. 프로토콜 메시지 스키마는 JSON에 설명되어 있습니다. 직렬화 및 역직렬화를 수행하는 실제 Java 코드는 이 설명에서 생성됩니다.
Java 세계에서는 공식 클라이언트 라이브러리를 사용할 수 있습니다. 그러나 다른 플랫폼을 사용하는 경우에는 타사 구현에 의존하게 됩니다. 존재하지만 주로 생산자와 소비자에 초점을 맞추고 관리 클라이언트의 일부 측면에는 거의 초점을 맞추지 않습니다. 다른 일을 해야 할 경우에는 스스로 해결하세요.
이 게시물은 Kafka 프로토콜 해킹을 시작하는 데 도움이 될 것입니다. (Kafka 프로토콜을 위한 준비된 Python (역)직렬화 라이브러리를 찾고 있다면 Kio1를 확인하세요. Rust의 경우 제가 작업 중인 라이브러리를 살펴보세요.)
이 게시물의 코드와 Github의 이 저장소에서 유사한 테스트를 찾을 수 있습니다.
공식 프로토콜 설명은 이 페이지에서 확인하실 수 있습니다. 적어도 "예비" 및 "프로토콜" 섹션을 읽으면서 이 내용을 숙지하시기 바랍니다.
다음은 몇 가지 주요 내용입니다. Kafka 프로토콜은 TCP 기반 바이너리 요청-응답 프로토콜입니다.
각 API 메시지 유형은 요청과 응답 쌍으로 구성되며 API 키라는 숫자 값으로 식별됩니다. 예를 들어 Kafka의 가장 특징적인 RPC인 Produce와 Fetch는 이에 상응하는 API 키 0과 1을 갖습니다. 요즘에는 API 메시지 유형이 90개에 가깝습니다(그 중 일부는 클라이언트-브로커가 아닌 인터브로커입니다).
요청과 응답은 버전이 지정된 스키마로 설명됩니다. 버전 관리를 통해 필드 추가 또는 제거, 데이터 유형 변경 등 프로토콜 발전이 가능해졌습니다.
Kafka 프로토콜 작업을 시작하기 위해 수행할 수 있는 몇 가지 작업은 다음과 같습니다.
Kafka 코드는 프로토콜에 대한 (실질적으로) 진실의 원천입니다. Github에서 Kafka 코드를 확인하고 관심 있는 릴리스(예: 3.8.0)로 전환하세요.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
클라이언트/src/main/resources/common/message에서 JSON의 API 메시지 정의를 찾을 수 있습니다. 각 JSON 파일에는 모든 버전과 함께 하나의 메시지2 유형 정의가 포함되어 있습니다. 클라이언트/src/main/resources/common/message/README.md는 스키마 정의 형식에 대한 좋은 개요를 제공합니다. 기본값, 유연한 버전, 태그가 지정된 필드 등에 주의하세요.
관심 있는 구체적인 API 메시지 유형 외에도 각 요청-응답 교환에 사용되는 헤더를 설명하는 클라이언트/src/main/resources/common/message/RequestHeader.json 및 ResponseHeader.json을 살펴보세요. .
코드 생성기를 실행해 보겠습니다.
./gradlew processMessages
이제 클라이언트/src/generated/java/org/apache/kafka/common/message에서 생성된 클래스를 찾을 수 있습니다.
clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java를 살펴보세요. 이 유틸리티:
다른 파일은 해당 스키마 JSON에서 일대일로 생성됩니다(때때로 데이터 접미사가 있는 경우 호환성 문제임). 이 파일에서 다음을 찾을 수 있습니다:
내부 클래스에 주의하세요. 메시지의 복잡한 구조를 나타내기 때문입니다.
Docker에서 Kafka를 실행하는 것은 브로커를 실행하여 프로토콜을 테스트하거나 네트워크 교환을 캡처하는 편리한 방법입니다. 버전 3.7.0부터 Kafka 팀은 다음과 같이 실행할 수 있는 공식 Docker 이미지를 구축합니다.
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
이전 버전에 관심이 있다면 Docker Hub에서 다른 이미지를 검색하세요. 그러나 Kafka 프로토콜이 이전 버전과 호환된다는 점을 고려하면 이는 필요하지 않을 수 있습니다. 새 브로커는 이전 프로토콜 버전을 잘 인식하고 이전 클라이언트는 새 브로커와 통신할 수 있습니다.
이 글을 읽으셨다면 아마도 여러분의 컴퓨터에 Kafka 명령줄 도구가 이미 있을 것입니다. 그러나 만약을 대비해 Docker에서 실행할 수도 있습니다. 예를 들어 다음을 실행하여 주제를 생성합니다.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Kafka 코드에 익숙해졌으니, 실제 프로토콜을 살펴보겠습니다. Wireshark는 이러한 검사에 널리 사용되는 도구입니다. Kafka 프로토콜을 분석할 수 있습니다(버전이 충분히 최신인 경우 최신 버전을 지원합니다).
운영 체제 패키지가 오래되어 Kafka 프로토콜을 새 버전으로 분석할 수 없기 때문에 버전 4.5.0 소스에서 Wireshark를 구축했습니다. Wireshark 4.5.0은 대부분 Kafka 3.7 프로토콜 버전을 지원해야 합니다. 그러나 사용 가능한 버전을 사용해 보고 그것이 어떻게 작동하는지 확인할 수 있습니다.
포트 9092 캡처 필터(1) 및 kafka 디스플레이 필터(2)를 사용하여 루프백 인터페이스에서 Wireshark를 실행해 보겠습니다.
주제를 만들고 Wireshark가 표시하는 내용을 확인하세요.
./gradlew processMessages
표시 필터는 관련 없는 모든 항목을 제거하고 Kafka 요청 및 응답만 남깁니다. Wireshark는 프로토콜의 대부분의 메시지 버전(물론 Wireshark 버전에 따라 다름)을 이해하므로 각 메시지의 구조를 편리하게 살펴볼 수 있습니다. Wireshark는 해당 바이트도 표시합니다.
Wireshark는 특정 사례에서 프로토콜이 어떻게 작동하는지, 구현에 어떤 문제가 있는지 이해하는 데 도움이 되는 훌륭한 디버깅 도구입니다.
프로토콜은 다양한 기본 유형을 정의하며 자세한 설명은 여기에서 찾을 수 있습니다. 이에 대한 읽기 및 쓰기 코드를 구현해 보겠습니다. 이 파일에서 모든 기능을 찾을 수 있으며, 해당 테스트 파일도 확인해 보세요.
고정 길이가 알려진 정수(1, 2, 4 또는 8바이트)입니다. 당연히 프로토콜 전체에서 이러한 필드를 많이 찾을 수 있습니다. 이 수업에서는 읽기와 쓰기가 Kafka에서 어떻게 구현되는지 (사소하게) 볼 수 있습니다.
먼저 버퍼에서 정확한 바이트 수를 읽는 함수를 정의해 보겠습니다3:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Python의 BinaryIO 유형 힌트는 바이트를 읽고 쓸 수 있는 객체를 나타냅니다. 읽기, 쓰기, 말하기(현재 위치 가져오기), 탐색(위치 변경)과 같은 메소드가 있습니다.
이제 INT8 읽기를 구현할 수 있습니다.
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Kafka는 빅엔디안(AKA 네트워크) 바이트 순서를 사용하므로 byteorder="big"입니다.
지금 쓰고 있는 내용:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
INT16, INT32 및 INT64에 대해서는 이 내용을 반복하지 않겠습니다. 유일하게 중요한 차이점은 바이트 수(각각 2, 4, 8)와 확인된 범위([-(2**15), 2*)입니다. *15 - 1], [-(2**31), 2**31 - 1] 및 [-(2**63), 2**63 - 1] 해당).
UINT16은 INT16과 유사합니다.
./gradlew processMessages
여기서 signed=False임을 참고하세요.
BOOLEAN은 기본적으로 추가 논리가 포함된 INT8입니다. == 0은 false를 의미하고, != 0은 true를 의미합니다.
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
MetadataRequestData 생성 클래스의 allowedAutoTopicCreation 필드에서 BOOLEAN의 예를 볼 수 있습니다.
FLOAT64는 배정밀도 64비트 IEEE 754 값입니다. Python에는 int처럼 float에 대해 to_bytes 및 from_bytes가 없습니다. 따라서 대신 표준 라이브러리의 struct 모듈을 사용하겠습니다.
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
>d는 "빅엔디안 바이트 순서의 이중 값"을 의미합니다.
가변 길이 정수는 값이 작을 때 값당 더 적은 비트를 사용할 수 있는 접근 방식입니다. Kafka는 프로토콜 버퍼의 varint 접근 방식을 사용합니다. 아이디어는 간단합니다.
varint의 각 바이트에는 뒤에 오는 바이트가 varint의 일부인지 여부를 나타내는 연속 비트가 있습니다. 이는 바이트의 최상위 비트(MSB)입니다(부호 비트라고도 함). 하위 7비트는 페이로드입니다. 결과 정수는 구성 바이트의 7비트 페이로드를 함께 추가하여 구축됩니다.
자세한 내용은 Protobuf 사양 및 Kafka 구현(읽기, 쓰기)을 확인하실 수 있습니다.
이 유형은 프로토콜 필드 자체에서는 사용되지 않지만 아래 설명된 압축 컬렉션에 사용됩니다.
실천해 보겠습니다. 신뢰도를 높이기 위해 Kafka의 ByteUtils 클래스인 Kafka의 ByteUtils 클래스에서 직접 몇 가지 예를 가져옵니다.
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
이를 실행하면 다음을 얻을 수 있습니다.
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
가장 성능이 좋지는 않지만 간단한 방법으로 구현해 보겠습니다.
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
UUID는 엔터티를 고유하게 식별하는 데 사용되는 128비트 값입니다. 예를 들어 CreateTopicsResponse에서 주제 ID를 전달하는 데 사용됩니다.
Kafka 코드에서 어떻게 읽고 쓰는지 확인할 수 있습니다. 재현이 간단합니다:
def write_int8(value: int, buffer: BinaryIO) -> None: if -(2**7) <= value <= 2**7 - 1: buffer.write(value.to_bytes(1, byteorder="big", signed=True)) else: raise ValueError(f"Value {value} is out of range for INT8")
Kafka는 null/None을 0 UUID로 처리하므로 여기서도 동일한 작업을 수행합니다.
Kafka 프로토콜에는 4가지 유형의 문자열이 있습니다.
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_STRING | NULLABLE_STRING |
non-nullable | COMPACT_STRING | STRING |
압축성은 문자열 길이가 INT16 또는 UNSIGNED_VARINT로 인코딩되는지 여부를 나타냅니다. 메시지 버전에 따라 다릅니다(2017년경에 도입됨). Null 허용 여부는 값이 Null이 될 수 있는지 여부입니다. 이는 메시지 목적과 버전에 따라 다릅니다(때때로 프로토콜이 발전하는 동안 문자열 필드가 선택 사항이 됨).
문자열은 프로토콜 어디에나 존재합니다. 예를 들어 생성된 클래스 MetadataRequestData.MetadataRequestTopic의 필드 이름을 참조하세요.
문자열은 매우 간단하게 인코딩됩니다. 먼저 길이를 입력한 다음 UTF-8로 인코딩된 본문을 가져옵니다. 허용되는 최대 길이는 32767바이트입니다. Null 문자열은 길이가 -1이고 본문이 없습니다.
컴팩트와 비컴팩트의 유일한 차이점은 문자열 길이가 인코딩되는 방식이므로 두 모드 모두에 대해 하나의 기능을 가질 수 있습니다.
null 허용 문자열을 읽고 쓰는 것부터 시작해 보겠습니다.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
null을 허용하지 않는 문자열 함수는 다음을 기반으로 구축할 수 있습니다.
./gradlew processMessages
바이트 배열은 문자열과 매우 유사합니다. 동일한 잠재적인 null 허용 여부와 압축성을 갖습니다.
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_BYTES | NULLABLE_BYTES |
non-nullable | COMPACT_BYTES | BYTES |
또한 동일한 방식으로 인코딩됩니다: 길이 본문. 당연히 본문은 UTF-8 문자열이 아닌 불투명 바이트 배열로 처리됩니다. 바이트 배열의 최대 길이는 2147483647입니다.
생성된 JoinGroupRequestData.JoinGroupRequestProtocol 클래스의 필드 메타데이터에서 바이트의 예를 찾을 수 있습니다.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
보시다시피 이러한 함수와 문자열에 해당하는 함수의 차이는 작습니다.
이 프로토콜은 바이트 이외의 유형 배열(문자열, 숫자, 구조체(중첩 배열 제외)): ARRAY 및 COMPACT_ARRAY를 지원합니다. 압축성은 바이트 배열 및 문자열과 동일합니다.
어떤 이유로든 프로토콜 사양에 Null 허용 여부가 명시적으로 언급되어 있지 않습니다. 그러나 배열은 null을 허용할 수 있습니다. 이는 여기와 같이 스키마 정의의 nullableVersions에 의해 제어됩니다.
이미 read_array_length 및 write_array_length를 구현한 것을 고려하여 판독기 및 기록기 기능을 구현해 보겠습니다.
./gradlew processMessages
RECORDS는 Kafka 레코드를 인코딩합니다. 구조가 꽤 복잡해서 이 가이드에서는 설명하지 않겠습니다. (단, 원하는 경우 댓글로 알려주시기 바랍니다.) 단순화를 위해 레코드를 NULLABLE_BYTES 또는 COMPACT_NULLABLE_BYTES( 메시지 버전에 따라 다름).
태그된 필드는 선택적 데이터를 메시지에 첨부할 수 있는 Kafka 프로토콜의 확장입니다. 아이디어는 두 가지입니다:
예를 들어 이 분야를 살펴보세요. 이 필드에 태그가 지정된 버전을 알려주는 taggedVersions가 있습니다(대부분의 경우 필드가 추가되었을 때와 동일한 버전입니다).
태그된 필드는 다음으로 구성됩니다.
KIP-482에서 태그된 필드에 대한 자세한 내용을 확인할 수 있습니다.
다음을 구현해 보겠습니다.
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
여기서는 '알 수 없음'이라는 제목이 붙어 있습니다. 알려진 필드는 해당 구조 내부에 만들어져야 합니다.
상위 수준 메시지 구조는 매우 간단합니다. 사양에 따르면:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
즉, 바이트 단위의 크기가 앞에 오는 메시지 자체입니다. 요청 및 응답 메시지는 모두 헤더 바로 뒤에 본문이 오는 구성입니다. 어떤 이유에서인지 이 내용은 명시적으로 문서화되어 있지 않습니다4. 하지만 저를 믿으시겠어요? 또는 코드를 확인하세요.
요청 헤더는 0, 1, 2의 세 가지 버전으로 존재합니다. 프로토콜에서 다음과 같이 지정됩니다.
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
TAG_BUFFER는 앞서 언급한 태그된 필드입니다.
Python 데이터 클래스로 구현해 보겠습니다.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
보시다시피 버전 2에는 일부 태그된 필드가 있지만 예상되는 알려진 필드는 없습니다. 일부 태그된 필드가 브로커에 잘못 전송되면 무시됩니다.
응답 헤더는 0과 1의 두 가지 버전으로 존재합니다. 프로토콜에서 다음과 같이 지정됩니다.
./gradlew processMessages
이를 구현해 보겠습니다.
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
요청 헤더에 대해서는 읽기를 구현하지 않고 응답 헤더에 대해서는 쓰기를 구현하지 않습니다. 이는 간결성을 위한 것입니다. 우리는 서버 측을 프로그래밍하지 않으므로 예제에서는 응답 헤더를 보내고 요청 헤더를 받지 않을 것입니다. 하지만 서버 측에도 관심이 있다면 두 기능을 모두 구현해야 합니다(간단해야 합니다).
요청 및 응답 헤더의 Correlation_id 필드에 특히 유의하세요. 프로토콜은 파이프라인을 지원합니다. 클라이언트는 연결당 하나 이상의 미해결 요청을 가질 수 있습니다. 상관 관계 ID를 사용하면 요청에 대한 응답을 일치시킬 수 있습니다.
어떤 버전을 사용해야 하는지는 API 키와 메시지 버전의 기능입니다. 현재 프로토콜 가이드에는 명시적으로 문서화되어 있지 않습니다5.
생성된 ApiMessageType 클래스의 requestHeaderVersion 및 responseHeaderVersion 함수를 참조로 사용하세요.
이제 모든 지식과 코드를 갖추었으므로 마침내 ApiVersions 요청을 보내고 응답을 받아 읽어 보겠습니다. ApiVersions는 일반적으로 클라이언트가 보내는 첫 번째 요청입니다. 그 목적은 브로커가 지원하는 API 버전과 기능을 찾는 것입니다. 최신 버전 3을 구현했습니다.
프로토콜 사양에서는 다음과 같이 정의됩니다.
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
데이터 클래스를 만들어 보겠습니다.
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
그리고 응답은 다음과 같습니다.
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
[api_keys]는 "api_keys의 배열"을 의미합니다. 여기서 api_keys는 아래 두 줄에 정의된 구조입니다.
이를 Python 데이터 클래스로 변환:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
어레이에 관해 이야기할 때는 컴팩트한 배열이 필요한지, 컴팩트하지 않은 배열이 필요한지 알아야 합니다. 이를 알아보려면 ApiVersionsRequest.json의 스키마 정의를 살펴보겠습니다. "flexibleVersions": "3 "을 볼 수 있습니다. 이는 버전 3부터 컴팩트 배열이 사용된다는 의미입니다(자세한 내용은 스키마 디렉터리의 README.md에서 확인하세요). 여기서는 버전 3으로 작업하고 있으므로 컴팩트 배열을 사용합니다.
요청 및 응답 클래스를 구현하면 이러한 요청을 보내고 받을 수 있습니다. 이 ApiVersions v3의 경우 v2 요청 헤더와 v0 응답 헤더가 필요합니다(생성된 ApiMessageType.java 확인). API 키(18)는 ApiVersionsRequest.json 또는 프로토콜 사양에서 찾을 수 있습니다.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
이 코드를 실행하면 콘솔에 응답 헤더와 메시지가 출력되는 것을 볼 수 있습니다. 축하합니다. Kafka 브로커와 올바른 네트워크 교환을 수행했습니다!
_unknownTaggedFields에 세 개의 태그된 필드가 있는 것을 볼 수 있습니다. 생성된 ApiVersionsResponseData 클래스의 읽기 및 쓰기 메서드와 ApiVersionsResponse.json의 메시지 정의는 이를 해석하는 데 도움이 됩니다. 이 숙제를 고려해보세요 ?
저는 본업으로 오픈소스 라이브러리 Kio를 개발했습니다. 이를 통해 Python에서 임의의 Kafka API 호출을 쉽게 수행할 수 있습니다. Kafka 자체와 마찬가지로 직렬화/역직렬화 코드는 JSON 프로토콜 정의에서 생성됩니다. 생성된 코드는 실제 Java Kafka 코드에 대한 속성 테스트를 포함하여 엄격하게 테스트됩니다. ↩
또는 원하는 경우 "메시지"를 사용하세요. 일부 스키마는 API용이 아니지만 예를 들어 디스크상의 데이터용. ↩
read_exact 함수에는 기본 버퍼가 이미 메모리에 있을 때 데이터를 복제한다는 단점이 있습니다. 그러나 교육 목적으로는 더 편리합니다. ↩
이 문제를 해결하기 위해 홍보를 했습니다. ↩
이 문제를 해결하기 위해 다시 PR을 했습니다. ↩
위 내용은 Kafka 프로토콜 실무 가이드의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!