Rumah >pembangunan bahagian belakang >Golang >Bagaimana untuk Mendapatkan Offset Kumpulan Pengguna di Golang Kafka 10 Menggunakan `sarama-cluster`?

Bagaimana untuk Mendapatkan Offset Kumpulan Pengguna di Golang Kafka 10 Menggunakan `sarama-cluster`?

Patricia Arquette
Patricia Arquetteasal
2024-10-26 02:22:27672semak imbas

How to Retrieve Consumer Group Offsets in Golang Kafka 10 Using `sarama-cluster`?

Cara Mendapatkan Offset Kumpulan Pengguna di Golang Kafka 10

Sebelum ini, perpustakaan luaran seperti kazoo-go telah digunakan untuk mendapatkan semula mesej kumpulan pengguna offset yang disimpan dalam Zookeeper. Walau bagaimanapun, dengan pengenalan keupayaan kumpulan pengguna dalam perpustakaan Golang Kafka (sarama) dalam versi 10, adalah mungkin untuk mengakses offset ini secara langsung.

Menggunakan sarama-cluster

Untuk mendapatkan offset kumpulan pengguna menggunakan perpustakaan kelompok sarama:

<code class="go">import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>

Kod ini mencipta tika ConsumerGroup baharu dan mendapatkan semula offset semasa dengan menggunakan mesej daripada topik yang ditentukan menggunakan pengendali kosong (consumeClaim). Offset awal tuntutan kemudiannya direkodkan dalam struct gcInfo.

Atas ialah kandungan terperinci Bagaimana untuk Mendapatkan Offset Kumpulan Pengguna di Golang Kafka 10 Menggunakan `sarama-cluster`?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn