Heim  >  Artikel  >  Backend-Entwicklung  >  Einführung in Kafka und Installation und Test von Kafka auf Basis von PHP

Einführung in Kafka und Installation und Test von Kafka auf Basis von PHP

不言
不言Original
2018-07-26 10:11:203618Durchsuche

Der Inhalt dieses Artikels befasst sich mit der Einführung von Kafka und der Installation und dem Testen von Kafka auf Basis von PHP. Ich hoffe, dass er Ihnen weiterhelfen kann.

Einführung

Kafka ist ein verteiltes Publish-Subscribe-Messagingsystem mit hohem Durchsatz

Kafka-Rollen, die Sie kennen müssen

Produzent: Produzent .
Verbraucher: Verbraucher.
Thema: Nachrichten werden in der Themenkategorie erfasst. Kafka klassifiziert Nachrichten-Seeds (Feeds) und jeder Nachrichtentyp wird als Thema bezeichnet.
Broker: Läuft in einem Cluster und kann aus einem oder mehreren Diensten bestehen, die als Broker bezeichnet werden. Verbraucher können ein oder mehrere Themen abonnieren und Daten vom Broker abrufen, um diese veröffentlichten Nachrichten zu nutzen.

Klassisches Modell

1. Die Partition unter einem Thema kann nicht kleiner sein als die Anzahl der Verbraucher, das heißt, die Anzahl der Verbraucher unter einem Thema kann nicht größer sein als die Partition größer, es wird Leerlaufzeit verschwenden
2 Eine Partition unter einem Thema kann von einem Verbraucher in verschiedenen Verbrauchergruppen gleichzeitig genutzt werden
3 Eine Partition unter einem Thema kann nur von einem Verbraucher in der verwendet werden gleiche Verbrauchergruppe

Einführung in Kafka und Installation und Test von Kafka auf Basis von PHP

Gemeinsame Parameterbeschreibung

request.required.acks

Die Bestätigung des Kafka-Produzenten verfügt über 3 Mechanismen . Die Producerconfig kann beim Initialisieren des Producers durch verschiedene Werte für request.required.acks konfiguriert werden.

0: Dies bedeutet, dass der Produzent nicht auf die Bestätigung des Brokers wartet, dass die Synchronisierung abgeschlossen ist, um mit dem Senden der nächsten (Batch-)Nachricht fortzufahren. Diese Option bietet die niedrigste Latenz, aber die schwächste Haltbarkeitsgarantie (einige Daten gehen verloren, wenn der Server ausfällt, z. B. wenn der Anführer tot ist, der Produzent dies jedoch nicht weiß und der Broker die gesendeten Informationen nicht empfangen kann).

1: Dies bedeutet, dass der Produzent die nächste Nachricht sendet, nachdem der Leiter die Daten erfolgreich empfangen und bestätigt hat. Diese Option bietet eine bessere Haltbarkeit, da der Client darauf wartet, dass der Server bestätigt, dass die Anfrage erfolgreich war (die einzige Nachricht, die an den toten Leader geschrieben, aber noch nicht repliziert wurde, geht verloren).

-1: Dies bedeutet, dass der Produzent eine Übertragung erst dann abschließt, wenn die Follower-Kopie bestätigt, dass sie die Daten erhalten hat.
Diese Option bietet die beste Haltbarkeit. Wir garantieren, dass keine Informationen verloren gehen, solange mindestens ein synchronisiertes Replikat aktiv bleibt.

Drei Mechanismen: Die Leistung nimmt in der Reihenfolge ab (der Herstellerdurchsatz nimmt ab) und die Datenrobustheit nimmt in der Reihenfolge zu.

auto.offset.reset

1. Der Offset wird automatisch auf den frühesten Offset zurückgesetzt.
Der Offset wird automatisch auf den neuesten Offset zurückgesetzt (Standard). 3. keine: Wenn die Verbrauchergruppe den vorherigen Offset nicht findet, wird eine Ausnahme an den Verbraucher ausgelöst.
4. Andere Parameter: Eine Ausnahme (ungültiger Parameter) an den Verbraucher auslösen

Kafka-Installation und einfacher Test

Kafka installieren (keine Installation erforderlich, einfach entpacken)

# 官方下载地址:http://kafka.apache.org/downloads
# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0

Kafka-Server starten

# 需先启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Kafka-Client-Test starten

# 创建一个话题,test话题2个分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

# 显示所有话题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# 显示话题信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0


# 启动一个生产者(输入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?

# 启动一个生产者(等待消息) 
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?

PHP-Erweiterung von Kafka installieren

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vim [php]/php.ini
extension=rdkafka.so

PHP-Code-Übung

Produzent

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);

$option = 'qkl';
for ($i = 0; $i produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}


$len = $rk->getOutQLen();
while ($len > 0) {
    $len = $rk->getOutQLen();
    var_dump($len);
    $rk->poll(50);
}

Produzent ausführen

php producer.php
# output

int(20)
int(20)
int(20)
int(20)
int(0)

# 你可以查看你刚才上面启动的消费者shell应该会输出消息
qkl . 0
qkl . 1
qkl . 2
qkl . 3
qkl . 4
qkl . 5
qkl . 6
qkl . 7
qkl . 8
qkl . 9
qkl . 10
qkl . 11
qkl . 12
qkl . 13
qkl . 14
qkl . 15
qkl . 16
qkl . 17
qkl . 18
qkl . 19

Consumer

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

//设置消费组
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
//在interval.ms的时间内自动提交确认、建议不要启动
//$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// 设置offset的存储为file
//$topicConf->set('offset.store.method', 'file');
// 设置offset的存储为broker
 $topicConf->set('offset.store.method', 'broker');
//$topicConf->set('offset.store.path', __DIR__);

//smallest:简单理解为从头开始消费,其实等价于上面的 earliest
//largest:简单理解为从最新的开始消费,其实等价于上面的 latest
//$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// 参数1消费分区0
// RD_KAFKA_OFFSET_BEGINNING 重头开始消费
// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
// RD_KAFKA_OFFSET_END 最后一条消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    //参数1表示消费分区,这里是分区0
    //参数2表示同步阻塞多久
    $message = $topic->consume(0, 12 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

Servermetadaten anzeigen (Thema/Partition/Broker)

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$allInfo = $rk->metadata(true, NULL, 60e3);

$topics = $allInfo->getTopics();

echo rd_kafka_offset_tail(100);
echo "--";

echo count($topics);
echo "--";


foreach ($topics as $topic) {

    $topicName = $topic->getTopic();
    if ($topicName == "__consumer_offsets") {
        continue ;
    }

    $partitions = $topic->getPartitions();
    foreach ($partitions as $partition) {
//        $rf = new ReflectionClass(get_class($partition));
//        foreach ($rf->getMethods() as $f) {
//            var_dump($f);
//        }
//        die();
        $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
        echo  "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
        echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;
    }
}

Verwandte Empfehlungen:

kafka Installation und Verwendung der Kafka-PHP-Erweiterung, Kafkakafka-PHP-Erweiterung

Kafka-Assembly und Verwendung der Kafka-PHP-Erweiterung

Das obige ist der detaillierte Inhalt vonEinführung in Kafka und Installation und Test von Kafka auf Basis von PHP. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn