Maison >développement back-end >Golang >Kafka Schema Registry - Courtier : le courtier ne peut pas valider l'enregistrement

Kafka Schema Registry - Courtier : le courtier ne peut pas valider l'enregistrement

王林
王林avant
2024-02-08 21:06:081187parcourir

Kafka 架构注册表 - 代理:代理无法验证记录

Dans cet article, l'éditeur PHP Xiaoxin vous présentera un concept important dans le registre de l'architecture Kafka : le proxy. Dans Kafka, le courtier est un composant essentiel responsable de la gestion et du traitement du flux de messages. Cependant, le proxy ne peut pas valider les enregistrements, ce qui signifie qu'une fois qu'un enregistrement est écrit sur le proxy, il ne peut pas être validé ou modifié. Cette fonctionnalité peut avoir un impact sur certains scénarios d'utilisation et sur la sécurité spécifiques, vous devez donc y prêter attention lorsque vous utilisez Kafka. Ensuite, nous expliquerons en détail pourquoi l'agent ne peut pas vérifier le dossier et les problèmes qui peuvent survenir.

Contenu de la question

Je valide le schéma à l'aide du registre de schémas Kafka. Le problème est que même si j'ai entré le bon schéma, j'obtiens toujours l'erreur Broker: Broker failed to verify record.

confluence.value.schema.validation est défini sur true afin que le schéma de la valeur puisse être vérifié au niveau actuel de l'agent.

Le schéma que j'ai mis en place et les données que j'ai envoyées sont les suivantes.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "test": {
      "type": "string"
    }
  },
  "title": "schema_test",
  "type": "object"
}
{"test": "test1"}

De plus, j'utilise go pour envoyer des données, et le code des données est le suivant.

// main

func main() {
    kafka.ProduceData("schema_test", []byte(`{"test": "test1"}`))
}
<code>// kafka
func ProduceData(topic string, data []byte) {

    conf := ReadConfig()

    p, err := kafka.NewProducer(&conf)

    if err != nil {
        fmt.Printf("Failed to create producer: %s", err)
        os.Exit(1)
    }

    defer p.Close()

    // Go-routine to handle message delivery reports and
    // possibly other event types (errors, stats, etc)
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
                        *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
                }
            }
        }
    }()

    p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          data,
    }, nil)

    // Wait for all messages to be delivered
    p.Flush(15 * 1000)
}
</code>

Réponse correcte


Il semble y avoir un malentendu sur la manière dont les courtiers vérifient réellement les données. Cela fonctionne comme prévu. Vous avez besoin d'un ID de schéma. Vous envoyez simplement du JSON simple sur le sujet, sans l'ID. Le schéma du registre n'a pas d'importance, seul son identifiant.

D'après la documentation

Plus précisément, le modèle que vous ajoutez au registre n'est qu'une des nombreuses "versions" qui peuvent exister sur le thème (par exemple topic-value). Chaque version possède un identifiant unique. L'authentification n'utilise pas uniquement la dernière version ; l'ID est codé côté client.

Voir l'exemple Confluence utilisant la génération de schéma JSON (qui devrait lui-même effectuer la validation des enregistrements).

https://github.com/confluenceinc/confluence-kafka-go/blob/master/examples/json_ Producer_example/json_ Producer_example.go

La validation côté courtier sert simplement à éviter les données mal sérialisées ou les « pilules empoisonnées » comme vous le faites actuellement.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer