首页 >后端开发 >Golang >连接 Kafka 和 Golang

连接 Kafka 和 Golang

WBOY
WBOY原创
2024-09-06 22:30:32527浏览

介绍

如果您需要了解 Kafka 的基础知识,例如它的主要功能、组件和优势,我在这里有一篇文章对此进行了介绍。请查看它并按照步骤操作,直到使用 Docker 完成 Kafka 安装,然后继续以下部分。

Connect Kafka with Golang

使用 Golang 连接到 Kafka

与连接 KafkaNodeJS 文章中的示例类似,此源代码也包含两部分:初始化 生产者 发送消息Kafka并使用消费者订阅来自主题

我会将代码分解成更小的部分以便更好地理解。首先,让我们定义变量值。


package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
- 这里,包

github.com/confluenceinc/confluence-kafka-go/kafka 用于连接到 Kafka

-

经纪人是主机地址;如果您使用ZooKeeper,请相应地替换主机地址。

-

groupIdtopic 可以根据需要更改。

接下来是初始化生产者。


func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}
上面的代码用于发送一组消息

{"message 1", "message 2", "message 3"} 到一个主题并使用 go-routine 使用 for e := range p.Events() 迭代事件并打印出交付结果,无论是成功或失败。

下一步是创建一个

消费者订阅主题并接收消息

func startConsumer() {
  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": broker,
    "group.id":          groupId,
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }
  c.Subscribe(topic, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
      break
    }
  }

  c.Close()
}
最后,由于这是一个简单的示例,因此调用函数创建

生产者消费者以供使用。在现实场景中,生产者消费者的部署通常在微服务系统中的两个不同服务器上完成。

func main() {
  startProducer()
  startConsumer()
}

Connect Kafka with Golang

编码愉快!


如果您觉得本文内容有帮助,请访问我博客上的原文,支持作者,探索更多有趣的内容。

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


您可能会感兴趣的一些系列:

    NodeJS
  •  反应
  • Docker 
  • Kubernetes

以上是连接 Kafka 和 Golang的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn