Home >Backend Development >PHP Tutorial >PHP message queue Kafka usage

PHP message queue Kafka usage

Guanhui
Guanhuiforward
2020-05-14 09:28:225392browse

PHP message queue Kafka usage

Install Kafka service

Go directly to the kafka official website and download the latest

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

Unzip and enter the directory

tar -zxvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0

Start the Kafka service

Use the script in the installation package to start a single-node Zookeeper instance

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Use kafka -server-start.sh Start kafka service

bin/kafka-server-start.sh config/server.properties

Create topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

View the topic list and check whether the creation is successful

bin/kafka-topics.sh --list --zookeeper localhost:2181
$ test

Producer, send News

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

That’s pretty much it for the service, the next thing is php.

Install PHP extension

rdkafka installation depends on librdkafka, so install librdkafka first

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install

Install php-rdkafka extension

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config  ## 这里根据自己的情况填写路径
make && make install

Add

extension=rdkafka.so

to php-ini, restart php-fpm, and you should be able to see the extension.

Using Kafka

Create a producer class

<?php
class KafkaProducer
{
    public static $brokerList = &#39;127.0.0.1:9092&#39;;
    public static function send($message, $topic)
    {
        self::producer($message, $topic);
    }
    public static function producer($message, $topic = &#39;test&#39;)
    {
        $conf = new \RdKafka\Conf();
        $conf->set(&#39;metadata.broker.list&#39;, self::$brokerList);
        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic($topic);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
        $producer->poll(0);
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException(&#39;Was unable to flush, messages might be lost!&#39;);
        }
    }
}

Create a consumer class

<?php
class KafkaConsumer
{
    public static $brokerList = &#39;127.0.0.1:9092&#39;;
      public static function consumer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set(&#39;group.id&#39;, &#39;test&#39;);
        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers("127.0.0.1");
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set(&#39;auto.commit.interval.ms&#39;, 100);
        $topicConf->set(&#39;offset.store.method&#39;, &#39;broker&#39;);
        $topicConf->set(&#39;auto.offset.reset&#39;, &#39;smallest&#39;);
        $topic = $rk->newTopic(&#39;test&#39;, $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
        while (true) {
            $message = $topic->consume(0, 120*10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    var_dump($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }
}

Problem summary

1. No Java runtime present, requesting install

Because kafka requires java environment support, the java environment is installed. You can go to javase-jdk14-downloads and choose your own version to download and install.

2. Create topic. Replication factor: 1 larger than available brokers: 0

means there is at least one broker. Also That is to say, there are no valid brokers available. You have to make sure that your kafka has been started

Recommended tutorial: "PHP Tutorial"

The above is the detailed content of PHP message queue Kafka usage. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:learnku.com. If there is any infringement, please contact admin@php.cn delete