Heim >Backend-Entwicklung >Golang >Wie rufe ich Verbrauchergruppen-Offsets in Golang Kafka 10 mithilfe von „sarama-cluster' ab?

Wie rufe ich Verbrauchergruppen-Offsets in Golang Kafka 10 mithilfe von „sarama-cluster' ab?

Patricia Arquette
Patricia ArquetteOriginal
2024-10-26 02:22:27671Durchsuche

How to Retrieve Consumer Group Offsets in Golang Kafka 10 Using `sarama-cluster`?

So rufen Sie Verbrauchergruppen-Offsets in Golang Kafka 10 ab

Zuvor wurden externe Bibliotheken wie kazoo-go zum Abrufen von Verbrauchergruppennachrichten verwendet in Zookeeper gespeicherte Offsets. Mit der Einführung der Verbrauchergruppenfunktion in der Golang Kafka-Bibliothek (sarama) in Version 10 ist es jedoch möglich, direkt auf diese Offsets zuzugreifen.

Verwendung von sarama-cluster

So erhalten Sie Consumer-Gruppen-Offsets mithilfe der sarama-cluster-Bibliothek:

<code class="go">import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>

Dieser Code erstellt eine neue ConsumerGroup-Instanz und ruft den aktuellen Offset ab, indem er eine Nachricht aus dem angegebenen Thema mithilfe eines leeren Handlers (consumeClaim) konsumiert. Der anfängliche Ausgleich des Anspruchs wird dann in der gcInfo-Struktur erfasst.

Das obige ist der detaillierte Inhalt vonWie rufe ich Verbrauchergruppen-Offsets in Golang Kafka 10 mithilfe von „sarama-cluster' ab?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn