Heim >Backend-Entwicklung >PHP-Tutorial >Einführung in Kafka und Installation und Test von Kafka auf Basis von PHP
Der Inhalt dieses Artikels befasst sich mit der Einführung von Kafka und der Installation und dem Testen von Kafka auf Basis von PHP. Ich hoffe, dass er Ihnen weiterhelfen kann.
Kafka ist ein verteiltes Publish-Subscribe-Messagingsystem mit hohem Durchsatz
Produzent: Produzent .
Verbraucher: Verbraucher.
Thema: Nachrichten werden in der Themenkategorie erfasst. Kafka klassifiziert Nachrichten-Seeds (Feeds) und jeder Nachrichtentyp wird als Thema bezeichnet.
Broker: Läuft in einem Cluster und kann aus einem oder mehreren Diensten bestehen, die als Broker bezeichnet werden. Verbraucher können ein oder mehrere Themen abonnieren und Daten vom Broker abrufen, um diese veröffentlichten Nachrichten zu nutzen.
1. Die Partition unter einem Thema kann nicht kleiner sein als die Anzahl der Verbraucher, das heißt, die Anzahl der Verbraucher unter einem Thema kann nicht größer sein als die Partition größer, es wird Leerlaufzeit verschwenden
2 Eine Partition unter einem Thema kann von einem Verbraucher in verschiedenen Verbrauchergruppen gleichzeitig genutzt werden
3 Eine Partition unter einem Thema kann nur von einem Verbraucher in der verwendet werden gleiche Verbrauchergruppe
Die Bestätigung des Kafka-Produzenten verfügt über 3 Mechanismen . Die Producerconfig kann beim Initialisieren des Producers durch verschiedene Werte für request.required.acks konfiguriert werden.
0: Dies bedeutet, dass der Produzent nicht auf die Bestätigung des Brokers wartet, dass die Synchronisierung abgeschlossen ist, um mit dem Senden der nächsten (Batch-)Nachricht fortzufahren. Diese Option bietet die niedrigste Latenz, aber die schwächste Haltbarkeitsgarantie (einige Daten gehen verloren, wenn der Server ausfällt, z. B. wenn der Anführer tot ist, der Produzent dies jedoch nicht weiß und der Broker die gesendeten Informationen nicht empfangen kann).
1: Dies bedeutet, dass der Produzent die nächste Nachricht sendet, nachdem der Leiter die Daten erfolgreich empfangen und bestätigt hat. Diese Option bietet eine bessere Haltbarkeit, da der Client darauf wartet, dass der Server bestätigt, dass die Anfrage erfolgreich war (die einzige Nachricht, die an den toten Leader geschrieben, aber noch nicht repliziert wurde, geht verloren).
-1: Dies bedeutet, dass der Produzent eine Übertragung erst dann abschließt, wenn die Follower-Kopie bestätigt, dass sie die Daten erhalten hat.
Diese Option bietet die beste Haltbarkeit. Wir garantieren, dass keine Informationen verloren gehen, solange mindestens ein synchronisiertes Replikat aktiv bleibt.
Drei Mechanismen: Die Leistung nimmt in der Reihenfolge ab (der Herstellerdurchsatz nimmt ab) und die Datenrobustheit nimmt in der Reihenfolge zu.
1. Der Offset wird automatisch auf den frühesten Offset zurückgesetzt.
Der Offset wird automatisch auf den neuesten Offset zurückgesetzt (Standard). 3. keine: Wenn die Verbrauchergruppe den vorherigen Offset nicht findet, wird eine Ausnahme an den Verbraucher ausgelöst.
4. Andere Parameter: Eine Ausnahme (ungültiger Parameter) an den Verbraucher auslösen
# 官方下载地址:http://kafka.apache.org/downloads # wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz tar -xzf kafka_2.12-1.1.1.tgz cd kafka_2.12-1.1.0
# 需先启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
# 创建一个话题,test话题2个分区 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test Created topic "test". # 显示所有话题 bin/kafka-topics.sh --list --zookeeper localhost:2181 test # 显示话题信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:2 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 # 启动一个生产者(输入消息) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [等待输入自己的内容 出现>输入即可] >i am a new msg ! >i am a good msg ? # 启动一个生产者(等待消息) # 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning [等待消息] i am a new msg ! i am a good msg ?
git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5 sudo make install vim [php]/php.ini extension=rdkafka.so
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); $rk = new RdKafka\Producer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $cf = new RdKafka\TopicConf(); $cf->set('request.required.acks', 0); $topic = $rk->newTopic("test", $cf); $option = 'qkl'; for ($i = 0; $i produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option); } $len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); }
php producer.php # output int(20) int(20) int(20) int(20) int(0) # 你可以查看你刚才上面启动的消费者shell应该会输出消息 qkl . 0 qkl . 1 qkl . 2 qkl . 3 qkl . 4 qkl . 5 qkl . 6 qkl . 7 qkl . 8 qkl . 9 qkl . 10 qkl . 11 qkl . 12 qkl . 13 qkl . 14 qkl . 15 qkl . 16 qkl . 17 qkl . 18 qkl . 19
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); //设置消费组 $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('request.required.acks', 1); //在interval.ms的时间内自动提交确认、建议不要启动 //$topicConf->set('auto.commit.enable', 1); $topicConf->set('auto.commit.enable', 0); $topicConf->set('auto.commit.interval.ms', 100); // 设置offset的存储为file //$topicConf->set('offset.store.method', 'file'); // 设置offset的存储为broker $topicConf->set('offset.store.method', 'broker'); //$topicConf->set('offset.store.path', __DIR__); //smallest:简单理解为从头开始消费,其实等价于上面的 earliest //largest:简单理解为从最新的开始消费,其实等价于上面的 latest //$topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("test", $topicConf); // 参数1消费分区0 // RD_KAFKA_OFFSET_BEGINNING 重头开始消费 // RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费 // RD_KAFKA_OFFSET_END 最后一条消费 $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); //$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { //参数1表示消费分区,这里是分区0 //参数2表示同步阻塞多久 $message = $topic->consume(0, 12 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./xx.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason); }); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $allInfo = $rk->metadata(true, NULL, 60e3); $topics = $allInfo->getTopics(); echo rd_kafka_offset_tail(100); echo "--"; echo count($topics); echo "--"; foreach ($topics as $topic) { $topicName = $topic->getTopic(); if ($topicName == "__consumer_offsets") { continue ; } $partitions = $topic->getPartitions(); foreach ($partitions as $partition) { // $rf = new ReflectionClass(get_class($partition)); // foreach ($rf->getMethods() as $f) { // var_dump($f); // } // die(); $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId()); echo "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - "; echo "offset:" . ($topPartition->getOffset()) . PHP_EOL; } }
kafka Installation und Verwendung der Kafka-PHP-Erweiterung, Kafkakafka-PHP-Erweiterung
Kafka-Assembly und Verwendung der Kafka-PHP-Erweiterung
Das obige ist der detaillierte Inhalt vonEinführung in Kafka und Installation und Test von Kafka auf Basis von PHP. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!