Kafka は、ビッグ データ アプリケーションでリアルタイム データ ストリーム処理アプリケーションを構築するためによく使用されるオープン ソースの分散メッセージ キューです。 Golang は Google によって開発されたプログラミング言語であり、その効率的な同時実行性、強力なライブラリとエコシステムで知られています。では、Golang を使用して Kafka と組み合わせるにはどうすればよいでしょうか?
まず、github.com/Shopify/sarama パッケージをインポートする必要があります。これは、Kafka をサポートする Golang クライアント ライブラリです。インストール プロセス中に、次のコマンドを実行する必要があります:
go get github.com/Shopify/sarama
次に、プロデューサーを作成する必要があります。まず、構成を作成します。
config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true
ここでは、すべての ACK を待機し、最大 5 回まで再試行し、成功後にプロデューサーに成功メッセージを返すようにプロデューサーを設定します。
次に、プロデューサー インスタンスを作成する必要があります:
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer producer.Close()
Kafka に接続するサービス エンドポイントとして Kafka ブローカー アドレスを指定する必要があります。ここではローカルの Kafka サーバーに接続しています。また、.Close()
メソッドを呼び出して、プロデューサの終了時にクリーンアップを確実に行います。
これで、Kafka トピックへのメッセージのパブリッシュを開始する準備が整いました:
msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("Hello World!"), } part, offset, err := producer.SendMessage(msg) if err != nil { fmt.Printf("Error publishing message: %v", err) } else { fmt.Printf("Message published successfully. Partition: %v, Offset: %v\n", part, offset) }
この例では、「test」という名前のトピックにメッセージをパブリッシュします。エラーがない場合は、正常にパブリッシュされたパーティションとオフセットが出力されます。
これで、Kafka にメッセージを発行するプロデューサーが作成されました。次に、コンシューマを作成する方法を見てみましょう。
最初に、コンシューマ設定を作成する必要があります:
config := sarama.NewConfig() config.Consumer.Return.Errors = true
ここで受信エラーを設定します。
次に、コンシューマ インスタンスを作成する必要があります:
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer consumer.Close()
ここでは、Kafka ブローカー アドレスも指定します。また、コンシューマーが終了時に確実にクリーンアップするように、.Close()
メソッドを呼び出す必要があります。
これで、Kafka トピックからメッセージを読み取る準備が整いました。
partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) if err != nil { panic(err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Received message from partition %d with offset %d: %s = %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) case err := <-partitionConsumer.Errors(): fmt.Println("Error: ", err.Error()) } }
この例では、「test」という名前のトピックをサブスクライブします。次に、最初のパーティションのオフセットを読み取ります。次に、ループ内でそのパーティションからメッセージを無限に読み取ります。ループ内の select
ステートメントは、常にメッセージ チャネルとエラー チャネルを監視し、それぞれを出力します。
ここまで、Golang と Kafka を組み合わせて使用する方法を紹介しました。この簡単な例で、Golang と Kafka の基本的な使い方をマスターしたはずです。
以上がGolang と Kafka を一緒に使用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。