Heim > Fragen und Antworten > Hauptteil
Ich schreibe einen Kafka-Consumer für ein API-Projekt unter Verwendung der php-rdkafka-Bibliothek. Ich muss den letzten Offset im Thema finden und daraus den Wert für die weitere Verarbeitung erhalten. Zum Beispiel, letzter Offset im Thema = 5, dann muss ich Offset 5 abrufen und ihn über die API senden, bis ein neuer Offset hinzugefügt wird. Was ich ausführen möchte:
$conf = new RdKafka\Conf(); $settings = [ 'socket.keepalive.enable' => true, 'log_level' => LOG_WARNING, 'enable.auto.offset.store' => 'true', 'auto.offset.reset' => 'earliest', 'enable.partition.eof' => 'false', 'enable.auto.commit' => 'false', 'max.poll.interval.ms' => 300000, 'session.timeout.ms' => 45000, 'group.id' => 'test-group', 'group.instance.id' => uniqid('', true), 'metadata.broker.list' => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092', ]; foreach ($settings as $key => $value) { $conf->set($key, $value); } $topicName = 'userstatistics_12345'; $partition = 0; $topicPartition = new RdKafka\TopicPartition($topicName, $partition); $topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]); var_dump($topicPartitionsWithOffsets);
Aber dies führt zu seltsamen Ergebnissen mit negativen Offsets
array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }
Obwohl der letzte Offset aktuell bei 59 liegt. Meine Idee ist, den letzten Offset zu ermitteln und dann den Wert zu ermitteln mit:
$consumer->assign([ new RdKafka\TopicPartition($topicName, $partition, $lastOffset) ]);
Ich möchte auch keine while(true)-Schleife verwenden, um Skriptarbeiten schnell zu erledigen.
Das ist alles. Danke.
P粉7014918972023-09-11 00:14:41
我找到了答案,对我来说效果很好:
$conf = new RdKafka\Conf(); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set('group.id', 'test-group'); // Initial list of Kafka brokers $conf->set('metadata.broker.list', 'kafka-1:9092'); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'earliest': start from the beginning $conf->set('auto.offset.reset', 'latest'); // Emit EOF event when reaching the end of a partition $conf->set('enable.partition.eof', 'true'); $kafkaConsumer = new RdKafka\KafkaConsumer($conf); $topicName = 'topic_name'; $partition = 0; $topicPartition = new RdKafka\TopicPartition($topicName, 0); $timeoutMs = 100000; $low = null; $high = null; $wm = $kafkaConsumer->queryWatermarkOffsets($topicName,$partition,$low,$high,$timeoutMs); $offset = $high - 1; $kafkaConsumer->assign([new RdKafka\TopicPartition($topicName, $partition, $offset)]); $message = $kafkaConsumer->consume(1000); if ($message !== null) { // Process the message $payload = $message->payload; echo "Message at offset $offset: $payload\n"; } $kafkaConsumer->close();