Maison  >  Article  >  développement back-end  >  Comment récupérer le décalage de message actuel d'un groupe de consommateurs dans Golang Kafka 10 à l'aide de Sarama ?

Comment récupérer le décalage de message actuel d'un groupe de consommateurs dans Golang Kafka 10 à l'aide de Sarama ?

Susan Sarandon
Susan Sarandonoriginal
2024-10-25 11:29:30309parcourir

How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?

Obtenir des compensations de groupes de consommateurs avec Golang Kafka 10

Traditionnellement, des bibliothèques externes étaient utilisées pour gérer les capacités des groupes de consommateurs dans Golang avec Kafka. Cependant, Kafka 10 propose désormais nativement une telle fonctionnalité. Cela soulève la question : comment pouvons-nous récupérer le décalage de message actuel traité par un groupe de consommateurs à l'aide de la bibliothèque Golang Kafka (sarama) ?

Auparavant, kazoo-go était utilisé pour récupérer les décalages de message de groupe de Zookeeper. Avec l'introduction de sarama-cluster, une approche alternative est nécessaire.

Solution

L'extrait de code suivant montre comment obtenir la compensation du groupe de consommateurs :

<code class="go">package main

import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false  // we don't want to change consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn