ホームページ >バックエンド開発 >PHPチュートリアル >PHP と Apache Kafka を使用してリアルタイム ストリーム処理を実装する方法
Apache Kafka は、高スループット、低遅延の分散パブリッシュ/サブスクライブ メッセージング システムです。高周波数、大容量のデータ ストリームを処理するリアルタイム ストリーム処理システムのアーキテクチャで広く使用されています。この記事では、PHP と Apache Kafka を使用してリアルタイム ストリーム処理を実装する方法を紹介します。
Apache Kafka の使用を開始する前に、まず Apache Kafka をインストールする必要があります。公式 Web サイトから Apache Kafka をダウンロードしてインストールすることも、オープンソースのインストール スクリプトを使用することもできます。ここでは、Apache Kafka が提供するバイナリ バージョンを使用します。
次に、Kafka クラスターにデータをプッシュする Kafka プロデューサーを作成します。 PHP では、kafka-php 拡張機能を使用してこれを実現できます。
まず、kafka-php 拡張機能をダウンロードしてコンパイルする必要があります。詳しいインストール手順は、kafka-php の GitHub ページでご覧いただけます。インストールが完了したら、PHP コードで kafka-php 拡張機能を使用できるようになります。
次の例は、Kafka プロデューサを作成し、トピックにメッセージを送信する方法を示しています。
<?php require_once('KafkaProducer.php'); $producer = new KafkaProducer('localhost:9092'); $producer->send([ [ 'topic' => 'example-topic', 'value' => 'Hello, Kafka!', 'key' => 'key1' ] ]); ?>
上記のコードでは、最初に KafkaProducer オブジェクトを作成し、そのアドレスを指定します。カフカクラスタ。次に、send メソッドを通じてトピック (example-topic) にメッセージを送信しました。
送信されるメッセージは、メッセージの件名、内容、キーを含む配列です。キーを使用してメッセージをグループ化すると、Kafka クラスターが同じキーを持つメッセージを同じパーティションに分散できるようになります。
次に、Kafka クラスターからのデータを消費する Kafka コンシューマーを作成します。同様に、PHP では、kafka-php 拡張機能を使用してこれを実現できます。
<?php require_once('KafkaConsumer.php'); $consumer = new KafkaConsumer('localhost:9092', 'example-group', ['example-topic']); $consumer->consume(function($message) { echo $message->payload . " "; }); ?>
上記のコードでは、まず KafkaConsumer オブジェクトを作成し、Kafka クラスターのアドレス、コンシューマー グループ (グループ) の名前、および消費されるトピックを指定します。次に、consumption メソッドを通じてデータの消費を開始します。
consume メソッドは、Kafka クラスターから受信したメッセージを処理するためのパラメーターとしてコールバック関数を受け入れます。コールバック関数では、メッセージのコンテンツ (ペイロード) にアクセスできます。
Kafka コンシューマの作成時にコンシューマ グループの名前を指定したことに注意してください。コンシューマ グループは Kafka の重要な概念であり、メッセージをパーティションに分散するために使用されます。同じコンシューマ グループ名を持つコンシューマは同じトピックを一緒に消費し、Kafka はそれらのコンシューマ間でメッセージを自動的に配布します。コンシューマ グループの目的は、各メッセージが 1 回だけ消費されるようにすることです。
ここで、上記の 2 つの例を組み合わせて、リアルタイム ストリーム処理を実現します。 Kafka プロデューサーを作成し、トピックに定期的にメッセージを送信できます。次に、コールバック関数でトピックから受信したメッセージを処理する Kafka コンシューマーを作成できます。
次は、リアルタイム ストリーム処理を示す例です:
<?php require_once('KafkaProducer.php'); require_once('KafkaConsumer.php'); $producer = new KafkaProducer('localhost:9092'); $consumer = new KafkaConsumer('localhost:9092', 'example-group', ['example-topic']); while (true) { $producer->send([ [ 'topic' => 'example-topic', 'value' => rand(0, 10), 'key' => 'key1' ] ]); $consumer->consume(function($message) { $value = $message->payload; echo "Received $value "; }); sleep(1); } ?>
上記のコードでは、最初に Kafka プロデューサと Kafka コンシューマを作成します。次に、トピックに定期的に乱数を送信し、トピックからのメッセージを消費するループに入ります。コンシューマ コールバック関数では、受信した値をコンソールに出力します。
ここで説明するのは、単純なリアルタイム ストリーム処理プロセスです。実際には、リアルタイム ストリーム処理システムはより複雑で、複数のプロデューサーとコンシューマーが存在し、複数のトピックとパーティションが存在する場合があります。いずれにしても、PHP と Apache Kafka を使用すると、リアルタイム ストリーム処理システムを簡単に構築し、高頻度で大容量のデータ ストリームを処理できます。
以上がPHP と Apache Kafka を使用してリアルタイム ストリーム処理を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。