Rumah >pembangunan bahagian belakang >Golang >Bagaimana untuk Mengambil Offset Mesej Semasa Kumpulan Pengguna di Golang Kafka 10 Menggunakan Sarama?

Bagaimana untuk Mengambil Offset Mesej Semasa Kumpulan Pengguna di Golang Kafka 10 Menggunakan Sarama?

Susan Sarandon
Susan Sarandonasal
2024-10-25 11:29:30389semak imbas

How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?

Mendapatkan Offset Kumpulan Pengguna dengan Golang Kafka 10

Secara tradisinya, perpustakaan luar digunakan untuk mengurus keupayaan kumpulan pengguna di Golang dengan Kafka. Walau bagaimanapun, Kafka 10 kini secara asli menyediakan fungsi sedemikian. Ini menimbulkan persoalan: bagaimanakah kita boleh mendapatkan semula offset mesej semasa yang diproses oleh kumpulan pengguna menggunakan perpustakaan Golang Kafka (sarama)?

Sebelum ini, kazoo-go telah digunakan untuk mendapatkan semula offset mesej kumpulan daripada Zookeeper. Dengan pengenalan kluster sarama, pendekatan alternatif diperlukan.

Penyelesaian

Coretan kod berikut menunjukkan cara mendapatkan offset kumpulan pengguna:

<code class="go">package main

import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false  // we don't want to change consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>

Atas ialah kandungan terperinci Bagaimana untuk Mengambil Offset Mesej Semasa Kumpulan Pengguna di Golang Kafka 10 Menggunakan Sarama?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn