Heim  >  Artikel  >  Backend-Entwicklung  >  Wie kann ich mit Sarama den aktuellen Nachrichtenoffset einer Verbrauchergruppe in Golang Kafka 10 abrufen?

Wie kann ich mit Sarama den aktuellen Nachrichtenoffset einer Verbrauchergruppe in Golang Kafka 10 abrufen?

Susan Sarandon
Susan SarandonOriginal
2024-10-25 11:29:30309Durchsuche

How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?

Erhalten von Verbrauchergruppen-Offsets mit Golang Kafka 10

Traditionell wurden externe Bibliotheken eingesetzt, um Verbrauchergruppenfunktionen in Golang mit Kafka zu verwalten. Allerdings stellt Kafka 10 diese Funktionalität nun nativ bereit. Dies wirft die Frage auf: Wie können wir den aktuellen Nachrichtenoffset abrufen, der von einer Verbrauchergruppe mithilfe der Golang Kafka-Bibliothek (sarama) verarbeitet wird?

Zuvor wurde kazoo-go verwendet, um Gruppennachrichtenoffsets von Zookeeper abzurufen. Mit der Einführung von sarama-cluster ist ein alternativer Ansatz erforderlich.

Lösung

Der folgende Codeausschnitt zeigt, wie man den Verbrauchergruppen-Offset erhält:

<code class="go">package main

import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false  // we don't want to change consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>

Das obige ist der detaillierte Inhalt vonWie kann ich mit Sarama den aktuellen Nachrichtenoffset einer Verbrauchergruppe in Golang Kafka 10 abrufen?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn