cari

Rumah  >  Soal Jawab  >  teks badan

Bagaimana untuk mendapatkan offset terakhir dalam topik dari Kafka menggunakan perpustakaan php?

Saya sedang menulis pengguna Kafka untuk projek API menggunakan perpustakaan php-rdkafka. Saya perlu mencari offset terakhir dalam topik dan mendapatkan nilai daripadanya untuk pemprosesan selanjutnya. Sebagai contoh, offset terakhir dalam topik = 5, maka saya perlu mendapatkan offset 5 dan menghantarnya melalui API sehingga offset baharu ditambahkan. Apa yang saya cuba jalankan:

$conf = new RdKafka\Conf();

$settings = [
    'socket.keepalive.enable'  => true,
    'log_level'                => LOG_WARNING,
    'enable.auto.offset.store' => 'true',
    'auto.offset.reset'        => 'earliest',
    'enable.partition.eof'     => 'false',
    'enable.auto.commit'       => 'false',
    'max.poll.interval.ms'     => 300000,
    'session.timeout.ms'       => 45000,
    'group.id'                 => 'test-group',
    'group.instance.id'        => uniqid('', true),
    'metadata.broker.list'     => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092',
];
foreach ($settings as $key => $value) {
    $conf->set($key, $value);
}

$topicName = 'userstatistics_12345';
$partition = 0;

$topicPartition = new RdKafka\TopicPartition($topicName, $partition);

$topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]);

var_dump($topicPartitionsWithOffsets);

Tetapi ini mengembalikan hasil yang pelik dengan offset negatif

array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }

Walaupun sebenarnya offset terakhir pada masa ini ialah 59. Idea saya adalah untuk mendapatkan offset terakhir dan kemudian mendapatkan nilai menggunakan:

$consumer->assign([
    new RdKafka\TopicPartition($topicName, $partition, $lastOffset)
]);

Saya juga tidak mahu menggunakan gelung while(true) untuk melakukan kerja skrip dengan cepat.

Itu sahaja. terima kasih.

P粉763662390P粉763662390440 hari yang lalu586

membalas semua(1)saya akan balas

  • P粉701491897

    P粉7014918972023-09-11 00:14:41

    Saya menemui jawapannya dan ia sangat berkesan untuk saya:

    $conf = new RdKafka\Conf();
    
    // Configure the group.id. All consumer with the same group.id will consume
    // different partitions.
    $conf->set('group.id', 'test-group');
    
    // Initial list of Kafka brokers
    $conf->set('metadata.broker.list', 'kafka-1:9092');
    
    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'earliest': start from the beginning
    $conf->set('auto.offset.reset', 'latest');
    
    // Emit EOF event when reaching the end of a partition
    $conf->set('enable.partition.eof', 'true');
    
    $kafkaConsumer = new RdKafka\KafkaConsumer($conf);
    $topicName = 'topic_name';
    $partition = 0;
    
    
    $topicPartition = new RdKafka\TopicPartition($topicName, 0);
    $timeoutMs = 100000;
    
    $low = null;
    $high = null;
    
    $wm = $kafkaConsumer->queryWatermarkOffsets($topicName,$partition,$low,$high,$timeoutMs);
    
    $offset = $high - 1;
    
    $kafkaConsumer->assign([new RdKafka\TopicPartition($topicName, $partition, $offset)]);
    
    $message = $kafkaConsumer->consume(1000);
    
    if ($message !== null) {
        // Process the message
        $payload = $message->payload;
        echo "Message at offset $offset: $payload\n";
    }
    
    $kafkaConsumer->close();

    balas
    0
  • Batalbalas