Maison >développement back-end >tutoriel php >PHP envoie des données au code d'implémentation de Kafka
kafka n'est qu'un petit lien. Souvent utilisé pour envoyer et transférer des données. Dans l'exemple officiel de kafka, il n'existe en fait aucune version d'implémentation pertinente de php. Les bibliothèques PHP liées à Kafka qui circulent actuellement sur Internet sont toutes des bibliothèques de classes écrites par des passionnés de programmation eux-mêmes, il n'y aura donc certainement pas de standard d'interface unifié.
Ce qui suit prend une certaine bibliothèque de classes comme exemple pour montrer l'utilisation de la bibliothèque d'extension PHP kafka appropriée. Après avoir comparé de manière exhaustive plusieurs bibliothèques PHP Kafka, Oncle Sunan estime que la bibliothèque open source suivante, nmred/kafka-php, est plus simple et plus pratique.
Pour les problèmes d'utilisation de base de composer, vous pouvez consulter mes articles relatifs au compositeur. https://newsn.net/tag/composer/.
composer require "nmred/kafka-php" -vvv
Bien sûr, vous pouvez également utiliser des miroirs pour accélérer les téléchargements.
{ "config": { "secure-http": false, "preferred-install": "dist", "sort-packages": true }, "repositories": { "packagist": { "type": "composer", "url": "https://packagist.phpcomposer.com" } }, "require": { "nmred/kafka-php": "v0.2.0.7" } }
J'ai choisi le port local comme 9092, le sujet est test1 et vérifiez que ma version locale de Kafka est 0.11.0.0 . Ceux-ci sont tous utilisés dans le code.
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger('my_logger');// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test1', //注意对应topic 'key' => 'testkey', 'value' => 'test....message.', ), );});// $producer->setLogger($logger);$producer->success(function($result) { var_dump($result);});$producer->error(function($errorCode) { var_dump($errorCode);});$producer->send(true);
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test1', 'value' => 'test1....message.', 'key' => 'key'.$i, ), )); var_dump($result);}
Les codes de ces deux producteurs peuvent être reçus en utilisant les commandes shell suivantes.
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) { var_dump($message);});
Ce code consommateur peut envoyer des données via la commande shell suivante.
kafka-console-producer --broker-list localhost:9092 --topic test1
Il est à noter que ce code consommateur peut également être exécuté sur la page Web. La page affichera les données pertinentes en temps réel. On estime que le côté PHP est un état infini.
Bien sûr, tout le monde doit penser à activer Zookeeper et Kafka pour pouvoir faire ces expériences. Pour les problèmes d’installation de Kafka liés, veuillez cliquer ici pour les consulter. https://newsn.net/tag/kafka/ .
L'adresse de la bibliothèque de classes impliquée dans cet article est : https://github.com/weiboad/kafka-php. Cette bibliothèque de classes open source semble être une œuvre chinoise, il existe donc un document chinois. Voir ici. https://github.com/weiboad/kafka-php/blob/master/README_CH.md.
Recommandations associées :
Exemple de partage de l'extension PHP kafka sous Linux
Utilisation de l'assembly kafka et de l'extension Kafka-PHP
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!