How to use PHP and Google Cloud Dataflow for streaming data processing and management

王林 (Original)
2023-06-25 08:07:39

With the explosion of information, using and processing data has become increasingly important, and stream processing has become one of the main ways to handle massive data sets. As a PHP developer, you will likely need to process real-time data at some point. This article introduces how to use PHP with Google Cloud Dataflow for streaming data processing and management.

1. Introduction to Google Cloud Dataflow

Google Cloud Dataflow is a managed cloud service for large-scale data processing. It handles large data streams efficiently and allows batch and stream processing to be mixed within the same pipeline.

Google Cloud Dataflow has the following characteristics:

  1. It scales out automatically when a single node's resources are insufficient
  2. It abstracts away the underlying infrastructure, letting users focus on simply writing code
  3. There is no cluster to build or manage for data processing
  4. It supports multiple languages

2. Create a Google Cloud Dataflow project and set up the environment

  1. Create a Google Cloud project

First you need to create a Google Cloud project.
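If you prefer the command line, the project can also be created with the gcloud CLI. A sketch, where the project ID `my-dataflow-project` and the billing account ID are placeholders you must replace:

```shell
# Create a new Google Cloud project (choose your own globally unique project ID)
gcloud projects create my-dataflow-project --name="Dataflow Demo"

# Dataflow requires billing; link a billing account (ID is a placeholder)
gcloud billing projects link my-dataflow-project \
    --billing-account=XXXXXX-XXXXXX-XXXXXX
```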

  2. Install the Google Cloud SDK

The Google Cloud SDK is required to use Google Cloud Dataflow. Download the installation package and follow the prompts to complete the installation.

  3. Set environment variables

Use the following command to point the SDK at the current Google Cloud project:

$ gcloud config set project [PROJECT_ID]
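Beyond selecting the project, the SDK must be authenticated and the APIs used later in this article enabled. A sketch of those steps:

```shell
# Authenticate the SDK with your Google account
gcloud auth login

# Enable the services used by the pipeline in this article
gcloud services enable dataflow.googleapis.com \
    pubsub.googleapis.com \
    bigquery.googleapis.com \
    storage.googleapis.com
```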

3. Install the necessary PHP extensions

In order to use the Dataflow service from PHP, you need to install the following extensions:

  1. gRPC extension

Use the following command to install the gRPC extension:

$ pecl install grpc

  2. Protobuf extension

Use the following command to install the Protobuf extension:

$ pecl install protobuf

  3. Dataflow PHP extension

Use the following command to install the Dataflow PHP extension:

$ pecl install google-cloud-dataflow-alpha
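After installing the extensions with pecl, they must be enabled in php.ini, and the Google Cloud client libraries used by the example below can be pulled in with Composer. A sketch, assuming PHP 8.2 on a Debian-style layout (the php.ini path varies by system, and package names for the alpha Dataflow client may differ):

```shell
# Enable the compiled extensions (adjust the php.ini path for your system)
echo "extension=grpc.so" >> /etc/php/8.2/cli/php.ini
echo "extension=protobuf.so" >> /etc/php/8.2/cli/php.ini

# Install the Pub/Sub and BigQuery client libraries used by the example script
composer require google/cloud-pubsub google/cloud-bigquery
```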

4. Write data flow processing code

The following example receives messages from a Pub/Sub topic, passes them through a Dataflow processing pipeline, and writes the results to a BigQuery table when processing completes:

<?php
require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Dataflow\DataflowClient;
use Google\Cloud\Dataflow\PubSub\PubSubOptions;
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\Dataflow\Options;

$configs = include __DIR__ . '/config.php';

$inputTopic = $configs['input_topic'];
$outputTable = $configs['output_table'];
$project = $configs['project_id'];
$bucket = $configs['bucket'];
$stagingLocation = $configs['staging_location'];
$tempLocation = $configs['temp_location'];
$jobName = 'test-job';

$options = [
    'project' => $project,
    'stagingLocation' => $stagingLocation,
    'tempLocation' => $tempLocation,
    'jobName' => $jobName,
];

$pubsub = new PubSubClient([
    'projectId' => $project
]);

$pubsub_topic = $pubsub->topic($inputTopic);

$bigquery = new BigQueryClient([
    'projectId' => $project
]);

$dataset = $bigquery->dataset('test_dataset');
$table = $dataset->table($outputTable);

$table->create([
    'schema' => [
        [
            'name' => 'id',
            'type' => 'STRING',
        ],
        [
            'name' => 'timestamp',
            'type' => 'TIMESTAMP',
        ],
        [
            'name' => 'message',
            'type' => 'STRING',
        ],
    ],
]);

$dataflow = new DataflowClient();

$pubsubOptions = PubSubOptions::fromArray([
    'topic' => sprintf('projects/%s/topics/%s', $project, $inputTopic),
]);

$options = [
    Options::PROJECT => $project,
    Options::STAGING_LOCATION => $stagingLocation,
    Options::TEMP_LOCATION => $tempLocation,
    Options::JOB_NAME => $jobName,
];

$job = $dataflow->createJob([
    'projectId' => $project,
    'name' => $jobName,
    'environment' => [
        'tempLocation' => sprintf('gs://%s/temp', $bucket),
    ],
    'steps' => [
        [
            'name' => 'Read messages from Pub/Sub',
            'pubsubio' => (new Google\Cloud\Dataflow\Io\Pubsub\PubsubMessage())
                ->expand($pubsubOptions)
                ->withAttributes(false)
                ->withIdAttribute('unique_id')
                ->withTimestampAttribute('publish_time')
        ],
        [
            'name' => 'Write messages to BigQuery',
            'bigquery' => (new Google\Cloud\Dataflow\Io\BigQuery\BigQueryWrite())
                ->withJsonSchema(file_get_contents(__DIR__ . '/schema.json'))
                ->withTable($table->tableId())
        ],
    ]
]);

$operation = $job->run();

# Poll the operation until it is complete
$operation->pollUntilComplete();

if (!$operation->isComplete()) {
    exit(1);
}

if ($operation->getError()) {
    print_r($operation->getError());
    exit(1);
}

echo "Job has been launched";
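The script above reads its settings from config.php via `include`, which means that file should return an array. A minimal sketch of config.php, with every value a placeholder you must replace with your own:

```php
<?php
// config.php - all values below are placeholders for illustration
return [
    'project_id'       => 'my-dataflow-project',
    'input_topic'      => 'my-input-topic',
    'output_table'     => 'my_output_table',
    'bucket'           => 'my-dataflow-bucket',
    'staging_location' => 'gs://my-dataflow-bucket/staging',
    'temp_location'    => 'gs://my-dataflow-bucket/temp',
];
```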

5. Run the Dataflow processing pipeline

Use the following command to run the Dataflow processing pipeline:

$ php dataflow.php

6. Monitoring and management of the data processing pipeline

The Google Cloud Console provides a Dataflow page where you can view and manage data processing pipelines.
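Pipelines can also be inspected from the command line with the gcloud CLI. A sketch (the region is an assumption; adjust it to wherever the job actually runs, and take JOB_ID from the list output):

```shell
# List Dataflow jobs in the current project
gcloud dataflow jobs list --region=us-central1

# Show details for a specific job
gcloud dataflow jobs describe JOB_ID --region=us-central1

# Cancel a running job
gcloud dataflow jobs cancel JOB_ID --region=us-central1
```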

7. Summary

This article introduced how to use PHP and Google Cloud Dataflow for streaming data processing and management, walking through the full workflow: creating a Google Cloud project, setting up the environment, installing the necessary PHP extensions, writing the data flow processing code, running the Dataflow pipeline, and monitoring and managing it. I hope it is helpful.

