首页  >  文章  >  后端开发  >  如何使用“sarama-cluster”检索 Golang Kafka 10 中的消费者组偏移量?

如何使用“sarama-cluster”检索 Golang Kafka 10 中的消费者组偏移量?

Patricia Arquette
Patricia Arquette原创
2024-10-26 02:22:27520浏览

How to Retrieve Consumer Group Offsets in Golang Kafka 10 Using `sarama-cluster`?

如何在 Golang Kafka 10 中检索消费者组偏移量

之前,使用 kazoo-go 等外部库来检索消费者组消息偏移量存储在 Zookeeper 中。然而,随着 Golang Kafka 库(sarama)在版本 10 中引入消费者组功能,可以直接访问这些偏移量。

使用 sarama-cluster

使用 sarama-cluster 库获取消费者组偏移量:

<code class="go">import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>

此代码创建一个新的 ConsumerGroup 实例,并通过使用空处理程序 (consumeClaim) 消费来自指定主题的消息来检索当前偏移量。然后,声明的初始偏移量将记录在 gcInfo 结构中。

以上是如何使用“sarama-cluster”检索 Golang Kafka 10 中的消费者组偏移量?的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn