Rumah >pembangunan bahagian belakang >Golang >Ralat penyahserialisasian/penghuraian KafkaProtobuf Python
Editor PHP Xinyi memperkenalkan anda kepada ralat biasa: ralat penyahserilan/penghuraian KafkaProtobuf Python. Apabila menggunakan perpustakaan Python KafkaProtobuf, anda mungkin menghadapi masalah dengan ralat penyahserikatan atau penghuraian. Ini mungkin disebabkan oleh ketidakpadanan antara format bersiri mesej dan kod pengguna, atau kerana mesej itu salah bentuk. Penyelesaian kepada masalah ini termasuk menyemak format bersiri mesej dan kod pengguna untuk keserasian, serta memastikan mesej diformat dengan baik. Dalam artikel ini, kami akan memperincikan punca dan penyelesaian masalah ini, dengan harapan dapat membantu anda menyelesaikan ralat yang serupa.
func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) { producerConfig := getKafkaProducerConfig(config.EnvConfig) producer, err := confluent_kafka.NewProducer(producerConfig) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer") log.Panicf("Unable to create Kafka Producer") } client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl)) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client") log.Panicf("Unable to create Kafka Client") } serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig()) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer") log.Panicf("Unable to create Kafka Serializer") } KafkaProducerInstance = &KafkaProducer{ producer: producer, serializer: serializer, } log.Info("Created Kafka Producer and Serializer") }
func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) { deliveryChan := make(chan confluent_kafka.Event) payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message) if err != nil { log.Errorf("Failed to serialize payload: %v\n", err) close(deliveryChan) return } err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{ TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny}, Value: payload, }, deliveryChan) if err != nil { log.Errorf("Failed to Produce: %v\n", err) close(deliveryChan) return } e := <-deliveryChan m := e.(*confluent_kafka.Message) if m.TopicPartition.Error != nil { log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error) close(deliveryChan) return } else { log.Infof("Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) } close(deliveryChan) }
from confluent_kafka import Consumer, KafkaError import KafkaDiagnoseResult_pb2 # replace with your generated module name from google.protobuf.message import DecodeError # Kafka consumer configuration conf = { 'bootstrap.servers': "localhost:9092/v3/", # Replace with your Kafka server address 'group.id': "myGroup", 'auto.offset.reset': 'earliest' } # Create a consumer instance consumer = Consumer(conf) # Subscribe to a topic from confluent_kafka import Consumer, KafkaError import KafkaDiagnoseResult_pb2 from google.protobuf.message import DecodeError # Kafka consumer configuration conf = { 'bootstrap.servers': "localhost:9092/v3/", 'group.id': "myGroup", 'auto.offset.reset': 'earliest' } # Create a consumer instance consumer = Consumer(conf) # Subscribe to a topic consumer.subscribe(['diagnosis']) try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event continue else: print(msg.error()) break # Deserialize the message try: data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() data.ParseFromString(msg.value()) except DecodeError as e: print(f"Error parsing message: {e}") print(f"Raw message data: {msg.value()}") print("Received message: ", data) except KeyboardInterrupt: pass finally: consumer.close()
解析消息时出错
Saya cuba nyahpepijat tetapi tidak boleh.
proton
untuk menjana fail pb2. Terima kasih atas bantuan anda.
Terima kasih
Saya boleh mendapatkan mesej dalam format asalnya:
原始消息数据:b'x00x00x00x00x02x02x08n75100a-1a47-48b2-93b7-b7a331be59b4x12tcompleted'
print(" Decode 1: ", dict_str) print("Decode 2: ", ast.literal_eval(dict_str))
Output kod di atas:
Unparsed Message: b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted' Decode 1: $ccb0ad7e-abb2-4af6-90d1-187381f9d47e completed Inner Exception Here source code string cannot contain null bytes
Pelanggan Go anda menggunakan pendaftaran skema untuk bersiri, yang bermaksud kod Python anda mesti melakukan perkara yang sama. Rekod ini "bukan hanya Protobuf" kerana ID skema juga dikodkan dalam bait, jadi penghurai Protobuf biasa akan gagal.
Repositori mempunyai kod sampel untuk menggunakan Protobuf dengan penyepaduan pendaftaran
https://github.com/confluenceinc /confluence-kafka-python/blob/master/examples/protobuf_consumer.py
Atas ialah kandungan terperinci Ralat penyahserialisasian/penghuraian KafkaProtobuf Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!