Heim >Backend-Entwicklung >Golang >Deserialisierungs-/Analysefehler KafkaProtobuf Python

Deserialisierungs-/Analysefehler KafkaProtobuf Python

PHPz
PHPznach vorne
2024-02-09 23:48:091161Durchsuche

反序列化/解析错误 KafkaProtobuf Python

PHP-Editor Xinyi stellt Ihnen einen häufigen Fehler vor: Deserialisierungs-/Analysefehler KafkaProtobuf Python. Bei Verwendung der KafkaProtobuf-Python-Bibliothek können Probleme mit der Deserialisierung oder Analysefehlern auftreten. Dies kann auf eine Nichtübereinstimmung zwischen dem Serialisierungsformat der Nachricht und dem Verbrauchercode zurückzuführen sein oder darauf, dass die Nachricht fehlerhaft ist. Zu den Lösungen hierfür gehören die Überprüfung der Kompatibilität des Serialisierungsformats und des Verbrauchercodes der Nachricht sowie die Sicherstellung, dass die Nachricht gut formatiert ist. In diesem Artikel werden wir die Ursache und Lösung dieses Problems detailliert beschreiben und hoffen, Ihnen bei der Lösung ähnlicher Fehler helfen zu können.

Frageninhalt

Serialisierungscode (Go-Sprache)

1. Produzent

func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}

2. Kafka-Nachricht senden

func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)
    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)

    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}

Versuchen Sie es mit Messages (Diff, eine App in Python)

from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # Replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis']) 
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() 
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")

        print("Received message: ", data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Fehler

解析消息时出错

Ich versuche, es zu debuggen, aber es geht nicht.

  1. Die Protodateien in beiden Anwendungen sind gleich
  2. Ich verwende proton, um pb2-Dateien zu generieren.

Danke für deine Hilfe.

Danke

Ich kann die Nachricht im Originalformat erhalten:

Nachricht im Originalformat.

原始消息数据:b'x00x00x00x00x02x02x08n75100a-1a47-48b2-93b7-b7a331be59b4x12tcompleted'

  • Ich habe versucht, es mit UTF-8 zu dekodieren, aber es ist fehlgeschlagen, weil nicht alle Felder gelesen wurden.
print(" Decode 1: ", dict_str)
   print("Decode 2: ", ast.literal_eval(dict_str))

Ausgabe des obigen Codes:

Unparsed Message:  b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
 Decode 1:  
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e   completed
Inner Exception Here source code string cannot contain null bytes

Workaround

Ihr Go-Client verwendet die Schema-Registrierung für die Serialisierung, was bedeutet, dass Ihr Python-Code dasselbe tun muss. Diese Datensätze sind „nicht nur Protobuf“, da die Schema-ID auch in Bytes codiert ist, sodass der reguläre Protobuf-Parser fehlschlägt.

Das Repository enthält Beispielcode für die Verwendung von Protobuf mit Registrierungsintegration

https://github.com/confluenceinc /confluence-kafka-python/blob/master/examples/protobuf_consumer.py

Das obige ist der detaillierte Inhalt vonDeserialisierungs-/Analysefehler KafkaProtobuf Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen