Maison > Article > développement back-end > Utilisation de Kafka dans la file d'attente de messages PHP
Installez le service Kafka
Allez directement sur le site officiel de Kafka et téléchargez la dernière
wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
Décompressez et entrez dans le répertoire
tar -zxvf kafka_2.13-2.5.0.tgz cd kafka_2.13-2.5.0
Démarrez le service Kafka
Utilisez le script du package d'installation pour démarrer un Zookeeper à nœud unique instance
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Utilisez kafka -server-start.sh Démarrez le service kafka
bin/kafka-server-start.sh config/server.properties
Créer un sujet
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Affichez la liste des sujets et vérifiez si la création est réussi
bin/kafka-topics.sh --list --zookeeper localhost:2181 $ test
Producteur, envoyer des News
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
C'est tout pour le service, la prochaine chose c'est php.
Installer l'extension PHP
L'installation de rdkafka dépend de librdkafka, alors installez d'abord librdkafka
git clone https://github.com/edenhill/librdkafka.git cd librdkafka ./configure make && make install
Installez l'extension php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config ## 这里根据自己的情况填写路径 make && make install
Ajoutez
extension=rdkafka.so
à php-ini, redémarrez php-fpm et vous devriez pouvoir voir l'extension.
Créer une classe de producteur à l'aide de Kafka
<?php class KafkaProducer { public static $brokerList = '127.0.0.1:9092'; public static function send($message, $topic) { self::producer($message, $topic); } public static function producer($message, $topic = 'test') { $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', self::$brokerList); $producer = new \RdKafka\Producer($conf); $topic = $producer->newTopic($topic); $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message)); $producer->poll(0); $result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); } } }
Créer une classe de consommateur
<?php class KafkaConsumer { public static $brokerList = '127.0.0.1:9092'; public static function consumer() { $conf = new \RdKafka\Conf(); $conf->set('group.id', 'test'); $rk = new \RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'broker'); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic('test', $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } } }
Résumé du problème
1. Aucun runtime Java présent, demande d'installation
Étant donné que kafka nécessite la prise en charge de l'environnement Java, l'environnement Java est installé. Vous pouvez aller sur javase-jdk14-downloads pour choisir votre propre version à télécharger et installer
2. Créez un sujet et il apparaîtra : Facteur de réplication : 1 plus grand que les courtiers disponibles : 0
Cela signifie qu'il y a au moins un courtier, c'est-à-dire qu'il n'y a pas de courtier valide disponible. Vous devez vous assurer que votre kafka a bien été démarré
Tutoriel recommandé : "Tutoriel PHP"
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!