Teach you how to quickly install php+kafka

慕斯
2021-06-30 09:39:15

This article walks through a quick installation of Kafka and the PHP Kafka extension (php-rdkafka), then tests the setup with a simple producer and two kinds of consumer.

1. Install Java and set the relevant environment variables

> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> mv java-se-7u75-ri/ /opt/
> export JAVA_HOME=/opt/java-se-7u75-ri
> export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
> export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar

# Verify the installation
> java -version
openjdk version "1.7.0_75"
OpenJDK Runtime Environment (build 1.7.0_75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
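
The export lines above only last for the current shell session. A minimal sketch of persisting them, assuming a system whose login shells source the files in /etc/profile.d (the helper name write_java_env and the file name java.sh are illustrative and not part of the original steps):

```shell
#!/bin/sh
# write_java_env FILE [JAVA_HOME]
# Writes the three exports from step 1 into FILE so that new login shells
# pick them up. The helper name and the default path are assumptions.
write_java_env() {
    target=$1
    java_home=${2:-/opt/java-se-7u75-ri}
    # \$ keeps the variable references literal in the generated file.
    cat > "$target" <<EOF
export JAVA_HOME=$java_home
export PATH=\$PATH:\$JAVA_HOME/bin:\$JAVA_HOME/jre/bin
export CLASSPATH=.:\$JAVA_HOME/lib:\$JAVA_HOME/lib/tools.jar
EOF
}
```

For example, running `write_java_env /etc/profile.d/java.sh` as root and then opening a new login shell should make `java -version` work without repeating the exports.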

2. Install Kafka, using version 0.10.2 as an example

> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
> tar zxvf kafka_2.11-0.10.2.0.tgz
> mv kafka_2.11-0.10.2.0/ /opt/kafka
> cd /opt/kafka

# Start ZooKeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

# Start Kafka
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
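
Both start scripts above run in the foreground, so each one needs its own terminal. As a hedged alternative, the scripts also accept a -daemon flag, and a small helper can wait until each service is listening before moving on. The sketch below assumes bash (it relies on bash's /dev/tcp pseudo-device) and the default ports 2181 and 9092 used elsewhere in this article; the helper name wait_for_port is illustrative.

```shell
#!/usr/bin/env bash
# wait_for_port HOST PORT [RETRIES]
# Tries once per second to open a TCP connection to HOST:PORT and returns 0
# as soon as one succeeds, or 1 after RETRIES failed attempts (default 30).
wait_for_port() {
    host=$1
    port=$2
    retries=${3:-30}
    i=0
    while [ "$i" -lt "$retries" ]; do
        # The subshell opens the socket and closes it again when it exits.
        if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
            return 0
        fi
        i=$((i + 1))
        sleep 1
    done
    return 1
}

# Example usage, run from /opt/kafka:
#   bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#   wait_for_port localhost 2181 && bin/kafka-server-start.sh -daemon config/server.properties
#   wait_for_port localhost 9092 && echo "Kafka is accepting connections"
```

An open port only shows that the process is listening, not that the broker has finished starting up, so treat this as a rough readiness check.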

# Try creating a topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# Write messages with the console producer
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

# Read messages with the console consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

3. Install librdkafka, the C client library for Kafka

> wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz
> tar zxvf v1.3.0.tar.gz
> cd librdkafka-1.3.0
> ./configure
> make && make install
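
If the PHP extension later fails to load because librdkafka cannot be found, it may help to confirm where the library was installed. The sketch below assumes the default install prefix, so that the shared object ends up in a directory such as /usr/local/lib; the helper name find_librdkafka is illustrative.

```shell
#!/bin/sh
# find_librdkafka DIR...
# Prints the path of the first librdkafka shared object found in the given
# directories and returns 0, or returns 1 if none of them contains one.
find_librdkafka() {
    for dir in "$@"; do
        for lib in "$dir"/librdkafka.so*; do
            # An unmatched glob stays literal, so check that the file exists.
            if [ -e "$lib" ]; then
                echo "$lib"
                return 0
            fi
        done
    done
    return 1
}

# Example usage:
#   find_librdkafka /usr/local/lib /usr/lib || echo "librdkafka not found"
```

On many Linux systems, running `ldconfig` as root after `make install` refreshes the dynamic linker cache so that a newly installed library in /usr/local/lib is picked up.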

4. Install the Kafka extension for PHP. Here we use the php-rdkafka extension: https://github.com/arnaud-lb/php-rdkafka

> wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz
> tar zxvf 4.0.2.tar.gz
> cd php-rdkafka-4.0.2
> /opt/php7/bin/phpize
> ./configure --with-php-config=/opt/php7/bin/php-config
> make && make install

Edit php.ini and add the line extension=rdkafka.so
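
The php.ini change can also be scripted so that repeated runs do not add duplicate lines. This is a minimal sketch: the helper name enable_extension is illustrative, and the php.ini path in the usage example is an assumption (`/opt/php7/bin/php --ini` shows which file your build actually loads).

```shell
#!/bin/sh
# enable_extension INI_FILE NAME
# Appends "extension=NAME.so" to INI_FILE unless that exact line is already
# present, so the function is safe to run more than once.
enable_extension() {
    ini=$1
    line="extension=$2.so"
    if ! grep -qxF "$line" "$ini" 2>/dev/null; then
        printf '%s\n' "$line" >> "$ini"
    fi
}

# Example usage (the php.ini location is an assumption):
#   enable_extension /opt/php7/lib/php.ini rdkafka
#   /opt/php7/bin/php -m | grep rdkafka
```

If the second command prints rdkafka, the extension is loaded by the command-line PHP binary.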

5. Install the IDE code-completion stubs for rdkafka

> composer create-project kwn/php-rdkafka-stubs php-rdkafka-stubs

Taking PhpStorm as an example: in your project, right-click External Libraries, select Configure PHP Include Paths, and add the path of the php-rdkafka-stubs directory created above.

6. Write PHP test code

producer:

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_ERR);
$conf->set('debug', 'admin');
$conf->set('metadata.broker.list', 'localhost:9092');

// If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set('enable.idempotence', 'true');

$producer = new RdKafka\Producer($conf);

$topic = $producer->newTopic("test2");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}

low-level consumer:

<?php

$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_ERR);
$conf->set('debug', 'admin');

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', '100');

// Set the offset store method to 'broker'
$topicConf->set('offset.store.method', 'broker');

// Alternatively, set the offset store method to 'none'
// $topicConf->set('offset.store.method', 'none');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test2", $topicConf);

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            print_r($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

high-level consumer:

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$conf->set('auto.offset.reset', 'smallest');

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test2'
$consumer->subscribe(['test2']);

echo "Waiting for partition assignment... (may take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            print_r($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
    sleep(2);
}

Statement:
This article is reproduced from csdn.net. In case of infringement, please contact admin@php.cn for removal.