Home >Backend Development >PHP Tutorial >PHP sends data to kafka implementation code

PHP sends data to kafka implementation code

小云云
小云云Original
2018-03-06 11:07:545059browse

kafka is just a small link. Often used for sending and transferring data. In the official example of kafka, there is actually no relevant implementation version of php. The Kafka-related PHP libraries currently circulating on the Internet are all class libraries written by programming enthusiasts, so there will definitely not be a unified interface standard.

The following takes a certain class library as an example to show the use of the relevant kafka PHP extension library. After comprehensively comparing several Kafka PHP libraries, Uncle Su Nan feels that the following open source library, nmred/kafka-php, is simpler and more convenient.

First, install the composer class library nmred/kafka-php.

For basic usage issues of composer, you can check out my composer-related articles. https://newsn.net/tag/composer/.

composer require "nmred/kafka-php" -vvv

Of course, you can also use mirrors to speed up downloads.

{
  "config": {
      "secure-http": false,
      "preferred-install": "dist",
      "sort-packages": true
  },
  "repositories": {
      "packagist": {
          "type": "composer",
          "url": "https://packagist.phpcomposer.com"
      }
  },
  "require": {
    "nmred/kafka-php": "v0.2.0.7"
  }
}

Determine the port and topic, check the kafka version number

I chose the local port as 9092, the topic is test1, and check that my local kafka version is 0.11.0.0. These are all used in the code.

php如何发送数据到kafka - kafka_version.png

php如何发送数据到kafka - receive.png

##Producer code (asynchronous)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger(&#39;my_logger&#39;);// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() {
    return array(
        array(
            'topic' => 'test1',     //注意对应topic
            'key' => 'testkey',
            'value' => 'test....message.',
            ),
    );});// $producer->setLogger($logger);$producer->success(function($result) {
    var_dump($result);});$producer->error(function($errorCode) {
    var_dump($errorCode);});$producer->send(true);

Producer code (synchronous)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger(&#39;my_logger&#39;);// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) {
  $result = $producer->send(array(
    array(
        'topic' => 'test1',
        'value' => 'test1....message.',
        'key' => 'key'.$i,
    ),
  ));
  var_dump($result);}

The codes of these two producers can be received using the following shell commands.

kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
Consumer code

require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) {
    var_dump($message);});

This consumer code can send data through the following shell command.

kafka-console-producer --broker-list localhost:9092 --topic test1
It is worth mentioning that this consumer code can also be executed on the web page. The page will display relevant data in real time. It is estimated that the PHP side is an infinite state.

Of course, everyone must remember to enable zookeeper and kafka in order to do these experiments. For related Kafka installation issues, please click here to view. https://newsn.net/tag/kafka/ .

The class library address involved in this article is: https://github.com/weiboad/kafka-php. This open source class library seems to be a Chinese work, so there is a Chinese document. See here. https://github.com/weiboad/kafka-php/blob/master/README_CH.md.

Related recommendations:

Example sharing of PHP extension kafka under Linux

Kafka client written in PHP

Usage of kafka assembly and Kafka-PHP extension

The above is the detailed content of PHP sends data to kafka implementation code. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn