Maison  >  Article  >  développement back-end  >  Erreurs de désérialisation/analyse KafkaProtobuf Python

Erreurs de désérialisation/analyse KafkaProtobuf Python

PHPz
PHPzavant
2024-02-09 23:48:091093parcourir

反序列化/解析错误 KafkaProtobuf Python

L'éditeur PHP Xinyi vous présente une erreur courante : l'erreur de désérialisation/analyse KafkaProtobuf Python. Lorsque vous utilisez la bibliothèque Python KafkaProtobuf, vous pouvez rencontrer des problèmes de désérialisation ou d'erreurs d'analyse. Cela peut être dû à une inadéquation entre le format de sérialisation du message et le code du consommateur, ou à un message mal formé. Les solutions à ce problème incluent la vérification de la compatibilité du format de sérialisation du message et du code consommateur, ainsi que la garantie que le message est bien formaté. Dans cet article, nous détaillerons la cause et la solution de ce problème, dans l’espoir de vous aider à résoudre des erreurs similaires.

Contenu de la question

Code de sérialisation (langue Go)

1. Producteur

func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}

2.Envoyer un message à Kafka

func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)
    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)

    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}

Essayez d'utiliser Messages (Diff, une application en Python)

from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # Replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis']) 
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() 
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")

        print("Received message: ", data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Erreur

解析消息时出错

J'essaie de le déboguer mais je n'y parviens pas.

  1. Les fichiers proto dans les deux applications sont les mêmes
  2. J'utilise proton pour générer des fichiers pb2.

Merci pour votre aide.

Merci

Je peux recevoir le message dans son format original :

Message au format original.

原始消息数据:b'x00x00x00x00x02x02x08n75100a-1a47-48b2-93b7-b7a331be59b4x12tcompleted'

  • J'ai essayé de le décoder en utilisant UTF-8 mais cela a échoué car tous les champs n'étaient pas lus.
print(" Decode 1: ", dict_str)
   print("Decode 2: ", ast.literal_eval(dict_str))

Sortie du code ci-dessus :

Unparsed Message:  b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
 Decode 1:  
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e   completed
Inner Exception Here source code string cannot contain null bytes

Solution de contournement

Votre client Go utilise le registre de schémas pour la sérialisation, ce qui signifie que votre code Python doit faire de même. Ces enregistrements ne sont "pas seulement Protobuf" car l'ID de schéma est également codé en octets, donc l'analyseur Protobuf standard échouera.

Le référentiel contient un exemple de code pour utiliser Protobuf avec l'intégration du registre

https://github.com/confluenceinc/confluence-kafka-python/blob/master/examples/protobuf_consumer.py

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer