ホームページ  >  記事  >  バックエンド開発  >  Kafka アセンブリと Kafka-PHP 拡張機能の使用

Kafka アセンブリと Kafka-PHP 拡張機能の使用

WBOY
WBOYオリジナル
2016-06-13 12:28:321525ブラウズ

Kafka のインストールと Kafka-PHP 拡張機能の使い方

使った後は生産性が高くないと時間が経つと忘れてしまうと言われているので、ここでお試し版の Kafka のインストール手順と、 PHP拡張機能のトライアル。

正直に言うと、キューに使用する場合、Redis の方が PHP と互換性があります。使い方は簡単です (笑)。しかし、Redis は複数のコンシューマーを持つことができません。ただし、Kafka は PHP を正式にサポートしておらず、PHP 拡張機能は愛好家やユーザーによって作成されています。まずはKafkaのインストールから始めましょう。 CentOS6.4(64ビット)を例に挙げます。

1. まず、jdk がインストールされているかどうかを確認します

コマンドを使用します

[[email protected] ~]# java -<span style="color: #000000;">versionjava version </span><span style="color: #800000;">"</span><span style="color: #800000;">1.8.0_73</span><span style="color: #800000;">"</span><span style="color: #000000;">Java(TM) SE Runtime Environment (build </span><span style="color: #800080;">1.8</span>.0_73-<span style="color: #000000;">b02)Java HotSpot(TM) </span><span style="color: #800080;">64</span>-Bit Server VM (build <span style="color: #800080;">25.73</span>-b02, mixed mode)

上記の情報がある場合は、インストールしてください. 中にはjdkが間違っている可能性もあるので、正しいものをインストールしてください。インストールされていない場合は、次の JDK インストール方法を確認してください:

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

このアドレスにアクセスして、jdk8 バージョンをダウンロードします。jdk-8u73-linux-x64.tar.gz をダウンロードし、/usr/local/jdk/ に解凍しました。

次に、/etc/profile ファイルを開きます

[[email protected] ~]# vim /etc/profile

次のコードをファイルに書き込みます

export JAVA_HOME=/usr/local/jdk/jdk1.<span style="color: #800080;">8</span><span style="color: #000000;">.0_73export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span style="color: #000000;">dt.jarexport PATH</span>=$JAVA_HOME/bin:$PATH

最後に

[[email protected] ~]# source /etc/profile

この時点で jdk が有効になります。java -version を使用して確認できます。

2. 次に Kafka をインストールします

1. Kafka をダウンロードします

http://kafka.apache.org/downloads.html に移動して、対応するバージョンをダウンロードします。 kafka_2.9.1-0.8.2.2.tgz を使用してください。

2. ダウンロード後、好きなディレクトリに解凍します

/usr/local/kafka/kafka_2.9.1-0.8.2.2に解凍しました

3.デフォルトの Kafka

Zookeeper サーバーの起動

[[email protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/zookeeper-server-start.<span style="color: #0000ff;">sh</span> config/zookeeper.properties &

Kafka サーバーの起動

[[email protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-server-start.<span style="color: #0000ff;">sh</span> config/server.properties &

プロデューサー プロデューサーの実行

[[email protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-console-producer.<span style="color: #0000ff;">sh</span> --broker-list localhost:<span style="color: #800080;">9092</span> --topic test

コンシューマー Consumer を実行します

[[email protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-console-consumer.<span style="color: #0000ff;">sh</span> --zookeeper localhost:<span style="color: #800080;">2181</span> --topic test --from-beginning

このように、プロデューサー側でコンテンツを入力すると、コンシューマーはそれを受け取りますすぐに。

4. クロスマシンのプロデューサーまたはコンシューマ接続がある場合

config/server.properties の host.name を構成する必要があります。そうしないと、クロスマシン接続ができなくなります。 。

3. Kafka-PHP 拡張機能

しばらく使ってみると https://github.com/nmred/kafka-php が使えるようになります。

composer を使用してインストールしました。以下は例です:

Producer.php

<?<span style="color: #000000;">php</span><span style="color: #0000ff;">require</span> 'vendor/autoload.php'<span style="color: #000000;">;</span><span style="color: #0000ff;">while</span> (1<span style="color: #000000;">) {    </span><span style="color: #800080;">$part</span> = <span style="color: #008080;">mt_rand</span>(0, 1<span style="color: #000000;">);    </span><span style="color: #800080;">$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span style="color: #000000;">);    </span><span style="color: #008000;">//</span><span style="color: #008000;"> get available partitions</span>    <span style="color: #800080;">$partitions</span> = <span style="color: #800080;">$produce</span>->getAvailablePartitions('topic_name'<span style="color: #000000;">);    </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partitions</span><span style="color: #000000;">);    </span><span style="color: #008000;">//</span><span style="color: #008000;"> send message</span>    <span style="color: #800080;">$produce</span>->setRequireAck(-1<span style="color: #000000;">);    </span><span style="color: #800080;">$produce</span>->setMessages('topic_name', 0, <span style="color: #0000ff;">array</span>(<span style="color: #008080;">date</span>('Y-m-d H:i:s'<span style="color: #000000;">));       </span><span style="color: #008080;">sleep</span>(3<span style="color: #000000;">);}</span>

consumer.php

<span style="color: #0000ff;">require</span> 'vendor/autoload.php'<span style="color: #000000;">;</span><span style="color: #800080;">$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span style="color: #000000;">);</span><span style="color: #800080;">$group</span> = 'topic_name'<span style="color: #000000;">;</span><span style="color: #800080;">$consumer</span>->setGroup(<span style="color: #800080;">$group</span><span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setFromOffset(<span style="color: #0000ff;">true</span><span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setTopic('topic_name', 0<span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setMaxBytes(102400<span style="color: #000000;">);</span><span style="color: #800080;">$result</span> = <span style="color: #800080;">$consumer</span>-><span style="color: #000000;">fetch();</span><span style="color: #008080;">print_r</span>(<span style="color: #800080;">$result</span><span style="color: #000000;">);</span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$result</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$topicName</span> => <span style="color: #800080;">$partition</span><span style="color: #000000;">) {    </span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$partition</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$partId</span> => <span style="color: #800080;">$messageSet</span><span style="color: #000000;">) {    </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partition</span>-><span style="color: #000000;">getHighOffset());        </span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$messageSet</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$message</span><span style="color: #000000;">) {            </span><span style="color: #008080;">var_dump</span>((<span style="color: #0000ff;">string</span>)<span style="color: #800080;">$message</span><span style="color: #000000;">);        }    </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partition</span>-><span style="color: #000000;">getMessageOffset());    }}</span>

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。