ホームページ >バックエンド開発 >PHPチュートリアル >PHP と Kafka を使用してリアルタイム データ処理を実装する方法

PHP と Kafka を使用してリアルタイム データ処理を実装する方法

WBOY
WBOYオリジナル
2023-06-28 11:02:282133ブラウズ

近年、リアルタイムデータ処理の需要が高まり続けています。コールド スタートおよびバッチ ベースのテクノロジでは、リアルタイム データ処理のニーズを満たすことができなくなりました。そのため、リアルタイム データ処理テクノロジーに注目する企業が増えています。この記事では、PHP と Kafka を使用してリアルタイム データ処理を実装する方法を紹介します。

Kafka は、もともと LinkedIn によって開発された高スループットの分散ストリーム処理プラットフォームです。 Kafka を使用して、新しいストリーム処理、バッチ処理、メッセージング システム、調整システムなどを作成できます。

PHP は、インターネット アプリケーションの構築に広く使用されている人気のある動的プログラミング言語です。 PHP はリアルタイム データ処理の第一の選択肢ではありませんが、Web 開発とデータ処理で広く使用されています。

次に、PHP と Kafka を使用してリアルタイム データ処理を実現する手順を紹介します。

ステップ 1: PHP のインストールと構成

PHP を使用したリアルタイム データ処理を開始する前に、PHP 環境をインストールし、Kafka 拡張機能や Redis 拡張機能などの必要な PHP 拡張機能を追加する必要があります。

Kafka 拡張機能は、このリンク kafka、pecl install kafka install Kafka extension からダウンロードしてインストールできます。

Redis 拡張機能 ここから PHP Redis 拡張機能をダウンロードしてインストールすることも、PECL を使用してインストールすることもできます (コマンド: pecl install redis)。

PHP 拡張機能をインストールして構成したら、リアルタイム データ処理プログラムの作成を開始できます。

ステップ 2: Kafka に接続する

#Kafka プロデューサと Kafka コンシューマを使用して、Kafka のデータ ストリームを接続し、データを「データ パイプライン」に送信します。 PHP では、Kafka が提供する KafkaProducer クラスと KafkaConsumer クラスを使用し、それらをインスタンス化して Kafka に接続できます。

サンプル コードは次のとおりです。

<?php

$kafkaConf = new RdKafkaConf();
$kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息
$kafkaProducer = new RdKafkaProducer($kafkaConf);
$kafkaConsumer = new RdKafkaConsumer($kafkaConf);
$topic = $kafkaProducer->newTopic('sample');

?>

ステップ 3: データの読み取り

KafkaConsumer クラスを使用して、リアルタイム データ ストリームを取得できます。 Kafka には、データ フローを 1 つ以上のパーティションに分割するストリームの概念があり、各パーティションは 1 つのマスター パーティションと 0 個以上のスレーブ パーティションで構成されます。 PHP では、KafkaConsumer クラスを使用してコンシューマ オブジェクトをインスタンス化し、1 つ以上のパーティションをサブスクライブしてデータを読み取ることができます。

サンプル コードは次のとおりです。

<?php

$kafkaConf = new RdKafkaConf();
$kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息
$kafkaConsumer = new RdKafkaConsumer($kafkaConf);

$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $kafkaConsumer->newTopic('sample', $topicConf);

var_dump($topic->getMetadata(true, 10000));

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
    $message = $topic->consume(0, 1000);
    if (null !== $message) {
        print_r($message->payload);
    }
}

?>

ステップ 4: データ処理

データを受信した後、データを処理してメモリに保存できます。 Redis を使用すると、データを保存し、適切なタイミングで定期的にデータベースにデータを更新することで安全に保管できます。

サンプル コードは次のとおりです。

<?php

$kafkaConf = new RdKafkaConf();
$kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息
$kafkaConsumer = new RdKafkaConsumer($kafkaConf);

$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $kafkaConsumer->newTopic('sample', $topicConf);

$redisClient = new Redis();
$redisClient->connect('127.0.0.1', 6379);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
    $message = $topic->consume(0, 1000);
    if (null !== $message) {
        $data = json_decode($message->payload);
        $redisClient->hMSet('my_data', [
            $data->key1 => $data->value1,
            $data->key2 => $data->value2,
        ]);
    }
}

?>

ステップ 5: データ同期

最後に、リアルタイム データ ストリームをデータベースにフラッシュして戻す必要があります。タイマーと PHP プロセスを使用して、Redis キャッシュを定期的にデータベースにフラッシュして戻すことができます。

サンプルコードは次のとおりです。

<?php

$kafkaConf = new RdKafkaConf();
$kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息
$kafkaConsumer = new RdKafkaConsumer($kafkaConf);

$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $kafkaConsumer->newTopic('sample', $topicConf);

$redisClient = new Redis();
$redisClient->connect('127.0.0.1', 6379);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

$count = 0;
while (true) {
    $message = $topic->consume(0, 1000);
    if (null !== $message) {
        $data = json_decode($message->payload);
        $redisClient->hMSet('my_data', [
            $data->key1 => $data->value1,
            $data->key2 => $data->value2,
        ]);
        $count++;
        if ($count == 5) {
            $count = 0;
            $allData = $redisClient->hGetAll('my_data');
            //将数据更新到数据库中
            //...
        }
    }
}

?>

結論

この記事では、PHP と Kafka を使用してリアルタイム データ処理を実装する方法を紹介しました。リアルタイム データは、Kafka を使用してデータ パイプラインに簡単にストリーミングでき、PHP を使用して処理および保存できます。また、リアルタイム データを処理するためのキャッシュおよびメモリ内ストレージとして Redis を使用します。このアプローチは、より優れたパフォーマンスとスケーラビリティを提供しながら、キャッシュおよびメッセージング ソリューションを簡単に置き換えることができます。

以上がPHP と Kafka を使用してリアルタイム データ処理を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。