ホームページ >バックエンド開発 >Golang >Kafka と Golang を接続する

Kafka と Golang を接続する

WBOY
WBOYオリジナル
2024-09-06 22:30:32565ブラウズ

導入

Kafka の主な機能、コンポーネント、利点などの基本を知る必要がある場合は、ここで説明した記事を参照してください。 Docker を使用して Kafka のインストールが完了するまで内容を確認し、次のセクションに進んでください。

Connect Kafka with Golang

Golang を使用して Kafka に接続する

KafkaNodeJS の接続に関する記事の例と同様に、このソース コードには、プロデューサー メッセージKafkaに送信し、コンシューマを使用してトピック. 理解を助けるために、コードを小さな部分に分割します。まず、変数の値を定義しましょう。


- ここでは、パッケージ
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
github.com/confluentinc/confluent-kafka-go/kafka

を使用して Kafka に接続します。 -

ブローカー

はホスト アドレスです。 ZooKeeper を使用している場合は、ホスト アドレスを適宜置き換えてください。 -

groupId

topic は、必要に応じて変更できます。 次はプロデューサーを初期化します。


上記のコードは、メッセージの配列
func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}

{"message 1", "message 2", "message 3"} をトピックに送信するために使用され、go-routine は、for e := range p.Events() を使用してイベントを反復処理し、配信結果を出力します。成功か失敗か。 次に、

トピック

サブスクライブし、メッセージを受信するコンシューマを作成します。

最後に、これは簡単な例なので、使用する
func startConsumer() {
  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": broker,
    "group.id":          groupId,
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }
  c.Subscribe(topic, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
      break
    }
  }

  c.Close()
}
プロデューサー

コンシューマー を作成する関数を呼び出します。実際のシナリオでは、プロデューサーコンシューマー のデプロイメントは通常、マイクロサービス システム内の 2 つの異なるサーバーで行われます。

func main() {
  startProducer()
  startConsumer()
}

Connect Kafka with Golang

コーディングを楽しんでください!


このコンテンツが役立つと思われた場合は、著者をサポートし、より興味深いコンテンツを探索するために、私のブログの元の記事にアクセスしてください。

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang Connect Kafka with Golang

興味深いと思われるシリーズ:

NodeJS
  • 反応する
  • ドッカー
  • Kubernetes

以上がKafka と Golang を接続するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。