php小编小新在这篇文章中将为大家介绍Kafka架构注册表中的一个重要概念:代理。在Kafka中,代理是一种核心组件,负责管理和处理消息流。然而,代理无法验证记录,这意味着一旦记录被写入代理,就无法再进行验证和更改。这个特性可能会对一些特定的使用场景和安全性产生影响,因此在使用Kafka时需要注意这一点。接下来,我们将详细解析代理无法验证记录的原因以及可能引发的问题。
我正在使用 Kafka 架构注册表验证架构。问题是,即使我输入了正确的架构,我仍然收到错误 Broker: Broker failed to verify record.
confluence.value.schema.validation 设置为 true,以便可以在当前代理级别检查值的架构。
我设置的架构和我发送的数据如下。
{ "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": false, "description": "Sample schema to help you get started.", "properties": { "test": { "type": "string" } }, "title": "schema_test", "type": "object" }
{"test": "test1"}
另外,我使用go发送数据,数据的代码如下。
// main
func main() {
kafka.ProduceData("schema_test", []byte(`{"test": "test1"}`))
}
<code>// kafka func ProduceData(topic string, data []byte) { conf := ReadConfig() p, err := kafka.NewProducer(&conf) if err != nil { fmt.Printf("Failed to create producer: %s", err) os.Exit(1) } defer p.Close() // Go-routine to handle message delivery reports and // possibly other event types (errors, stats, etc) go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition) } else { fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n", *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value)) } } } }() p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: data, }, nil) // Wait for all messages to be delivered p.Flush(15 * 1000) } </code>
对于经纪人如何实际验证数据似乎存在误解。它按预期工作。您需要一个架构 ID。您只是发送有关主题的纯 JSON,没有 ID。注册表上的架构并不重要,重要的是它的 ID。
来自文档
更具体地说,您添加到注册表的模式只是主题上可能存在的许多“版本”之一(例如 topic-value
)。每个版本都有一个唯一的 ID。验证不仅仅使用最新版本;该 ID 在客户端进行编码。
请参阅使用 JSON 模式进行生成的 Confluence 示例(它本身应该进行记录验证)。
代理端验证只是为了防止错误序列化的数据或“毒丸”,正如您现在所做的那样。
以上是Kafka 架构注册表 - 代理:代理无法验证记录的详细内容。更多信息请关注PHP中文网其他相关文章!