Golang Kafka 10 でコンシューマ グループ オフセットを取得する方法
以前は、kazoo-go などの外部ライブラリを利用してコンシューマ グループ メッセージを取得していましたZookeeper に保存されているオフセット。ただし、バージョン 10 の Golang Kafka ライブラリ (sarama) にコンシューマ グループ機能が導入されたことで、これらのオフセットに直接アクセスできるようになりました。
sarama-cluster の使用
sarama-cluster ライブラリを使用してコンシューマ グループ オフセットを取得するには:
<code class="go">import ( "context" "log" "strings" "github.com/Shopify/sarama" ) func main() { groupName := "testgrp" topic := "topic_name" offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic) if err != nil { log.Fatal(err) } log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset) } type gcInfo struct { offset int64 } func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil } func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) { config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config) if err != nil { return 0, err } info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err } return info.offset, nil }</code>
このコードは、新しい ConsumerGroup インスタンスを作成し、空のハンドラ (consumeClaim) を使用して指定されたトピックからメッセージを消費することで現在のオフセットを取得します。その後、クレームの初期オフセットが gcInfo 構造体に記録されます。
以上がGolang Kafka 10 で「sarama-cluster」を使用してコンシューマ グループ オフセットを取得する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。