Maison  >  Article  >  développement back-end  >  Introduction à Kafka et installation et test de Kafka basé sur PHP

Introduction à Kafka et installation et test de Kafka basé sur PHP

不言
不言original
2018-07-26 10:11:203676parcourir

Le contenu de cet article concerne l'introduction de Kafka et l'installation et les tests de Kafka basé sur PHP. Le contenu est très détaillé. Les amis dans le besoin peuvent s'y référer.

Introduction

Kafka est un système de messagerie de publication-abonnement distribué à haut débit

Rôles Kafka que vous devez connaître

producteur : producteur .
consommateur : consommateur.
sujet : les messages sont enregistrés dans la catégorie du sujet. Kafka classe les graines de message (Flux), et chaque type de message est appelé un sujet.
Broker : fonctionne dans un cluster et peut être composé d'un ou plusieurs services. Chaque service est appelé un courtier ; les consommateurs peuvent s'abonner à un ou plusieurs sujets et extraire les données du courtier pour les consulter.

Modèle classique

1. La partition sous un sujet ne peut pas être inférieure au nombre de consommateurs, c'est-à-dire que le nombre de consommateurs sous un sujet ne peut pas être supérieur à la partition si c'est le cas. plus grand, cela fera perdre du temps d'inactivité
2 . Une partition sous un sujet peut être consommée par un consommateur dans différents groupes de consommateurs en même temps
3. Une partition sous un sujet ne peut être consommée que par un consommateur dans le même groupe. même groupe de consommateurs

Introduction à Kafka et installation et test de Kafka basé sur PHP

Description des paramètres communs

request.required.acks

L'ack du producteur Kafka a 3 mécanismes . La configuration du producteur lors de l'initialisation du producteur peut être configurée via Différentes valeurs pour request.required.acks sont implémentées.

0 : Cela signifie que le producteur n'attend pas la confirmation du courtier que la synchronisation est terminée pour continuer à envoyer le prochain message (batch). Cette option offre la latence la plus faible mais la garantie de durabilité la plus faible (certaines données seront perdues en cas de panne du serveur, comme par exemple si le leader est mort, mais le producteur ne le sait pas et le courtier ne peut pas recevoir les informations envoyées).

1 : Cela signifie que le producteur envoie le message suivant une fois que le leader a reçu avec succès les données et les a confirmées. Cette option offre une meilleure durabilité car le client attend que le serveur confirme que la requête a réussi (le seul message qui a été écrit au leader mort mais pas encore répliqué sera perdu).

-1 : Cela signifie que le producteur ne terminera pas une transmission tant que la copie suiveuse n'aura pas confirmé qu'elle a reçu les données.
Cette option offre la meilleure durabilité, nous garantissons qu'aucune information ne sera perdue tant qu'au moins une réplique synchronisée reste vivante.

Trois mécanismes, les performances diminuent dans l'ordre (le débit du producteur diminue) et la robustesse des données augmente dans l'ordre.

auto.offset.reset

1. au plus tôt : réinitialisez automatiquement le décalage au décalage le plus ancien
2. au plus tard : réinitialisez automatiquement le décalage au dernier décalage (par défaut)
. 3. aucun : si le groupe de consommateurs ne trouve pas le décalage précédent, lancez une exception au consommateur.
4. Autres paramètres : lancer une exception (paramètre invalide) au consommateur

Installation de Kafka et test simple

Installer Kafka (aucune installation requise, il suffit de déballer)

# 官方下载地址:http://kafka.apache.org/downloads
# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0

Démarrer le serveur Kafka

# 需先启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Démarrer le test du client Kafka

# 创建一个话题,test话题2个分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

# 显示所有话题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# 显示话题信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0


# 启动一个生产者(输入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?

# 启动一个生产者(等待消息) 
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?

Installer l'extension php Kafka

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vim [php]/php.ini
extension=rdkafka.so

Pratique du code php

Producteur

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);

$option = 'qkl';
for ($i = 0; $i produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}


$len = $rk->getOutQLen();
while ($len > 0) {
    $len = $rk->getOutQLen();
    var_dump($len);
    $rk->poll(50);
}

Exécuter le producteur

php producer.php
# output

int(20)
int(20)
int(20)
int(20)
int(0)

# 你可以查看你刚才上面启动的消费者shell应该会输出消息
qkl . 0
qkl . 1
qkl . 2
qkl . 3
qkl . 4
qkl . 5
qkl . 6
qkl . 7
qkl . 8
qkl . 9
qkl . 10
qkl . 11
qkl . 12
qkl . 13
qkl . 14
qkl . 15
qkl . 16
qkl . 17
qkl . 18
qkl . 19

Consommateur

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

//设置消费组
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
//在interval.ms的时间内自动提交确认、建议不要启动
//$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// 设置offset的存储为file
//$topicConf->set('offset.store.method', 'file');
// 设置offset的存储为broker
 $topicConf->set('offset.store.method', 'broker');
//$topicConf->set('offset.store.path', __DIR__);

//smallest:简单理解为从头开始消费,其实等价于上面的 earliest
//largest:简单理解为从最新的开始消费,其实等价于上面的 latest
//$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// 参数1消费分区0
// RD_KAFKA_OFFSET_BEGINNING 重头开始消费
// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
// RD_KAFKA_OFFSET_END 最后一条消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    //参数1表示消费分区,这里是分区0
    //参数2表示同步阻塞多久
    $message = $topic->consume(0, 12 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

Afficher les métadonnées du serveur (sujet/partition/courtier)

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$allInfo = $rk->metadata(true, NULL, 60e3);

$topics = $allInfo->getTopics();

echo rd_kafka_offset_tail(100);
echo "--";

echo count($topics);
echo "--";


foreach ($topics as $topic) {

    $topicName = $topic->getTopic();
    if ($topicName == "__consumer_offsets") {
        continue ;
    }

    $partitions = $topic->getPartitions();
    foreach ($partitions as $partition) {
//        $rf = new ReflectionClass(get_class($partition));
//        foreach ($rf->getMethods() as $f) {
//            var_dump($f);
//        }
//        die();
        $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
        echo  "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
        echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;
    }
}

Recommandations associées :

installation de Kafka et utilisation de l'extension Kafka-PHP, extension kafkakafka-php

Assemblage de Kafka et utilisation de Kafka-PHP extension

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn