ホームページ  >  記事  >  バックエンド開発  >  PHP と Google Cloud Dataflow を使用してストリーミング データの処理と管理を行う方法

PHP と Google Cloud Dataflow を使用してストリーミング データの処理と管理を行う方法

王林
王林オリジナル
2023-06-25 08:07:39950ブラウズ

情報爆発時代の到来により、データの使用と処理はますます重要になっています。ストリーミング データ処理は、大量のデータを処理する重要な方法の 1 つになっています。 PHP 開発者は、リアルタイム データの処理に関する経験とニーズを持っている必要があります。この記事では、PHP と Google Cloud Dataflow を使用してストリーミング データの処理と管理を行う方法を紹介します。

1. Google Cloud Dataflow の概要

Google Cloud Dataflow は、大規模なデータ処理タスクを管理するクラウド サービスです。大規模なデータ フローを効率的に処理できると同時に、バッチとストリームも可能です。処理が混在しています。

Google Cloud Dataflow には次の特徴があります:

  1. 単一ノードのメモリが十分でない場合は自動的に拡張されます
  2. 基盤となる抽象化を隠蔽できます。コードを記述するだけで、データ処理用のクラスターを構築または管理する必要がなくなります。
  3. 複数の言語をサポートします。
  4. 2. Google Cloud Dataflow プロジェクトと環境のセットアップ

Google Cloud プロジェクトの作成
  1. ##まず、Google Cloud プロジェクトを作成する必要があります。

Google Cloud SDK のインストール

  1. Google Cloud Dataflow を使用するには、Google Cloud SDK をインストールする必要があります。インストール パッケージをダウンロードし、プロンプトに従ってインストールを完了します。

環境変数の設定

  1. 次のコマンドを使用して環境変数を現在の Google Cloud プロジェクトに設定します:
  2. $ gcloud config set project [PROJECT_ID]
3. 必要な PHP をインストールします。拡張機能

PHP で Dataflow サービスを使用するには、次の拡張機能をインストールする必要があります:

gRPC 拡張機能

  1. 次のコマンドを使用して、 gRPC 拡張機能をインストールします:
  2. $ pecl install grpc

Protobuf 拡張機能

  1. 次のコマンドを使用して Protobuf 拡張機能をインストールします:
  2. $ pecl install protobuf

Dataflow PHP 拡張機能

  1. 次のコマンドを使用して Dataflow PHP 拡張機能をインストールします:
  2. $ pecl install google-cloud-dataflow-alpha
4. データ フロー処理コードを作成します

次は、Pub からメッセージを受信できる例です。 /Sub トピックを作成し、それらを Dataflow 処理パイプラインに渡します。処理が完了すると、結果は次のようになります。BigQuery テーブルに書き込みます:

<?php
require __DIR__ . '/vendor/autoload.php';

use GoogleCloudBigQueryBigQueryClient;
use GoogleCloudDataflowDataflowClient;
use GoogleCloudDataflowPubSubPubSubOptions;
use GoogleCloudPubSubPubSubClient;
use GoogleCloudDataflowOptions;

$configs = include __DIR__ . '/config.php';

$inputTopic = $configs['input_topic'];
$outputTable = $configs['output_table'];
$project = $configs['project_id'];
$bucket = $configs['bucket'];
$stagingLocation = $configs['staging_location'];
$tempLocation = $configs['temp_location'];
$jobName = 'test-job';

$options = [
    'project' => $project,
    'stagingLocation' => $stagingLocation,
    'tempLocation' => $tempLocation,
    'jobName' => $jobName,
];

$pubsub = new PubSubClient([
    'projectId' => $project
]);

$pubsub_topic = $pubsub->topic($inputTopic);

$bigquery = new BigQueryClient([
    'projectId' => $project
]);

$dataset = $bigquery->dataset('test_dataset');
$table = $dataset->table($outputTable);

$table->create([
    'schema' => [
        [
            'name' => 'id',
            'type' => 'STRING',
        ],
        [
            'name' => 'timestamp',
            'type' => 'TIMESTAMP',
        ],
        [
            'name' => 'message',
            'type' => 'STRING',
        ],
    ],
]);

$dataflow = new DataflowClient();

$pubsubOptions = PubSubOptions::fromArray([
    'topic' => sprintf('projects/%s/topics/%s', $project, $inputTopic),
]);

$options = [
    Options::PROJECT => $project,
    Options::STAGING_LOCATION => $stagingLocation,
    Options::TEMP_LOCATION => $tempLocation,
    Options::JOB_NAME => $jobName,
];

$job = $dataflow->createJob([
    'projectId' => $project,
    'name' => $jobName,
    'environment' => [
        'tempLocation' => sprintf('gs://%s/temp', $bucket),
    ],
    'steps' => [
        [
            'name' => 'Read messages from Pub/Sub',
            'pubsubio' => (new GoogleCloudDataflowIoPubsubPubsubMessage())
                ->expand($pubsubOptions)
                ->withAttributes(false)
                ->withIdAttribute('unique_id')
                ->withTimestampAttribute('publish_time')
        ],
        [
            'name' => 'Write messages to BigQuery',
            'bigquery' => (new GoogleCloudDataflowIoBigQueryBigQueryWrite())
                ->withJsonSchema(file_get_contents(__DIR__ . '/schema.json'))
                ->withTable($table->tableId())
        ],
    ]
]);

$operation = $job->run();

# Poll the operation until it is complete
$operation->pollUntilComplete();

if (!$operation->isComplete()) {
    exit(1);
}

if ($operation->getError()) {
    print_r($operation->getError());
    exit(1);
}

echo "Job has been launched";

5. Dataflow 処理パイプラインを実行します。 Dataflow 処理パイプラインを実行するには、次のコマンドを使用します。

$ php dataflow.php

6. データ処理パイプラインのモニタリングと管理

Google Cloud Console には、データ処理パイプラインの表示と管理に使用できる Dataflow ページが用意されています。 。

7. 概要

この記事では、Google Cloud プロジェクトの作成から環境の設定、必要な PHP 拡張機能のインストールまで、ストリーム データの処理と管理に PHP と Google Cloud Dataflow を使用する方法を紹介します。 、次に、Dataflow 処理コードの作成、Dataflow 処理パイプラインの実行、およびデータ処理パイプラインの監視と管理では、Dataflow のプロセスと手順を詳しく紹介します。

以上がPHP と Google Cloud Dataflow を使用してストリーミング データの処理と管理を行う方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。