Maison  >  Article  >  développement back-end  >  PHP envoie des données au code d'implémentation de Kafka

PHP envoie des données au code d'implémentation de Kafka

小云云
小云云original
2018-03-06 11:07:544948parcourir

kafka n'est qu'un petit lien. Souvent utilisé pour envoyer et transférer des données. Dans l'exemple officiel de kafka, il n'existe en fait aucune version d'implémentation pertinente de php. Les bibliothèques PHP liées à Kafka qui circulent actuellement sur Internet sont toutes des bibliothèques de classes écrites par des passionnés de programmation eux-mêmes, il n'y aura donc certainement pas de standard d'interface unifié.

Ce qui suit prend une certaine bibliothèque de classes comme exemple pour montrer l'utilisation de la bibliothèque d'extension PHP kafka appropriée. Après avoir comparé de manière exhaustive plusieurs bibliothèques PHP Kafka, Oncle Sunan estime que la bibliothèque open source suivante, nmred/kafka-php, est plus simple et plus pratique.

Tout d’abord, installez la bibliothèque de classes composer nmred/kafka-php.

Pour les problèmes d'utilisation de base de composer, vous pouvez consulter mes articles relatifs au compositeur. https://newsn.net/tag/composer/.

composer require "nmred/kafka-php" -vvv

Bien sûr, vous pouvez également utiliser des miroirs pour accélérer les téléchargements.

{
  "config": {
      "secure-http": false,
      "preferred-install": "dist",
      "sort-packages": true
  },
  "repositories": {
      "packagist": {
          "type": "composer",
          "url": "https://packagist.phpcomposer.com"
      }
  },
  "require": {
    "nmred/kafka-php": "v0.2.0.7"
  }
}

Déterminez le port et le sujet, vérifiez le numéro de version de Kafka

J'ai choisi le port local comme 9092, le sujet est test1 et vérifiez que ma version locale de Kafka est 0.11.0.0 . Ceux-ci sont tous utilisés dans le code.

php如何发送数据到kafka - kafka_version.png

php如何发送数据到kafka - receive.png

Code producteur (asynchrone)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger(&#39;my_logger&#39;);// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() {
    return array(
        array(
            'topic' => 'test1',     //注意对应topic
            'key' => 'testkey',
            'value' => 'test....message.',
            ),
    );});// $producer->setLogger($logger);$producer->success(function($result) {
    var_dump($result);});$producer->error(function($errorCode) {
    var_dump($errorCode);});$producer->send(true);

Code producteur (synchrone)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger(&#39;my_logger&#39;);// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) {
  $result = $producer->send(array(
    array(
        'topic' => 'test1',
        'value' => 'test1....message.',
        'key' => 'key'.$i,
    ),
  ));
  var_dump($result);}

Les codes de ces deux producteurs peuvent être reçus en utilisant les commandes shell suivantes.

kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning

Code consommateur

require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) {
    var_dump($message);});

Ce code consommateur peut envoyer des données via la commande shell suivante.

kafka-console-producer --broker-list localhost:9092 --topic test1

Il est à noter que ce code consommateur peut également être exécuté sur la page Web. La page affichera les données pertinentes en temps réel. On estime que le côté PHP est un état infini.

Bien sûr, tout le monde doit penser à activer Zookeeper et Kafka pour pouvoir faire ces expériences. Pour les problèmes d’installation de Kafka liés, veuillez cliquer ici pour les consulter. https://newsn.net/tag/kafka/ .

L'adresse de la bibliothèque de classes impliquée dans cet article est : https://github.com/weiboad/kafka-php. Cette bibliothèque de classes open source semble être une œuvre chinoise, il existe donc un document chinois. Voir ici. https://github.com/weiboad/kafka-php/blob/master/README_CH.md.

Recommandations associées :

Exemple de partage de l'extension PHP kafka sous Linux

Client Kafka écrit en PHP

Utilisation de l'assembly kafka et de l'extension Kafka-PHP

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn