Kafka 10 を使用した Go でのコンシューマ グループ オフセットの取得
Kafka 10 のリリースにより、Go Kafka ライブラリ (sarama) はコンシューマ グループ オフセットを提供するようになりました。外部ライブラリに依存せずにグループ機能を利用できます。これにより、コンシューマ グループによって処理されている現在のメッセージ オフセットを取得する方法が問題になります。
解決策
コンシューマ グループ オフセットを取得するには、次の手順に従います。
コンシューマ グループ情報構造体の実装:
<code class="go">type gcInfo struct { offset int64 }</code>
コンシューマ グループ情報ハンドラの作成:
<code class="go">func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil }</code>
コンシューマ グループの構成と作成:
<code class="go">config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)</code>
グループ内のメッセージ:
<code class="go">info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err }</code>
オフセットを取得:
<code class="go">return info.offset, nil</code>
これで実装すると、いつでも特定のパーティションとトピックのコンシューマ グループ オフセットを取得できます。
以上がKafka 10 を使用して Go でコンシューマ グループ オフセットを取得する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。