Maison >développement back-end >Golang >Comment récupérer les compensations de groupes de consommateurs dans Golang Kafka 10 à l'aide de « sarama-cluster » ?

Comment récupérer les compensations de groupes de consommateurs dans Golang Kafka 10 à l'aide de « sarama-cluster » ?

Patricia Arquette
Patricia Arquetteoriginal
2024-10-26 02:22:27660parcourir

How to Retrieve Consumer Group Offsets in Golang Kafka 10 Using `sarama-cluster`?

Comment récupérer les compensations de groupes de consommateurs dans Golang Kafka 10

Auparavant, des bibliothèques externes telles que kazoo-go étaient utilisées pour récupérer le message d'un groupe de consommateurs compensations stockées dans Zookeeper. Cependant, avec l'introduction de la fonctionnalité de groupe de consommateurs dans la bibliothèque Golang Kafka (sarama) dans la version 10, il est possible d'accéder directement à ces compensations.

Utilisation de sarama-cluster

Pour obtenir les décalages de groupe de consommateurs à l'aide de la bibliothèque sarama-cluster :

<code class="go">import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>

Ce code crée une nouvelle instance de ConsumerGroup et récupère le décalage actuel en consommant un message du sujet spécifié à l'aide d'un gestionnaire vide (consumeClaim). Le décalage initial de la réclamation est ensuite enregistré dans la structure gcInfo.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn