Rumah >php教程 >php手册 >kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展

kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展

WBOY
WBOYasal
2016-06-13 08:46:052370semak imbas

kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展

话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。

实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。

一. 首先确认下jdk有没有安装

使用命令

[root@localhost ~]# java -<span>version
java version </span><span>"</span><span>1.8.0_73</span><span>"</span><span>
Java(TM) SE Runtime Environment (build </span><span>1.8</span>.0_73-<span>b02)
Java HotSpot(TM) </span><span>64</span>-Bit Server VM (build <span>25.73</span>-b02, mixed mode)

如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。

然后打开/etc/profile文件

[root@localhost ~]# vim /etc/profile

把下面这段代码写到文件里

export JAVA_HOME=/usr/local/jdk/jdk1.<span>8</span><span>.0_73
export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span>dt.jar
export PATH</span>=$JAVA_HOME/bin:$PATH

最后

[root@localhost ~]# source /etc/profile

这时jdk就生效了,可以使用 java -version验证下。

二. 接下来安装Kafka

1. 下载Kafka

到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。

2. 下载完解压到你喜欢的目录

我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2

3. 运行默认的Kafka

启动Zookeeper server

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/zookeeper-server-start.<span>sh</span> config/zookeeper.properties &

启动Kafka server

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-server-start.<span>sh</span> config/server.properties &

运行生产者producer

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-producer.<span>sh</span> --broker-list localhost:<span>9092</span> --topic test

运行消费者consumer

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-consumer.<span>sh</span> --zookeeper localhost:<span>2181</span> --topic test --from-beginning

这样,在producer那边输入内容,consumer马上就能接收到。

4. 当有跨机的producer或consumer连接时

需要配置config/server.properties的host.name,要不然跨机的连不上。

三. Kafka-PHP扩展

使用了一圈,就https://github.com/nmred/kafka-php可以用。

我是使用composer安装的,以下是示例:

producer.php

<?<span>php
</span><span>require</span> 'vendor/autoload.php'<span>;

</span><span>while</span> (1<span>) {
    </span><span>$part</span> = <span>mt_rand</span>(0, 1<span>);
    </span><span>$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span>);
    </span><span>//</span><span> get available partitions</span>
    <span>$partitions</span> = <span>$produce</span>->getAvailablePartitions('topic_name'<span>);
    </span><span>var_dump</span>(<span>$partitions</span><span>);
    </span><span>//</span><span> send message</span>
    <span>$produce</span>->setRequireAck(-1<span>);
    </span><span>$produce</span>->setMessages('topic_name', 0, <span>array</span>(<span>date</span>('Y-m-d H:i:s'<span>));
   
    </span><span>sleep</span>(3<span>);
}</span>

consumer.php

<span>require</span> 'vendor/autoload.php'<span>;

</span><span>$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span>);
</span><span>$group</span> = 'topic_name'<span>;
</span><span>$consumer</span>->setGroup(<span>$group</span><span>);
</span><span>$consumer</span>->setFromOffset(<span>true</span><span>);
</span><span>$consumer</span>->setTopic('topic_name', 0<span>);
</span><span>$consumer</span>->setMaxBytes(102400<span>);
</span><span>$result</span> = <span>$consumer</span>-><span>fetch();
</span><span>print_r</span>(<span>$result</span><span>);
</span><span>foreach</span> (<span>$result</span> <span>as</span> <span>$topicName</span> => <span>$partition</span><span>) {
    </span><span>foreach</span> (<span>$partition</span> <span>as</span> <span>$partId</span> => <span>$messageSet</span><span>) {
    </span><span>var_dump</span>(<span>$partition</span>-><span>getHighOffset());
        </span><span>foreach</span> (<span>$messageSet</span> <span>as</span> <span>$message</span><span>) {
            </span><span>var_dump</span>((<span>string</span>)<span>$message</span><span>);
        }
    </span><span>var_dump</span>(<span>$partition</span>-><span>getMessageOffset());
    }
}</span>

 

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn