Golang Kafka 10에서 소비자 그룹 오프셋을 검색하는 방법
이전에는 kazoo-go와 같은 외부 라이브러리를 활용하여 소비자 그룹 메시지를 검색했습니다. Zookeeper에 저장된 오프셋입니다. 그러나 버전 10의 Golang Kafka 라이브러리(sarama)에 소비자 그룹 기능이 도입되면서 이러한 오프셋에 직접 액세스할 수 있습니다.
sarama-cluster 사용
sarama-cluster 라이브러리를 사용하여 소비자 그룹 오프셋을 얻으려면:
<code class="go">import ( "context" "log" "strings" "github.com/Shopify/sarama" ) func main() { groupName := "testgrp" topic := "topic_name" offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic) if err != nil { log.Fatal(err) } log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset) } type gcInfo struct { offset int64 } func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil } func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) { config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config) if err != nil { return 0, err } info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err } return info.offset, nil }</code>
이 코드는 새 ConsumerGroup 인스턴스를 생성하고 빈 핸들러(consumeClaim)를 사용하여 지정된 주제에서 메시지를 소비하여 현재 오프셋을 검색합니다. 그러면 청구의 초기 오프셋이 gcInfo 구조체에 기록됩니다.
위 내용은 `sarama-cluster`를 사용하여 Golang Kafka 10에서 소비자 그룹 오프셋을 검색하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!