如果您需要了解 Kafka 的基础知识,例如它的主要功能、组件和优势,我在这里有一篇文章对此进行了介绍。请查看它并按照步骤操作,直到使用 Docker 完成 Kafka 安装,然后继续以下部分。
与连接 Kafka 和 NodeJS 文章中的示例类似,此源代码也包含两部分:初始化 生产者 发送消息到Kafka并使用消费者订阅来自主题。
我会将代码分解成更小的部分以便更好地理解。首先,让我们定义变量值。
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )- 这里,包
github.com/confluenceinc/confluence-kafka-go/kafka 用于连接到 Kafka。
-经纪人是主机地址;如果您使用ZooKeeper,请相应地替换主机地址。
-groupId 和 topic 可以根据需要更改。
接下来是初始化生产者。
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }上面的代码用于发送一组消息
{"message 1", "message 2", "message 3"} 到一个主题并使用 go-routine 使用 for e := range p.Events() 迭代事件并打印出交付结果,无论是成功或失败。
下一步是创建一个消费者来订阅主题并接收消息。
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }最后,由于这是一个简单的示例,因此调用函数创建
生产者和消费者以供使用。在现实场景中,生产者和消费者的部署通常在微服务系统中的两个不同服务器上完成。
func main() { startProducer() startConsumer() }
编码愉快!
如果您觉得本文内容有帮助,请访问我博客上的原文,支持作者,探索更多有趣的内容。
以上是连接 Kafka 和 Golang的详细内容。更多信息请关注PHP中文网其他相关文章!