Maison >développement back-end >tutoriel php >Introduction à Kafka et installation et test de Kafka basé sur PHP
Le contenu de cet article concerne l'introduction de Kafka et l'installation et les tests de Kafka basé sur PHP. Le contenu est très détaillé. Les amis dans le besoin peuvent s'y référer.
Kafka est un système de messagerie de publication-abonnement distribué à haut débit
producteur : producteur .
consommateur : consommateur.
sujet : les messages sont enregistrés dans la catégorie du sujet. Kafka classe les graines de message (Flux), et chaque type de message est appelé un sujet.
Broker : fonctionne dans un cluster et peut être composé d'un ou plusieurs services. Chaque service est appelé un courtier ; les consommateurs peuvent s'abonner à un ou plusieurs sujets et extraire les données du courtier pour les consulter.
1. La partition sous un sujet ne peut pas être inférieure au nombre de consommateurs, c'est-à-dire que le nombre de consommateurs sous un sujet ne peut pas être supérieur à la partition si c'est le cas. plus grand, cela fera perdre du temps d'inactivité
2 . Une partition sous un sujet peut être consommée par un consommateur dans différents groupes de consommateurs en même temps
3. Une partition sous un sujet ne peut être consommée que par un consommateur dans le même groupe. même groupe de consommateurs
L'ack du producteur Kafka a 3 mécanismes . La configuration du producteur lors de l'initialisation du producteur peut être configurée via Différentes valeurs pour request.required.acks sont implémentées.
0 : Cela signifie que le producteur n'attend pas la confirmation du courtier que la synchronisation est terminée pour continuer à envoyer le prochain message (batch). Cette option offre la latence la plus faible mais la garantie de durabilité la plus faible (certaines données seront perdues en cas de panne du serveur, comme par exemple si le leader est mort, mais le producteur ne le sait pas et le courtier ne peut pas recevoir les informations envoyées).
1 : Cela signifie que le producteur envoie le message suivant une fois que le leader a reçu avec succès les données et les a confirmées. Cette option offre une meilleure durabilité car le client attend que le serveur confirme que la requête a réussi (le seul message qui a été écrit au leader mort mais pas encore répliqué sera perdu).
-1 : Cela signifie que le producteur ne terminera pas une transmission tant que la copie suiveuse n'aura pas confirmé qu'elle a reçu les données.
Cette option offre la meilleure durabilité, nous garantissons qu'aucune information ne sera perdue tant qu'au moins une réplique synchronisée reste vivante.
Trois mécanismes, les performances diminuent dans l'ordre (le débit du producteur diminue) et la robustesse des données augmente dans l'ordre.
1. au plus tôt : réinitialisez automatiquement le décalage au décalage le plus ancien
2. au plus tard : réinitialisez automatiquement le décalage au dernier décalage (par défaut)
. 3. aucun : si le groupe de consommateurs ne trouve pas le décalage précédent, lancez une exception au consommateur.
4. Autres paramètres : lancer une exception (paramètre invalide) au consommateur
# 官方下载地址:http://kafka.apache.org/downloads # wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz tar -xzf kafka_2.12-1.1.1.tgz cd kafka_2.12-1.1.0
# 需先启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
# 创建一个话题,test话题2个分区 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test Created topic "test". # 显示所有话题 bin/kafka-topics.sh --list --zookeeper localhost:2181 test # 显示话题信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:2 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 # 启动一个生产者(输入消息) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [等待输入自己的内容 出现>输入即可] >i am a new msg ! >i am a good msg ? # 启动一个生产者(等待消息) # 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning [等待消息] i am a new msg ! i am a good msg ?
git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5 sudo make install vim [php]/php.ini extension=rdkafka.so
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); $rk = new RdKafka\Producer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $cf = new RdKafka\TopicConf(); $cf->set('request.required.acks', 0); $topic = $rk->newTopic("test", $cf); $option = 'qkl'; for ($i = 0; $i produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option); } $len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); }
php producer.php # output int(20) int(20) int(20) int(20) int(0) # 你可以查看你刚才上面启动的消费者shell应该会输出消息 qkl . 0 qkl . 1 qkl . 2 qkl . 3 qkl . 4 qkl . 5 qkl . 6 qkl . 7 qkl . 8 qkl . 9 qkl . 10 qkl . 11 qkl . 12 qkl . 13 qkl . 14 qkl . 15 qkl . 16 qkl . 17 qkl . 18 qkl . 19
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); //设置消费组 $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('request.required.acks', 1); //在interval.ms的时间内自动提交确认、建议不要启动 //$topicConf->set('auto.commit.enable', 1); $topicConf->set('auto.commit.enable', 0); $topicConf->set('auto.commit.interval.ms', 100); // 设置offset的存储为file //$topicConf->set('offset.store.method', 'file'); // 设置offset的存储为broker $topicConf->set('offset.store.method', 'broker'); //$topicConf->set('offset.store.path', __DIR__); //smallest:简单理解为从头开始消费,其实等价于上面的 earliest //largest:简单理解为从最新的开始消费,其实等价于上面的 latest //$topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("test", $topicConf); // 参数1消费分区0 // RD_KAFKA_OFFSET_BEGINNING 重头开始消费 // RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费 // RD_KAFKA_OFFSET_END 最后一条消费 $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); //$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { //参数1表示消费分区,这里是分区0 //参数2表示同步阻塞多久 $message = $topic->consume(0, 12 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./xx.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason); }); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $allInfo = $rk->metadata(true, NULL, 60e3); $topics = $allInfo->getTopics(); echo rd_kafka_offset_tail(100); echo "--"; echo count($topics); echo "--"; foreach ($topics as $topic) { $topicName = $topic->getTopic(); if ($topicName == "__consumer_offsets") { continue ; } $partitions = $topic->getPartitions(); foreach ($partitions as $partition) { // $rf = new ReflectionClass(get_class($partition)); // foreach ($rf->getMethods() as $f) { // var_dump($f); // } // die(); $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId()); echo "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - "; echo "offset:" . ($topPartition->getOffset()) . PHP_EOL; } }
Recommandations associées :
installation de Kafka et utilisation de l'extension Kafka-PHP, extension kafkakafka-php
Assemblage de Kafka et utilisation de Kafka-PHP extension
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!