>  기사  >  백엔드 개발  >  스트리밍 데이터 처리 및 관리를 위해 PHP 및 Google Cloud Dataflow를 사용하는 방법

스트리밍 데이터 처리 및 관리를 위해 PHP 및 Google Cloud Dataflow를 사용하는 방법

王林
王林원래의
2023-06-25 08:07:39999검색

정보 폭발 시대가 도래하면서 데이터의 활용과 처리가 더욱 중요해졌습니다. 스트리밍 데이터 처리는 대용량 데이터를 처리하는 중요한 방법 중 하나가 되었습니다. PHP 개발자로서 실시간 데이터 처리에 대한 경험과 요구 사항이 있어야 합니다. 이 문서에서는 스트리밍 데이터 처리 및 관리를 위해 PHP와 Google Cloud Dataflow를 사용하는 방법을 소개합니다.

1. Google Cloud Dataflow 소개

Google Cloud Dataflow는 대규모 데이터 처리 작업을 관리하는 클라우드 서비스로, 일괄 처리와 스트림 처리를 혼합할 수 있습니다. 사용.

Google Cloud Dataflow의 특징은 다음과 같습니다.

  1. 단일 노드의 메모리가 충분하지 않으면 자동으로 확장됩니다.
  2. 기본 추상화를 사용자에게 숨길 수 있어 사용자가 더 쉽게 코드를 작성할 수 있습니다.
  3. 필요 없음 클러스터 구축 또는 관리 데이터 처리
  4. 다국어 지원 ​​

2. Google Cloud Dataflow 프로젝트 생성 및 환경 설정

  1. Google Cloud 프로젝트 생성

먼저 Google Cloud 프로젝트를 생성해야 합니다. .

  1. Google Cloud SDK 설치

Google Cloud Dataflow를 사용하려면 Google Cloud SDK를 설치해야 합니다. 설치 패키지를 다운로드하고 지시에 따라 설치를 완료합니다.

  1. 환경 변수 설정

다음 명령어를 사용하여 현재 Google Cloud 프로젝트에 환경 변수를 설정합니다.

$ gcloud config set project [PROJECT_ID]

3. 필요한 PHP 확장 프로그램을 설치합니다.

PHP에서 Dataflow 서비스를 사용하려면 다음을 수행해야 합니다. 다음 확장을 설치하십시오:

  1. GRPC 확장 下 GRPC 확장을 설치하려면 다음 명령을 사용하십시오:
  2. R
    $ pecl install grpc
    E

PROTOBUF 확장

  1. 다음 명령을 사용하여 Protobuf 확장을 설치하십시오:
  2. $ pecl install protobuf
    EE

DAFLOW PHP 확장

  1. DataFlow PHP 확장 프로그램을 설치하는 명령은 다음과 같습니다.
  2. Rreee
Four, 데이터 흐름 처리 코드 작성

다음은 Pub/Sub 주제에서 메시지를 수신하여 처리가 완료된 후 Dataflow 처리 파이프라인으로 전달할 수 있는 예시입니다. 결과는 BigQuery 테이블에 기록됩니다.

$ pecl install google-cloud-dataflow-alpha

5. Dataflow 처리 파이프라인을 실행합니다.

다음 명령어를 사용하여 Dataflow 처리 파이프라인을 실행합니다.

<?php
require __DIR__ . '/vendor/autoload.php';

use GoogleCloudBigQueryBigQueryClient;
use GoogleCloudDataflowDataflowClient;
use GoogleCloudDataflowPubSubPubSubOptions;
use GoogleCloudPubSubPubSubClient;
use GoogleCloudDataflowOptions;

$configs = include __DIR__ . '/config.php';

$inputTopic = $configs['input_topic'];
$outputTable = $configs['output_table'];
$project = $configs['project_id'];
$bucket = $configs['bucket'];
$stagingLocation = $configs['staging_location'];
$tempLocation = $configs['temp_location'];
$jobName = 'test-job';

$options = [
    'project' => $project,
    'stagingLocation' => $stagingLocation,
    'tempLocation' => $tempLocation,
    'jobName' => $jobName,
];

$pubsub = new PubSubClient([
    'projectId' => $project
]);

$pubsub_topic = $pubsub->topic($inputTopic);

$bigquery = new BigQueryClient([
    'projectId' => $project
]);

$dataset = $bigquery->dataset('test_dataset');
$table = $dataset->table($outputTable);

$table->create([
    'schema' => [
        [
            'name' => 'id',
            'type' => 'STRING',
        ],
        [
            'name' => 'timestamp',
            'type' => 'TIMESTAMP',
        ],
        [
            'name' => 'message',
            'type' => 'STRING',
        ],
    ],
]);

$dataflow = new DataflowClient();

$pubsubOptions = PubSubOptions::fromArray([
    'topic' => sprintf('projects/%s/topics/%s', $project, $inputTopic),
]);

$options = [
    Options::PROJECT => $project,
    Options::STAGING_LOCATION => $stagingLocation,
    Options::TEMP_LOCATION => $tempLocation,
    Options::JOB_NAME => $jobName,
];

$job = $dataflow->createJob([
    'projectId' => $project,
    'name' => $jobName,
    'environment' => [
        'tempLocation' => sprintf('gs://%s/temp', $bucket),
    ],
    'steps' => [
        [
            'name' => 'Read messages from Pub/Sub',
            'pubsubio' => (new GoogleCloudDataflowIoPubsubPubsubMessage())
                ->expand($pubsubOptions)
                ->withAttributes(false)
                ->withIdAttribute('unique_id')
                ->withTimestampAttribute('publish_time')
        ],
        [
            'name' => 'Write messages to BigQuery',
            'bigquery' => (new GoogleCloudDataflowIoBigQueryBigQueryWrite())
                ->withJsonSchema(file_get_contents(__DIR__ . '/schema.json'))
                ->withTable($table->tableId())
        ],
    ]
]);

$operation = $job->run();

# Poll the operation until it is complete
$operation->pollUntilComplete();

if (!$operation->isComplete()) {
    exit(1);
}

if ($operation->getError()) {
    print_r($operation->getError());
    exit(1);
}

echo "Job has been launched";

6. 데이터 처리 파이프라인 모니터링 및 관리

Google Cloud Console에서 제공하는 데이터 처리 파이프라인을 보고 관리하는 데 사용할 수 있는 데이터 흐름 페이지입니다.

7. 요약

이 글에서는 Google Cloud 프로젝트 생성부터 환경 설정, 필요한 PHP 확장 프로그램 설치, 데이터 스트림 처리 코드 작성 및 관리에 이르기까지 PHP와 Google Cloud Dataflow를 사용하여 스트림 데이터를 처리하는 방법을 소개합니다. Dataflow 처리 파이프라인 실행, 데이터 처리 파이프라인 모니터링 및 관리는 Dataflow의 프로세스와 단계를 자세히 소개하여 모든 사람에게 도움이 되기를 바랍니다.

위 내용은 스트리밍 데이터 처리 및 관리를 위해 PHP 및 Google Cloud Dataflow를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.