ホームページ >バックエンド開発 >Golang >Golang と Kafka を一緒に使用する方法

Golang と Kafka を一緒に使用する方法

PHPz
PHPzオリジナル
2023-04-13 18:34:171409ブラウズ

Kafka は、ビッグ データ アプリケーションでリアルタイム データ ストリーム処理アプリケーションを構築するためによく使用されるオープン ソースの分散メッセージ キューです。 Golang は Google によって開発されたプログラミング言語であり、その効率的な同時実行性、強力なライブラリとエコシステムで知られています。では、Golang を使用して Kafka と組み合わせるにはどうすればよいでしょうか?

まず、github.com/Shopify/sarama パッケージをインポートする必要があります。これは、Kafka をサポートする Golang クライアント ライブラリです。インストール プロセス中に、次のコマンドを実行する必要があります:

go get github.com/Shopify/sarama

次に、プロデューサーを作成する必要があります。まず、構成を作成します。

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true

ここでは、すべての ACK を待機し、最大 5 回まで再試行し、成功後にプロデューサーに成功メッセージを返すようにプロデューサーを設定します。

次に、プロデューサー インスタンスを作成する必要があります:

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer producer.Close()

Kafka に接続するサービス エンドポイントとして Kafka ブローカー アドレスを指定する必要があります。ここではローカルの Kafka サーバーに接続しています。また、.Close() メソッドを呼び出して、プロデューサの終了時にクリーンアップを確実に行います。

これで、Kafka トピックへのメッセージのパブリッシュを開始する準備が整いました:

msg := &sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.StringEncoder("Hello World!"),
}

part, offset, err := producer.SendMessage(msg)
if err != nil {
    fmt.Printf("Error publishing message: %v", err)
} else {
    fmt.Printf("Message published successfully. Partition: %v, Offset: %v\n", part, offset)
}

この例では、「test」という名前のトピックにメッセージをパブリッシュします。エラーがない場合は、正常にパブリッシュされたパーティションとオフセットが出力されます。

これで、Kafka にメッセージを発行するプロデューサーが作成されました。次に、コンシューマを作成する方法を見てみましょう。

最初に、コンシューマ設定を作成する必要があります:

config := sarama.NewConfig()
config.Consumer.Return.Errors = true

ここで受信エラーを設定します。

次に、コンシューマ インスタンスを作成する必要があります:

consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer consumer.Close()

ここでは、Kafka ブローカー アドレスも指定します。また、コンシューマーが終了時に確実にクリーンアップするように、.Close() メソッドを呼び出す必要があります。

これで、Kafka トピックからメッセージを読み取る準備が整いました。

partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
    panic(err)
}
defer partitionConsumer.Close()

for {
    select {
    case msg := <-partitionConsumer.Messages():
        fmt.Printf("Received message from partition %d with offset %d: %s = %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
    case err := <-partitionConsumer.Errors():
        fmt.Println("Error: ", err.Error())
    }
}

この例では、「test」という名前のトピックをサブスクライブします。次に、最初のパーティションのオフセットを読み取ります。次に、ループ内でそのパーティションからメッセージを無限に読み取ります。ループ内の select ステートメントは、常にメッセージ チャネルとエラー チャネルを監視し、それぞれを出力します。

ここまで、Golang と Kafka を組み合わせて使用​​する方法を紹介しました。この簡単な例で、Golang と Kafka の基本的な使い方をマスターしたはずです。

以上がGolang と Kafka を一緒に使用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。