Home >Backend Development >PHP Tutorial >PHP sends data to kafka implementation code
kafka is just a small link. Often used for sending and transferring data. In the official example of kafka, there is actually no relevant implementation version of php. The Kafka-related PHP libraries currently circulating on the Internet are all class libraries written by programming enthusiasts, so there will definitely not be a unified interface standard.
The following takes a certain class library as an example to show the use of the relevant kafka PHP extension library. After comprehensively comparing several Kafka PHP libraries, Uncle Su Nan feels that the following open source library, nmred/kafka-php, is simpler and more convenient.
For basic usage issues of composer, you can check out my composer-related articles. https://newsn.net/tag/composer/.
composer require "nmred/kafka-php" -vvv
Of course, you can also use mirrors to speed up downloads.
{ "config": { "secure-http": false, "preferred-install": "dist", "sort-packages": true }, "repositories": { "packagist": { "type": "composer", "url": "https://packagist.phpcomposer.com" } }, "require": { "nmred/kafka-php": "v0.2.0.7" } }
I chose the local port as 9092, the topic is test1, and check that my local kafka version is 0.11.0.0. These are all used in the code.
##Producer code (asynchronous)<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger('my_logger');// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test1', //注意对应topic 'key' => 'testkey', 'value' => 'test....message.', ), );});// $producer->setLogger($logger);$producer->success(function($result) { var_dump($result);});$producer->error(function($errorCode) { var_dump($errorCode);});$producer->send(true);
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test1', 'value' => 'test1....message.', 'key' => 'key'.$i, ), )); var_dump($result);}
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginningConsumer code
require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) { var_dump($message);});
kafka-console-producer --broker-list localhost:9092 --topic test1It is worth mentioning that this consumer code can also be executed on the web page. The page will display relevant data in real time. It is estimated that the PHP side is an infinite state. Of course, everyone must remember to enable zookeeper and kafka in order to do these experiments. For related Kafka installation issues, please click here to view. https://newsn.net/tag/kafka/ . The class library address involved in this article is: https://github.com/weiboad/kafka-php. This open source class library seems to be a Chinese work, so there is a Chinese document. See here. https://github.com/weiboad/kafka-php/blob/master/README_CH.md. Related recommendations:
Example sharing of PHP extension kafka under Linux
Usage of kafka assembly and Kafka-PHP extension
The above is the detailed content of PHP sends data to kafka implementation code. For more information, please follow other related articles on the PHP Chinese website!