Heim > Artikel > Backend-Entwicklung > PHP sendet Daten an den Kafka-Implementierungscode
kafka ist nur ein kleiner Link. Wird häufig zum Senden und Übertragen von Daten verwendet. Im offiziellen Beispiel von Kafka gibt es tatsächlich keine relevante Implementierungsversion von PHP. Die Kafka-bezogenen PHP-Bibliotheken, die derzeit im Internet kursieren, sind alle Klassenbibliotheken, die von Programmierbegeisterten selbst geschrieben wurden, sodass es definitiv keinen einheitlichen Schnittstellenstandard geben wird.
Im Folgenden wird eine bestimmte Klassenbibliothek als Beispiel verwendet, um die Verwendung der entsprechenden Kafka-PHP-Erweiterungsbibliothek zu zeigen. Nach einem umfassenden Vergleich mehrerer Kafka-PHP-Bibliotheken ist Onkel Su Nan der Meinung, dass die folgende Open-Source-Bibliothek, nmred/kafka-php, einfacher und bequemer ist.
Grundlegende Fragen zur Verwendung von Composer finden Sie in meinen Composer-bezogenen Artikeln. https://newsn.net/tag/composer/.
composer require "nmred/kafka-php" -vvv
Natürlich können Sie auch Spiegel verwenden, um Downloads zu beschleunigen.
{ "config": { "secure-http": false, "preferred-install": "dist", "sort-packages": true }, "repositories": { "packagist": { "type": "composer", "url": "https://packagist.phpcomposer.com" } }, "require": { "nmred/kafka-php": "v0.2.0.7" } }
Ich habe den lokalen Port als 9092 ausgewählt, das Thema ist test1 und überprüfe, ob meine lokale Kafka-Version 0.11.0.0 ist . Diese werden alle im Code verwendet.
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger('my_logger');// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test1', //注意对应topic 'key' => 'testkey', 'value' => 'test....message.', ), );});// $producer->setLogger($logger);$producer->success(function($result) { var_dump($result);});$producer->error(function($errorCode) { var_dump($errorCode);});$producer->send(true);
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test1', 'value' => 'test1....message.', 'key' => 'key'.$i, ), )); var_dump($result);}
Die Codes dieser beiden Hersteller können mit den folgenden Shell-Befehlen empfangen werden.
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) { var_dump($message);});
Dieser Verbrauchercode kann Daten über den folgenden Shell-Befehl senden.
kafka-console-producer --broker-list localhost:9092 --topic test1
Es ist erwähnenswert, dass dieser Verbrauchercode auch auf der Webseite ausgeführt werden kann. Auf der Seite werden relevante Daten in Echtzeit angezeigt. Es wird geschätzt, dass die PHP-Seite ein unendlicher Zustand ist.
Natürlich muss jeder daran denken, Zookeeper und Kafka zu aktivieren, um diese Experimente durchführen zu können. Klicken Sie hier, um Informationen zu verwandten Kafka-Installationsproblemen anzuzeigen. https://newsn.net/tag/kafka/ .
Die Adresse der Klassenbibliothek in diesem Artikel lautet: https://github.com/weiboad/kafka-php. Diese Open-Source-Klassenbibliothek scheint ein chinesisches Werk zu sein, daher gibt es ein chinesisches Dokument. Siehe hier. https://github.com/weiboad/kafka-php/blob/master/README_CH.md.
Verwandte Empfehlungen:
Beispielfreigabe der PHP-Erweiterung kafka unter Linux
Kafka-Client geschrieben in PHP
Verwendung von Kafka-Assembly und Kafka-PHP-Erweiterung
Das obige ist der detaillierte Inhalt vonPHP sendet Daten an den Kafka-Implementierungscode. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!