首頁  >  文章  >  後端開發  >  php傳送資料到kafka實作程式碼

php傳送資料到kafka實作程式碼

小云云
小云云原創
2018-03-06 11:07:544947瀏覽

kafka只是個小小的紐帶。經常用於資料的發送及轉移。在kafka官方的例子中,其實並沒有php的相關實作版本。現在網路上流傳的kafka的相關php函式庫,都是些程式愛好者自己寫的類別庫,所以就肯定不會有太統一的介面標準了。

下面以某個類別庫為例,展示相關的kafka的php擴充庫使用。綜合比較了幾個kafka的php函式庫,蘇南叔公覺得下面的這個開源類別函式庫,nmred/kafka-php ,比較簡單方便一些。

先要安裝composer的類別庫nmred/kafka-php 。

composer的基本使用問題,大家可以查看我的composer相關文章。 https://newsn.net/tag/composer/ 。

composer require "nmred/kafka-php" -vvv

當然,你也可以用鏡像加速下載。

{
  "config": {
      "secure-http": false,
      "preferred-install": "dist",
      "sort-packages": true
  },
  "repositories": {
      "packagist": {
          "type": "composer",
          "url": "https://packagist.phpcomposer.com"
      }
  },
  "require": {
    "nmred/kafka-php": "v0.2.0.7"
  }
}

確定連接埠及topic,查看kafka版本號

我選擇本地的連接埠是9092,topic是test1,同時查看我本地的kafka版本是0.11.0.0。這些在程式碼中都是要用到的。

php如何发送数据到kafka - kafka_version.png

php如何发送数据到kafka - receive.png

生產者程式碼(非同步)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger(&#39;my_logger&#39;);// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() {
    return array(
        array(
            'topic' => 'test1',     //注意对应topic
            'key' => 'testkey',
            'value' => 'test....message.',
            ),
    );});// $producer->setLogger($logger);$producer->success(function($result) {
    var_dump($result);});$producer->error(function($errorCode) {
    var_dump($errorCode);});$producer->send(true);

生產者程式碼(同步)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger(&#39;my_logger&#39;);// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) {
  $result = $producer->send(array(
    array(
        'topic' => 'test1',
        'value' => 'test1....message.',
        'key' => 'key'.$i,
    ),
  ));
  var_dump($result);}

這兩個生產者的程式碼,可以用下列shell指令接收。

kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning

消費者程式碼

require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) {
    var_dump($message);});

這個消費者程式碼,可以透過下面的shell指令傳送資料。

kafka-console-producer --broker-list localhost:9092 --topic test1

值得特別說明的是,這個消費者的程式碼,在網頁裡面執行也是可以的。頁面會即時顯示相關數據。估計php端是個無限長的endless狀態。

當然,大家要記得開啟zookeeper和kafka,才能做這些實驗。相關kafka的安裝問題,請點這裡查看。 https://newsn.net/tag/kafka/ 。

本文涉及的類別庫位址是:https://github.com/weiboad/kafka-php 。這個開源類別庫,似乎是國人作品,所以有個中文文檔。見這裡。 https://github.com/weiboad/kafka-php/blob/master/README_CH.md 。

相關推薦:

linux下php擴充kafka的實例分享

使用PHP 寫的Kafka 用戶端

#kafka組裝及Kafka-PHP擴充的使用

以上是php傳送資料到kafka實作程式碼的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn