Heim >Backend-Entwicklung >PHP-Tutorial >kafka装配及Kafka-PHP扩展的使用
kafka安装及Kafka-PHP扩展的使用
话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。
实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。
一. 首先确认下jdk有没有安装
使用命令
[[email protected] ~]# java -<span style="color: #000000;">versionjava version </span><span style="color: #800000;">"</span><span style="color: #800000;">1.8.0_73</span><span style="color: #800000;">"</span><span style="color: #000000;">Java(TM) SE Runtime Environment (build </span><span style="color: #800080;">1.8</span>.0_73-<span style="color: #000000;">b02)Java HotSpot(TM) </span><span style="color: #800080;">64</span>-Bit Server VM (build <span style="color: #800080;">25.73</span>-b02, mixed mode)
如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。
然后打开/etc/profile文件
[[email protected] ~]# vim /etc/profile
把下面这段代码写到文件里
export JAVA_HOME=/usr/local/jdk/jdk1.<span style="color: #800080;">8</span><span style="color: #000000;">.0_73export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span style="color: #000000;">dt.jarexport PATH</span>=$JAVA_HOME/bin:$PATH
最后
[[email protected] ~]# source /etc/profile
这时jdk就生效了,可以使用 java -version验证下。
二. 接下来安装Kafka
1. 下载Kafka
到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。
2. 下载完解压到你喜欢的目录
我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 运行默认的Kafka
启动Zookeeper server
[[email protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/zookeeper-server-start.<span style="color: #0000ff;">sh</span> config/zookeeper.properties &
启动Kafka server
[[email protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-server-start.<span style="color: #0000ff;">sh</span> config/server.properties &
运行生产者producer
[[email protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-console-producer.<span style="color: #0000ff;">sh</span> --broker-list localhost:<span style="color: #800080;">9092</span> --topic test
运行消费者consumer
[[email protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-console-consumer.<span style="color: #0000ff;">sh</span> --zookeeper localhost:<span style="color: #800080;">2181</span> --topic test --from-beginning
这样,在producer那边输入内容,consumer马上就能接收到。
4. 当有跨机的producer或consumer连接时
需要配置config/server.properties的host.name,要不然跨机的连不上。
三. Kafka-PHP扩展
使用了一圈,就https://github.com/nmred/kafka-php可以用。
我是使用composer安装的,以下是示例:
producer.php
<span style="color: #000000;">php</span><span style="color: #0000ff;">require</span> 'vendor/autoload.php'<span style="color: #000000;">;</span><span style="color: #0000ff;">while</span> (1<span style="color: #000000;">) { </span><span style="color: #800080;">$part</span> = <span style="color: #008080;">mt_rand</span>(0, 1<span style="color: #000000;">); </span><span style="color: #800080;">$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span style="color: #000000;">); </span><span style="color: #008000;">//</span><span style="color: #008000;"> get available partitions</span> <span style="color: #800080;">$partitions</span> = <span style="color: #800080;">$produce</span>->getAvailablePartitions('topic_name'<span style="color: #000000;">); </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partitions</span><span style="color: #000000;">); </span><span style="color: #008000;">//</span><span style="color: #008000;"> send message</span> <span style="color: #800080;">$produce</span>->setRequireAck(-1<span style="color: #000000;">); </span><span style="color: #800080;">$produce</span>->setMessages('topic_name', 0, <span style="color: #0000ff;">array</span>(<span style="color: #008080;">date</span>('Y-m-d H:i:s'<span style="color: #000000;">)); </span><span style="color: #008080;">sleep</span>(3<span style="color: #000000;">);}</span>
consumer.php
<span style="color: #0000ff;">require</span> 'vendor/autoload.php'<span style="color: #000000;">;</span><span style="color: #800080;">$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span style="color: #000000;">);</span><span style="color: #800080;">$group</span> = 'topic_name'<span style="color: #000000;">;</span><span style="color: #800080;">$consumer</span>->setGroup(<span style="color: #800080;">$group</span><span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setFromOffset(<span style="color: #0000ff;">true</span><span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setTopic('topic_name', 0<span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setMaxBytes(102400<span style="color: #000000;">);</span><span style="color: #800080;">$result</span> = <span style="color: #800080;">$consumer</span>-><span style="color: #000000;">fetch();</span><span style="color: #008080;">print_r</span>(<span style="color: #800080;">$result</span><span style="color: #000000;">);</span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$result</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$topicName</span> => <span style="color: #800080;">$partition</span><span style="color: #000000;">) { </span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$partition</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$partId</span> => <span style="color: #800080;">$messageSet</span><span style="color: #000000;">) { </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partition</span>-><span style="color: #000000;">getHighOffset()); </span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$messageSet</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$message</span><span style="color: #000000;">) { </span><span style="color: #008080;">var_dump</span>((<span style="color: #0000ff;">string</span>)<span style="color: #800080;">$message</span><span style="color: #000000;">); } </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partition</span>-><span style="color: #000000;">getMessageOffset()); }}</span>