使用 Golang Kafka 10 获取消费者组偏移
传统上,使用外部库来使用 Kafka 管理 Golang 中的消费者组功能。然而,Kafka 10 现在原生提供了这样的功能。这就提出了一个问题:我们如何使用 Golang Kafka 库(sarama)检索消费者组处理的当前消息偏移量?
之前,kazoo-go 用于从 Zookeeper 检索组消息偏移量。随着sarama-cluster的引入,需要另一种方法。
解决方案
以下代码片段演示了如何获取消费者组偏移量:
<code class="go">package main import ( "context" "log" "strings" "github.com/Shopify/sarama" ) func main() { groupName := "testgrp" topic := "topic_name" offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic) if err != nil { log.Fatal(err) } log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset) } type gcInfo struct { offset int64 } func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil } func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) { config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false // we don't want to change consumer group offsets client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config) if err != nil { return 0, err } info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err } return info.offset, nil }</code>
以上是如何使用 Sarama 检索 Golang Kafka 10 中消费者组的当前消息偏移量?的详细内容。更多信息请关注PHP中文网其他相关文章!