首頁 >後端開發 >Golang >連接 Kafka 和 Golang

連接 Kafka 和 Golang

WBOY
WBOY原創
2024-09-06 22:30:32557瀏覽

介紹

如果您需要了解 Kafka 的基礎知識,例如它的主要功能、組件和優勢,我在這裡有一篇文章對此進行了介紹。請查看它並按照步驟操作,直到使用 Docker 完成 Kafka 安裝,然後繼續以下部分。

Connect Kafka with Golang

使用 Golang 連線到 Kafka

與連接KafkaNodeJS 文章中的示例類似,此源代碼也包含兩部分:初始化生產者 發送訊息Kafka並使用消費者訂閱來自消費者訂閱來自

主題

>

package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
我會將程式碼分解成更小的部分以便更好地理解。首先,讓我們定義變數值。

- 這裡,套件 github.com/confluenceinc/confluence-kafka-go/kafka

用來連接到

Kafka - 經紀人

是主機位址;如果您使用

ZooKeeper,請相應地替換主機位址。 - groupId

topic
可以根據需要更改。

func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}
接下來是初始化生產者。

上面的程式碼用來傳送一組訊息{"message 1", "message 2", "message 3"} 到一個主題並使用go-routine 使用

for e := range p.Events()

迭代事件並列印出交付結果,無論是成功或失敗。 下一步是建立一個消費者訂閱主題
並接收

訊息
func startConsumer() {
  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": broker,
    "group.id":          groupId,
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }
  c.Subscribe(topic, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
      break
    }
  }

  c.Close()
}

最後,由於這是一個簡單的範例,因此呼叫函數建立生產者消費者以供使用。在現實場景中,生產者消費者
的部署通常在

微服務
func main() {
  startProducer()
  startConsumer()
}
系統中的兩個不同伺服器上完成。

Connect Kafka with Golang

編碼愉快!

如果您覺得本文內容有幫助,請訪問我部落格上的原文,支持作者,探索更多有趣的內容。

Connect Kafka with GolangConnect Kafka with Golang Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


  • 您可能會感興趣的一些系列:
  • NodeJS
  •  反應
  • Docker 
Kubernetes

以上是連接 Kafka 和 Golang的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn