php小編新一為你介紹一個常見的錯誤:反序列化/解析錯誤 KafkaProtobuf Python。使用KafkaProtobuf Python函式庫時,可能會遇到反序列化或解析錯誤的問題。這可能是由於訊息的序列化格式與消費者代碼不匹配,或是由於訊息的格式有誤導致的。解決這個問題的方法包括檢查訊息的序列化格式和消費者程式碼的相容性,以及確保訊息的格式正確。在本文中,我們將詳細介紹這個問題的原因和解決方法,希望能幫助你解決類似的錯誤。
func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) { producerConfig := getKafkaProducerConfig(config.EnvConfig) producer, err := confluent_kafka.NewProducer(producerConfig) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer") log.Panicf("Unable to create Kafka Producer") } client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl)) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client") log.Panicf("Unable to create Kafka Client") } serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig()) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer") log.Panicf("Unable to create Kafka Serializer") } KafkaProducerInstance = &KafkaProducer{ producer: producer, serializer: serializer, } log.Info("Created Kafka Producer and Serializer") }
func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) { deliveryChan := make(chan confluent_kafka.Event) payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message) if err != nil { log.Errorf("Failed to serialize payload: %v\n", err) close(deliveryChan) return } err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{ TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny}, Value: payload, }, deliveryChan) if err != nil { log.Errorf("Failed to Produce: %v\n", err) close(deliveryChan) return } e := <-deliveryChan m := e.(*confluent_kafka.Message) if m.TopicPartition.Error != nil { log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error) close(deliveryChan) return } else { log.Infof("Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) } close(deliveryChan) }
from confluent_kafka import Consumer, KafkaError import KafkaDiagnoseResult_pb2 # replace with your generated module name from google.protobuf.message import DecodeError # Kafka consumer configuration conf = { 'bootstrap.servers': "localhost:9092/v3/", # Replace with your Kafka server address 'group.id': "myGroup", 'auto.offset.reset': 'earliest' } # Create a consumer instance consumer = Consumer(conf) # Subscribe to a topic from confluent_kafka import Consumer, KafkaError import KafkaDiagnoseResult_pb2 from google.protobuf.message import DecodeError # Kafka consumer configuration conf = { 'bootstrap.servers': "localhost:9092/v3/", 'group.id': "myGroup", 'auto.offset.reset': 'earliest' } # Create a consumer instance consumer = Consumer(conf) # Subscribe to a topic consumer.subscribe(['diagnosis']) try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event continue else: print(msg.error()) break # Deserialize the message try: data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() data.ParseFromString(msg.value()) except DecodeError as e: print(f"Error parsing message: {e}") print(f"Raw message data: {msg.value()}") print("Received message: ", data) except KeyboardInterrupt: pass finally: consumer.close()
解析訊息時出錯
#我正在嘗試調試它,但無法。
proton
產生 pb2 檔案。 感謝您的幫忙。
謝謝
我可以取得原始格式的訊息:
原始訊息資料:b'\x00\x00\x00\x00\x02\x02\x08\n$1775100a-1a47-48b2-93b7-b7a331be59b4\x12\tcompleted
print(" Decode 1: ", dict_str) print("Decode 2: ", ast.literal_eval(dict_str))以上程式碼的輸出:
Unparsed Message: b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted' Decode 1: $ccb0ad7e-abb2-4af6-90d1-187381f9d47e completed Inner Exception Here source code string cannot contain null bytes
解決方法
您的 Go 用戶端正在使用架構註冊表進行序列化,這意味著您的 Python 程式碼也必須執行相同的操作。這些記錄“不僅僅是 Protobuf”,因為模式 ID 也以位元組編碼,因此常規 Protobuf 解析器將失敗。
儲存庫中有用於透過登錄機碼整合使用 Protobuf 的範例程式碼
https://github.com/confluenceinc /confluence-kafka-python/blob/master/examples/protobuf_consumer.py#######以上是反序列化/解析錯誤 KafkaProtobuf Python的詳細內容。更多資訊請關注PHP中文網其他相關文章!