Maison >développement back-end >Golang >Erreurs de désérialisation/analyse KafkaProtobuf Python
L'éditeur PHP Xinyi vous présente une erreur courante : l'erreur de désérialisation/analyse KafkaProtobuf Python. Lorsque vous utilisez la bibliothèque Python KafkaProtobuf, vous pouvez rencontrer des problèmes de désérialisation ou d'erreurs d'analyse. Cela peut être dû à une inadéquation entre le format de sérialisation du message et le code du consommateur, ou à un message mal formé. Les solutions à ce problème incluent la vérification de la compatibilité du format de sérialisation du message et du code consommateur, ainsi que la garantie que le message est bien formaté. Dans cet article, nous détaillerons la cause et la solution de ce problème, dans l’espoir de vous aider à résoudre des erreurs similaires.
func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) { producerConfig := getKafkaProducerConfig(config.EnvConfig) producer, err := confluent_kafka.NewProducer(producerConfig) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer") log.Panicf("Unable to create Kafka Producer") } client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl)) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client") log.Panicf("Unable to create Kafka Client") } serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig()) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer") log.Panicf("Unable to create Kafka Serializer") } KafkaProducerInstance = &KafkaProducer{ producer: producer, serializer: serializer, } log.Info("Created Kafka Producer and Serializer") }
func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) { deliveryChan := make(chan confluent_kafka.Event) payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message) if err != nil { log.Errorf("Failed to serialize payload: %v\n", err) close(deliveryChan) return } err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{ TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny}, Value: payload, }, deliveryChan) if err != nil { log.Errorf("Failed to Produce: %v\n", err) close(deliveryChan) return } e := <-deliveryChan m := e.(*confluent_kafka.Message) if m.TopicPartition.Error != nil { log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error) close(deliveryChan) return } else { log.Infof("Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) } close(deliveryChan) }
from confluent_kafka import Consumer, KafkaError import KafkaDiagnoseResult_pb2 # replace with your generated module name from google.protobuf.message import DecodeError # Kafka consumer configuration conf = { 'bootstrap.servers': "localhost:9092/v3/", # Replace with your Kafka server address 'group.id': "myGroup", 'auto.offset.reset': 'earliest' } # Create a consumer instance consumer = Consumer(conf) # Subscribe to a topic from confluent_kafka import Consumer, KafkaError import KafkaDiagnoseResult_pb2 from google.protobuf.message import DecodeError # Kafka consumer configuration conf = { 'bootstrap.servers': "localhost:9092/v3/", 'group.id': "myGroup", 'auto.offset.reset': 'earliest' } # Create a consumer instance consumer = Consumer(conf) # Subscribe to a topic consumer.subscribe(['diagnosis']) try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event continue else: print(msg.error()) break # Deserialize the message try: data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() data.ParseFromString(msg.value()) except DecodeError as e: print(f"Error parsing message: {e}") print(f"Raw message data: {msg.value()}") print("Received message: ", data) except KeyboardInterrupt: pass finally: consumer.close()
解析消息时出错
J'essaie de le déboguer mais je n'y parviens pas.
proton
pour générer des fichiers pb2. Merci pour votre aide.
Merci
Je peux recevoir le message dans son format original :
原始消息数据:b'x00x00x00x00x02x02x08n75100a-1a47-48b2-93b7-b7a331be59b4x12tcompleted'
print(" Decode 1: ", dict_str) print("Decode 2: ", ast.literal_eval(dict_str))
Sortie du code ci-dessus :
Unparsed Message: b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted' Decode 1: $ccb0ad7e-abb2-4af6-90d1-187381f9d47e completed Inner Exception Here source code string cannot contain null bytes
Votre client Go utilise le registre de schémas pour la sérialisation, ce qui signifie que votre code Python doit faire de même. Ces enregistrements ne sont "pas seulement Protobuf" car l'ID de schéma est également codé en octets, donc l'analyseur Protobuf standard échouera.
Le référentiel contient un exemple de code pour utiliser Protobuf avec l'intégration du registre
https://github.com/confluenceinc/confluence-kafka-python/blob/master/examples/protobuf_consumer.py
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!