如果您需要了解 Kafka 的基礎知識,例如它的主要功能、組件和優勢,我在這裡有一篇文章對此進行了介紹。請查看它並按照步驟操作,直到使用 Docker 完成 Kafka 安裝,然後繼續以下部分。
與連接Kafka 和NodeJS 文章中的示例類似,此源代碼也包含兩部分:初始化生產者 發送訊息到Kafka並使用消費者訂閱來自消費者訂閱來自
主題 >
。
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )我會將程式碼分解成更小的部分以便更好地理解。首先,讓我們定義變數值。
- 這裡,套件 github.com/confluenceinc/confluence-kafka-go/kafka
用來連接到Kafka。 - 經紀人
是主機位址;如果您使用ZooKeeper,請相應地替換主機位址。 - groupId
和topic
可以根據需要更改。
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }接下來是初始化生產者。
上面的程式碼用來傳送一組訊息{"message 1", "message 2", "message 3"} 到一個主題並使用go-routine 使用
for e := range p.Events() 迭代事件並列印出交付結果,無論是成功或失敗。
下一步是建立一個消費者來訂閱主題
並接收
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }。
最後,由於這是一個簡單的範例,因此呼叫函數建立生產者和消費者以供使用。在現實場景中,生產者和消費者
的部署通常在
func main() { startProducer() startConsumer() }系統中的兩個不同伺服器上完成。
編碼愉快!
如果您覺得本文內容有幫助,請訪問我部落格上的原文,支持作者,探索更多有趣的內容。
以上是連接 Kafka 和 Golang的詳細內容。更多資訊請關注PHP中文網其他相關文章!