Maison  >  Article  >  développement back-end  >  Utilisation de Kafka dans la file d'attente de messages PHP

Utilisation de Kafka dans la file d'attente de messages PHP

Guanhui
Guanhuiavant
2020-05-14 09:28:225355parcourir

Utilisation de Kafka dans la file d'attente de messages PHP

Installez le service Kafka

Allez directement sur le site officiel de Kafka et téléchargez la dernière

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

Décompressez et entrez dans le répertoire

tar -zxvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0

Démarrez le service Kafka

Utilisez le script du package d'installation pour démarrer un Zookeeper à nœud unique instance

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Utilisez kafka -server-start.sh Démarrez le service kafka

bin/kafka-server-start.sh config/server.properties

Créer un sujet

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Affichez la liste des sujets et vérifiez si la création est réussi

bin/kafka-topics.sh --list --zookeeper localhost:2181
$ test

Producteur, envoyer des News

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

C'est tout pour le service, la prochaine chose c'est php.

Installer l'extension PHP

L'installation de rdkafka dépend de librdkafka, alors installez d'abord librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install

Installez l'extension php-rdkafka

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config  ## 这里根据自己的情况填写路径
make && make install

Ajoutez

extension=rdkafka.so

à php-ini, redémarrez php-fpm et vous devriez pouvoir voir l'extension.

Créer une classe de producteur à l'aide de Kafka

<?php
class KafkaProducer
{
    public static $brokerList = &#39;127.0.0.1:9092&#39;;
    public static function send($message, $topic)
    {
        self::producer($message, $topic);
    }
    public static function producer($message, $topic = &#39;test&#39;)
    {
        $conf = new \RdKafka\Conf();
        $conf->set(&#39;metadata.broker.list&#39;, self::$brokerList);
        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic($topic);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
        $producer->poll(0);
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException(&#39;Was unable to flush, messages might be lost!&#39;);
        }
    }
}

Créer une classe de consommateur

<?php
class KafkaConsumer
{
    public static $brokerList = &#39;127.0.0.1:9092&#39;;
      public static function consumer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set(&#39;group.id&#39;, &#39;test&#39;);
        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers("127.0.0.1");
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set(&#39;auto.commit.interval.ms&#39;, 100);
        $topicConf->set(&#39;offset.store.method&#39;, &#39;broker&#39;);
        $topicConf->set(&#39;auto.offset.reset&#39;, &#39;smallest&#39;);
        $topic = $rk->newTopic(&#39;test&#39;, $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
        while (true) {
            $message = $topic->consume(0, 120*10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    var_dump($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }
}

Résumé du problème

1. Aucun runtime Java présent, demande d'installation

Étant donné que kafka nécessite la prise en charge de l'environnement Java, l'environnement Java est installé. Vous pouvez aller sur javase-jdk14-downloads pour choisir votre propre version à télécharger et installer

2. Créez un sujet et il apparaîtra : Facteur de réplication : 1 plus grand que les courtiers disponibles : 0

Cela signifie qu'il y a au moins un courtier, c'est-à-dire qu'il n'y a pas de courtier valide disponible. Vous devez vous assurer que votre kafka a bien été démarré

Tutoriel recommandé : "Tutoriel PHP"

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer