Kafka is a high-throughput, distributed publish-subscribe messaging system.

Kafka roles you must know

producer: publishes (produces) messages.
consumer: reads (consumes) messages.
topic: the category under which messages are recorded. Kafka classifies message feeds, and each category of messages is called a topic.
broker: Kafka runs as a cluster made up of one or more servers, and each server is called a broker. Consumers subscribe to one or more topics and pull the published messages from the brokers.

Classic model

1. Within one consumer group, the number of consumers reading a topic should not exceed the number of partitions of that topic. Any extra consumers get no partition and sit idle.
2. A partition of a topic can be consumed at the same time by consumers that belong to different consumer groups.
3. Within a single consumer group, a partition of a topic is consumed by only one consumer.
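The three rules can be made concrete with a small PHP sketch. Real Kafka clients delegate this work to a configurable assignor (for example range or round-robin); assignPartitions() below is a hypothetical, simplified round-robin illustration, not the implementation used by Kafka or php-rdkafka.

```php
<?php
/**
 * Hypothetical helper (not part of Kafka or php-rdkafka): spreads a topic's
 * partitions over the consumers of ONE consumer group, round-robin style.
 */
function assignPartitions(array $partitions, array $consumers): array
{
    $consumers = array_values($consumers);
    if (count($consumers) === 0) {
        return [];
    }
    // Every consumer starts with an empty list, so idle consumers stay visible
    $assignment = array_fill_keys($consumers, []);
    foreach (array_values($partitions) as $i => $partition) {
        // Rule 3: each partition goes to exactly one consumer of the group
        $owner = $consumers[$i % count($consumers)];
        $assignment[$owner][] = $partition;
    }
    return $assignment;
}

// Topic "test" has 2 partitions.
$groupA = assignPartitions([0, 1], ['a1', 'a2', 'a3']); // 3 consumers > 2 partitions
$groupB = assignPartitions([0, 1], ['b1']);             // a second, independent group

print_r($groupA); // a1 gets partition 0, a2 gets 1, a3 gets none and stays idle (rule 1)
print_r($groupB); // b1 reads both partitions, independently of group A (rule 2)
```

Group B's assignment is computed without looking at group A at all, which is exactly why two groups can read the same partition concurrently.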


Common parameters explained


The Kafka producer's acknowledgement (ack) behaviour has three modes, selected by setting request.required.acks to one of the values below in the producer configuration when the producer is initialized.

0: The producer does not wait for the broker to confirm anything before sending the next message (or batch). This gives the lowest latency but the weakest durability: data can be lost when a server fails (for example, if the leader dies, the producer does not know it and the broker never receives what was sent).

1: The producer sends the next message only after the leader has received the data and acknowledged it. Durability is better because the client waits for the server to confirm that the request succeeded; the only messages lost are those written to a leader that dies before they are replicated.

-1: The producer does not consider a send complete until all in-sync replicas (the followers as well as the leader) have acknowledged the data.
This gives the best durability: no acknowledged message is lost as long as at least one in-sync replica stays alive.

Across the three modes, performance (producer throughput) decreases in that order, while data durability increases.
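As a rough mental model, the difference between the three values is how many replica acknowledgements a produce request waits for. acksToWaitFor() is a hypothetical helper written only to illustrate that idea; it is not broker code, and it assumes the in-sync replica (ISR) count includes the leader.

```php
<?php
/**
 * Simplified model, for illustration only: how many replica acknowledgements
 * a produce request waits for under each request.required.acks value.
 * $isrSize is the number of in-sync replicas, leader included.
 */
function acksToWaitFor(int $requiredAcks, int $isrSize): int
{
    switch ($requiredAcks) {
        case 0:
            return 0;        // fire and forget: no broker confirmation
        case 1:
            return 1;        // the leader only
        case -1:
            return $isrSize; // the leader plus every in-sync follower
        default:
            throw new InvalidArgumentException("unsupported acks value: $requiredAcks");
    }
}

foreach ([0, 1, -1] as $acks) {
    printf("acks=%2d waits for %d ack(s) with 3 in-sync replicas\n", $acks, acksToWaitFor($acks, 3));
}
```

The more acknowledgements a request waits for, the longer each send takes, which is the throughput/durability trade-off described above.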


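The consumer setting auto.offset.reset decides where a consumer group starts reading when it has no committed offset, or when its committed offset no longer exists on the server:

1. earliest: automatically reset the offset to the earliest offset
2. latest: automatically reset the offset to the latest offset (default)
3. none: throw an exception to the consumer if no previous offset is found for the consumer group
4. any other value: throw an exception (invalid value) to the consumer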
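The decision can be sketched in PHP. resolveStartOffset() is a hypothetical helper, not a Kafka or php-rdkafka API; it assumes the partition's low watermark (oldest retained offset) and high watermark (next offset to be written) are already known, and treats an out-of-range committed offset the same as a missing one.

```php
<?php
/**
 * Hypothetical helper: where a consumer starts reading a partition, given its
 * committed offset (null if none) and the partition's low/high watermarks.
 */
function resolveStartOffset(?int $committed, int $low, int $high, string $reset): int
{
    if ($committed !== null && $committed >= $low && $committed <= $high) {
        return $committed;   // a valid committed offset always wins
    }
    switch ($reset) {
        case 'earliest':
            return $low;     // oldest message still on the broker
        case 'latest':
            return $high;    // only messages produced from now on
        case 'none':
            throw new RuntimeException('no valid committed offset for this consumer group');
        default:
            throw new InvalidArgumentException("invalid auto.offset.reset value: $reset");
    }
}

echo resolveStartOffset(null, 0, 20, 'earliest'), PHP_EOL; // 0
echo resolveStartOffset(null, 0, 20, 'latest'), PHP_EOL;   // 20
echo resolveStartOffset(7, 0, 20, 'latest'), PHP_EOL;      // 7
```

The last call shows that the reset policy is only a fallback: once the group has committed offset 7, it resumes there regardless of the policy.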

Kafka installation and a simple test

Install Kafka (nothing to compile; just unpack the archive)

# Official download page: http://kafka.apache.org/downloads
wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.1

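Start the Kafka server

# Start ZooKeeper first. Each script runs in the foreground, so use a separate terminal for each (or add -daemon)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties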

Test with the Kafka command-line clients

# Create a topic named test with 2 partitions
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

# List all topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Show topic details
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0

# Start a console producer (type messages)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[Wait for the > prompt, then type your messages]
>i am a new msg !
>i am a good msg ?

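# Start a console consumer (waits for messages)
# Note --from-beginning: it makes the consumer read the topic from the start every time; try running with and without it to see the difference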
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
i am a new msg !
i am a good msg ?

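Install the PHP Kafka extension (php-rdkafka; the librdkafka C library must already be installed)

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vim [php]/php.ini
# add the line: extension=rdkafka.so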

PHP code examples


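<?php
// producer.php: send 20 keyed messages to the "test" topic
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');

// Delivery-report callback: logs the outcome of every produced message
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true) . PHP_EOL, FILE_APPEND);
});
// Error callback: logs client-level errors
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason) . PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);

$cf = new RdKafka\TopicConf();
$cf->set('request.required.acks', 0); // acks = 0: do not wait for the broker
$topic = $rk->newTopic("test", $cf);

$option = 'qkl'; // message key
for ($i = 0; $i < 20; $i++) {
    // RD_KAFKA_PARTITION_UA lets librdkafka's partitioner choose the partition
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}

// Serve callbacks until the outbound queue is empty, i.e. every message has been sent
$len = $rk->getOutQLen();
while ($len > 0) {
    $rk->poll(50);
    $len = $rk->getOutQLen();
}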
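The producer passes RD_KAFKA_PARTITION_UA together with the key 'qkl', so the partition is chosen by librdkafka. Assuming the consistent_random partitioner (the default in recent librdkafka versions), a keyed message is placed by a CRC32 hash of its key, so all 20 messages land in the same partition of test. pickPartition() below is a hypothetical helper that only illustrates the idea; PHP's crc32() is a stand-in and is not guaranteed to yield the same partition numbers as librdkafka.

```php
<?php
/**
 * Hypothetical helper, for illustration only: choose a partition from a message
 * key the way a hash partitioner does (hash of the key modulo partition count).
 */
function pickPartition(string $key, int $partitionCount): int
{
    if ($partitionCount < 1) {
        throw new InvalidArgumentException('partitionCount must be at least 1');
    }
    // crc32() returns a non-negative int on 64-bit PHP
    return crc32($key) % $partitionCount;
}

// Every message from producer.php uses the key 'qkl', so they all map to one partition
$targets = [];
for ($i = 0; $i < 20; $i++) {
    $targets[] = pickPartition('qkl', 2);
}
echo count(array_unique($targets)), PHP_EOL; // 1
```

Because the mapping depends only on the key and the partition count, adding partitions to a topic later changes where existing keys land.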

Run producer

php producer.php
# producer.php itself prints nothing.

# The console consumer started above should now print the messages:
qkl . 0
qkl . 1
qkl . 2
qkl . 3
qkl . 4
qkl . 5
qkl . 6
qkl . 7
qkl . 8
qkl . 9
qkl . 10
qkl . 11
qkl . 12
qkl . 13
qkl . 14
qkl . 15
qkl . 16
qkl . 17
qkl . 18
qkl . 19


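<?php
// consumer.php: read partition 0 of the "test" topic with the low-level consumer
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');

// Delivery reports are only emitted for produced messages, so this callback stays idle here
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./c_dr_cb.log", var_export($message, true) . PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason) . PHP_EOL, FILE_APPEND);
});

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);

$topicConf = new RdKafka\TopicConf();
// request.required.acks is a producer setting; it has no effect on a consumer
$topicConf->set('request.required.acks', 1);
//$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// Store offsets in a local file
//$topicConf->set('offset.store.method', 'file');
// Store offsets on the broker
$topicConf->set('offset.store.method', 'broker');
//$topicConf->set('offset.store.path', __DIR__);

// smallest: roughly "consume from the beginning", equivalent to earliest above
// largest: roughly "consume from the newest message", equivalent to latest above
//$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// First argument: the partition to consume (0)
// Second argument: where to start. RD_KAFKA_OFFSET_BEGINNING starts at the oldest message,
// RD_KAFKA_OFFSET_END waits for new messages, RD_KAFKA_OFFSET_STORED resumes from the last stored offset
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    // Wait up to 12 seconds for a message from partition 0
    $message = $topic->consume(0, 12 * 1000);
    if ($message === null) { // consume() returns null when nothing arrives before the timeout
        echo "Timed out\n";
        continue;
    }
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}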
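consumeStart() accepts either an absolute offset or one of librdkafka's logical offsets. In librdkafka's rdkafka.h, RD_KAFKA_OFFSET_BEGINNING is -2, RD_KAFKA_OFFSET_END is -1, and a tail offset for the last n messages is -2000 - n (the value rd_kafka_offset_tail(n) returns). The sketch below is a hypothetical helper that maps such a value onto an absolute position using the partition's low and high watermarks; the real resolution happens inside librdkafka, and RD_KAFKA_OFFSET_STORED is left out because it depends on the committed offset.

```php
<?php
// Same numeric values as librdkafka's logical offsets; renamed to avoid clashing
// with the RD_KAFKA_* constants when the rdkafka extension is loaded.
const LOGICAL_OFFSET_END = -1;          // RD_KAFKA_OFFSET_END
const LOGICAL_OFFSET_BEGINNING = -2;    // RD_KAFKA_OFFSET_BEGINNING
const LOGICAL_OFFSET_TAIL_BASE = -2000; // base used by rd_kafka_offset_tail()

/**
 * Hypothetical helper: map a start offset passed to consumeStart() onto an absolute
 * position, given the partition's low (oldest) and high (next to be written) watermarks.
 */
function resolveLogicalOffset(int $offset, int $low, int $high): int
{
    if ($offset >= 0) {
        return $offset;                              // already absolute
    }
    if ($offset === LOGICAL_OFFSET_BEGINNING) {
        return $low;                                 // oldest retained message
    }
    if ($offset === LOGICAL_OFFSET_END) {
        return $high;                                // only messages produced later
    }
    if ($offset <= LOGICAL_OFFSET_TAIL_BASE) {
        $count = LOGICAL_OFFSET_TAIL_BASE - $offset; // n from rd_kafka_offset_tail(n)
        return max($low, $high - $count);            // the last n messages
    }
    throw new InvalidArgumentException("unsupported logical offset: $offset");
}

// A partition holding offsets 0..19 (low = 0, high = 20)
echo resolveLogicalOffset(LOGICAL_OFFSET_BEGINNING, 0, 20), PHP_EOL;       // 0
echo resolveLogicalOffset(LOGICAL_OFFSET_END, 0, 20), PHP_EOL;             // 20
echo resolveLogicalOffset(LOGICAL_OFFSET_TAIL_BASE - 5, 0, 20), PHP_EOL;   // 15
echo resolveLogicalOffset(LOGICAL_OFFSET_TAIL_BASE - 100, 0, 20), PHP_EOL; // 0
```

The last call shows that asking for more tail messages than the partition holds simply starts at the oldest one.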

View server metadata (topic/partition/broker)

<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);

// Fetch metadata for all topics, waiting up to 60 seconds
$allInfo = $rk->metadata(true, NULL, 60e3);

$topics = $allInfo->getTopics();

// Logical offset meaning "100 messages before the end"; prints -2100
echo rd_kafka_offset_tail(100);
echo "--";

echo count($topics);
echo "--";

foreach ($topics as $topic) {
    $topicName = $topic->getTopic();
    // Skip Kafka's internal topic that stores consumer-group offsets
    if ($topicName == "__consumer_offsets") {
        continue;
    }

    $partitions = $topic->getPartitions();
    foreach ($partitions as $partition) {
        // Ask the broker for the partition's oldest (low) and next (high) offsets.
        // A locally constructed RdKafka\TopicPartition only reports the offset it was created with.
        $low = $high = 0;
        $rk->queryWatermarkOffsets($topicName, $partition->getId(), $low, $high, 10000);
        echo "Topic: " . $topicName . " - " . $partition->getId() . " - ";
        echo "offset: " . $low . " .. " . $high . PHP_EOL;
    }
}

