ホームページ >バックエンド開発 >PHPチュートリアル >PHP と Apache Kafka の統合による効率的なメッセージのキューイングと配布
最新のインターネット アプリケーションの継続的な開発に伴い、大量のデータ通信を処理する必要があるアプリケーションがますます増えています。これらのデータ通信を処理する従来の方法は、ポーリングまたはブロック I/O を使用することですが、これらの方法は非常に非効率であるため、最新のアプリケーションのニーズを満たすことができなくなりました。この問題を解決するために、業界はメッセージ キューと分散システムと呼ばれるテクノロジーを開発しました。
メッセージ キューおよび配信システムでは、メッセージのプロデューサはメッセージをキューに送信し、メッセージのコンシューマはキューからメッセージを取得して、対応する操作を実行します。このアプローチにより、ポーリングや I/O のブロックなどの問題が回避されるため、データ通信の効率が大幅に向上します。
この記事では、PHP と Apache Kafka の統合を使用して効率的なメッセージ キューイングと配信を実現する方法について説明します。
Apache Kafka の概要
Apache Kafka は、高スループット、低遅延、スケーラブルな分散メッセージング システムです。大量のメッセージを処理し、より高い負荷に対応するために水平方向に拡張できます。 Apache Kafka の主なコンポーネントは次のとおりです。
PHP は Apache Kafka を統合します
Apache Kafka を使用するには、PHP の Kafka 拡張機能を使用する必要があります。この拡張機能は、Kafka を操作するために PHP で必要なすべての API を提供します。
まず、Kafka 拡張機能をインストールする必要があります。PECL からインストールできます:
pecl install kafka
拡張機能をインストールした後、使用を開始できます。以下は、PHP と Apache Kafka を使用したメッセージの生成と消費の簡単な例です。
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka生产者 $producer = new RdKafkaProducer(); $producer->setLogLevel(LOG_DEBUG); $producer->addBrokers($brokers); // 创建一个Kafka消费者 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaConsumer($conf); $consumer->addBrokers($brokers); // 生产消息 $topicProducer = $producer->newTopic($topic); for ($i = 0; $i < 10; $i++) { $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, 'Message ' . $i); } // 消费消息 $topicConsumer = $consumer->newTopic($topic); $topicConsumer->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); while (true) { $message = $topicConsumer->consume(0, 1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo $message->payload . PHP_EOL; }
この例では、最初に Kafka プロデューサと Kafka コンシューマを作成します。次に、プロデューサーでは、指定されたトピックに 10 個のメッセージを送信し、コンシューマーでは、指定されたトピックからのメッセージを消費して、その内容を出力しました。
この時点で、PHP と Apache Kafka を使用した単純なメッセージの生成と消費が正常に実装されました。次に、PHP と Apache Kafka を使用して、より高度な機能を実装する方法について説明します。
高度なアプリケーションの例
実際のアプリケーションでは、通常、次のようないくつかの高度な機能を実装する必要があります。
ここでは、これらの関数の実装方法について説明します。
メッセージ配布
実際のアプリケーションでは、通常、メッセージのフローを制御する必要があります。たとえば、特定のコンシューマのみに特定のメッセージを消費させたい場合があります。この機能を実現するには、コンシューマごとにキューを作成し、特定のメッセージを特定のキューに割り当てることができます。
次は、2 つのコンシューマを使用して 2 つの異なるタスクを実行する例です。
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者组 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaKafkaConsumer($conf); $consumer->subscribe([$topic]); // 创建两个Kafka生产者,一个生产者用于向消费者1发送消息,另一个生产者用于向消费者2发送消息 $producer1 = new RdKafkaProducer(); $producer1->addBrokers($brokers); $producer1Topic = $producer1->newTopic($topic . '_1'); $producer2 = new RdKafkaProducer(); $producer2->addBrokers($brokers); $producer2Topic = $producer2->newTopic($topic . '_2'); // 消费消息 while (true) { $message = $consumer->consume(1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; // 根据消息内容分配给不同的生产者 if ($message->payload === 'task1') { $producer1Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload); } elseif ($message->payload === 'task2') { $producer2Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload); } }
この例では、2 つのプロデューサを使用して 2 つの異なるコンシューマにメッセージを配布します。コンシューマーがメッセージを受信すると、メッセージの内容に基づいてそのメッセージを特定のプロデューサーに割り当てることができます。この方法は、メッセージ フローを制御し、メッセージの冗長な処理を回避するのに役立ちます。
コンシューマ グループ
通常の Kafka コンシューマでは、同じグループ内の異なるコンシューマが同じトピックを消費し、同じメッセージを受け取ります。これは、Kafka が自動的にパーティションのバランスをとり、各パーティションが 1 つのコンシューマのみによって処理されるようにするためです。
PHP では、group.id を使用してコンシューマをグループ化し、コンシューマ グループの機能を実現できます。
次は、同じグループ内のメッセージを並列処理できる Kafka コンシューマ グループの例です:
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者组 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $conf->set('metadata.broker.list', $brokers); $conf->set('enable.auto.commit', 'false'); $consumer = new RdKafkaKafkaConsumer($conf); // 添加需要订阅的topic $consumer->subscribe([$topic]); // 处理消息 while (true) { $message = $consumer->consume(1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; // 处理完消息后手动提交offset $consumer->commit(); }
この例では、Kafka コンシューマ グループを作成し、それを追加トピックに送信します。サブスクリプションが必要なもの。その後、同じグループ内のメッセージを並行して処理できます。
注: コンシューマ グループでは、複数のコンシューマが 1 つ以上のパーティションを一緒に消費します。データを消費するときは、同じデータのマルチスレッド処理の問題に注意する必要があります。
オフセット構成
Kafka では、各パーティションに独立したオフセットがあります。コンシューマは、パーティション内のどこを読み取るか、つまりどのメッセージを読み取るかを制御できます。コンシューマは、最後のメッセージまたは最新のメッセージから読み取りを開始できます。
PHP では、オフセットを使用してメッセージの読み取り位置を制御できます。以下はオフセット構成の例です:
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaKafkaConsumer($conf); // 订阅topic $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'earliest'); $topic = $consumer->newTopic($topic, $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); // 消费消息 while (true) { $message = $topic->consume(0, 1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; }
この例では、auto.offset.reset を使用してオフセット構成を設定します。この構成は、コンシューマに最も古いオフセットからメッセージの消費を開始するように指示します。
実際のアプリケーションでは、必要に応じてさまざまなオフセットを構成できます。たとえば、プロデューサーが一部のメッセージの処理に失敗した後、失敗したメッセージが以前に処理された時点からメッセージの読み取りを再開する必要がある場合があります。
結論
この記事では、PHP と Apache Kafka の統合を使用して効率的なメッセージ キューイングと配信を実現する方法について説明しました。最初に Apache Kafka の基本を紹介し、次に PHP 用の Kafka 拡張機能を使用してメッセージの生成と消費を実装する方法について説明しました。最後に、メッセージ配布、コンシューマ グループ、オフセット構成などの高度な機能を実装する方法について説明しました。
PHP と Apache Kafka の統合を使用すると、効率的なメッセージのキューイングと配信を実装できるため、アプリケーションの応答速度とスループットが向上します。大量のデータ通信を処理する必要があるアプリケーションを開発している場合は、Apache Kafka と PHP 用の Kafka 拡張機能が適切な選択となる可能性があります。
以上がPHP と Apache Kafka の統合による効率的なメッセージのキューイングと配布の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。