Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Ralat penyahserialisasian/penghuraian KafkaProtobuf Python

Ralat penyahserialisasian/penghuraian KafkaProtobuf Python

PHPz
PHPzke hadapan
2024-02-09 23:48:091097semak imbas

反序列化/解析错误 KafkaProtobuf Python

Editor PHP Xinyi memperkenalkan anda kepada ralat biasa: ralat penyahserilan/penghuraian KafkaProtobuf Python. Apabila menggunakan perpustakaan Python KafkaProtobuf, anda mungkin menghadapi masalah dengan ralat penyahserikatan atau penghuraian. Ini mungkin disebabkan oleh ketidakpadanan antara format bersiri mesej dan kod pengguna, atau kerana mesej itu salah bentuk. Penyelesaian kepada masalah ini termasuk menyemak format bersiri mesej dan kod pengguna untuk keserasian, serta memastikan mesej diformat dengan baik. Dalam artikel ini, kami akan memperincikan punca dan penyelesaian masalah ini, dengan harapan dapat membantu anda menyelesaikan ralat yang serupa.

Kandungan soalan

Kod siri (Bahasa Go)

1. Penerbit

func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}

2.Hantar mesej Kafka

func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)
    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)

    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}

Cuba gunakan Messages (Diff, aplikasi dalam Python)

from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # Replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis']) 
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() 
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")

        print("Received message: ", data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Ralat

解析消息时出错

Saya cuba nyahpepijat tetapi tidak boleh.

  1. Fail proto dalam kedua-dua aplikasi adalah sama
  2. Saya menggunakan proton untuk menjana fail pb2.

Terima kasih atas bantuan anda.

Terima kasih

Saya boleh mendapatkan mesej dalam format asalnya:

Mesej format asal.

原始消息数据:b'x00x00x00x00x02x02x08n75100a-1a47-48b2-93b7-b7a331be59b4x12tcompleted'

  • Saya cuba menyahkodnya menggunakan UTF-8 tetapi ia gagal kerana tidak semua medan dibaca.
print(" Decode 1: ", dict_str)
   print("Decode 2: ", ast.literal_eval(dict_str))

Output kod di atas:

Unparsed Message:  b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
 Decode 1:  
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e   completed
Inner Exception Here source code string cannot contain null bytes

Penyelesaian

Pelanggan Go anda menggunakan pendaftaran skema untuk bersiri, yang bermaksud kod Python anda mesti melakukan perkara yang sama. Rekod ini "bukan hanya Protobuf" kerana ID skema juga dikodkan dalam bait, jadi penghurai Protobuf biasa akan gagal.

Repositori mempunyai kod sampel untuk menggunakan Protobuf dengan penyepaduan pendaftaran

https://github.com/confluenceinc /confluence-kafka-python/blob/master/examples/protobuf_consumer.py

Atas ialah kandungan terperinci Ralat penyahserialisasian/penghuraian KafkaProtobuf Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:stackoverflow.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam