Home >Backend Development >PHP Tutorial >PHP message queue Kafka usage
Install Kafka service
Go directly to the kafka official website and download the latest
wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
Unzip and enter the directory
tar -zxvf kafka_2.13-2.5.0.tgz cd kafka_2.13-2.5.0
Start the Kafka service
Use the script in the installation package to start a single-node Zookeeper instance
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Use kafka -server-start.sh Start kafka service
bin/kafka-server-start.sh config/server.properties
Create topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
View the topic list and check whether the creation is successful
bin/kafka-topics.sh --list --zookeeper localhost:2181 $ test
Producer, send News
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
That’s pretty much it for the service, the next thing is php.
Install PHP extension
rdkafka installation depends on librdkafka, so install librdkafka first
git clone https://github.com/edenhill/librdkafka.git cd librdkafka ./configure make && make install
Install php-rdkafka extension
git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config ## 这里根据自己的情况填写路径 make && make install
Add
extension=rdkafka.so
to php-ini, restart php-fpm, and you should be able to see the extension.
Using Kafka
Create a producer class
<?php class KafkaProducer { public static $brokerList = '127.0.0.1:9092'; public static function send($message, $topic) { self::producer($message, $topic); } public static function producer($message, $topic = 'test') { $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', self::$brokerList); $producer = new \RdKafka\Producer($conf); $topic = $producer->newTopic($topic); $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message)); $producer->poll(0); $result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); } } }
Create a consumer class
<?php class KafkaConsumer { public static $brokerList = '127.0.0.1:9092'; public static function consumer() { $conf = new \RdKafka\Conf(); $conf->set('group.id', 'test'); $rk = new \RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'broker'); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic('test', $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } } }
Problem summary
1. No Java runtime present, requesting install
Because kafka requires java environment support, the java environment is installed. You can go to javase-jdk14-downloads and choose your own version to download and install.
2. Create topic. Replication factor: 1 larger than available brokers: 0
means there is at least one broker. Also That is to say, there are no valid brokers available. You have to make sure that your kafka has been started
Recommended tutorial: "PHP Tutorial"
The above is the detailed content of PHP message queue Kafka usage. For more information, please follow other related articles on the PHP Chinese website!