Home  >  Article  >  Backend Development  >  How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?

How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?

Susan Sarandon
Susan SarandonOriginal
2024-10-25 11:29:30309browse

How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?

Getting Consumer Group Offsets with Golang Kafka 10

Traditionally, external libraries were employed to manage consumer group capabilities in Golang with Kafka. However, Kafka 10 now natively provides such functionality. This raises the question: how can we retrieve the current message offset processed by a consumer group using the Golang Kafka library (sarama)?

Previously, kazoo-go was utilized to retrieve group message offsets from Zookeeper. With the introduction of sarama-cluster, an alternative approach is required.

Solution

The following code snippet demonstrates how to obtain the consumer group offset:

<code class="go">package main

import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false  // we don't want to change consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>

The above is the detailed content of How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn