>  기사  >  백엔드 개발  >  PHP와 Kafka를 사용하여 실시간 데이터 처리를 구현하는 방법

PHP와 Kafka를 사용하여 실시간 데이터 처리를 구현하는 방법

WBOY
WBOY원래의
2023-06-28 11:02:282074검색

최근 몇 년 동안 실시간 데이터 처리에 대한 수요가 계속해서 증가해 왔습니다. 콜드 스타트 ​​및 배치 기반 기술은 더 이상 실시간 데이터 처리 요구 사항을 충족할 수 없습니다. 따라서 실시간 데이터 처리 기술로 눈을 돌리는 기업이 늘어나고 있습니다. 이 기사에서는 PHP와 Kafka를 사용하여 실시간 데이터 처리를 달성하는 방법을 소개합니다.

Kafka는 원래 LinkedIn이 개발한 처리량이 높은 분산 스트림 처리 플랫폼입니다. Kafka는 새로운 스트림 처리, 일괄 처리, 메시징 시스템, 조정 시스템 등을 만드는 데 사용될 수 있습니다.

PHP는 인터넷 애플리케이션을 구축하는 데 널리 사용되는 인기 있는 동적 프로그래밍 언어입니다. PHP는 실시간 데이터 처리를 위한 첫 번째 선택은 아니지만 웹 개발 및 데이터 처리에 널리 사용됩니다.

이제 PHP와 Kafka를 사용하여 실시간 데이터 처리를 구현하는 단계를 소개하겠습니다.

1단계: PHP 설치 및 구성

PHP로 실시간 데이터 처리를 시작하기 전에 PHP 환경을 설치하고 Kafka 확장 및 Redis 확장과 같은 필요한 PHP 확장을 추가해야 합니다.

Kafka 확장은 이 링크 kafka에서 다운로드하여 설치할 수 있으며, pecl install kafka를 설치하면 kafka 확장을 설치할 수 있습니다.

Redis 확장 여기에서 PHP Redis 확장을 다운로드하여 설치하거나 PECL을 사용하여 설치할 수 있습니다. 명령: pecl install redis.

PHP 확장을 설치하고 구성한 후 실시간 데이터 처리 프로그램 작성을 시작할 수 있습니다.

2단계: Kafka에 연결

Kafka에서는 Kafka 생산자와 Kafka 소비자가 데이터 스트림을 연결하여 데이터를 "데이터 파이프라인"으로 전송하는 데 사용됩니다. PHP에서는 Kafka에서 제공하는 KafkaProducer 및 KafkaConsumer 클래스를 사용하고 이를 인스턴스화하여 Kafka에 연결할 수 있습니다.

샘플 코드는 다음과 같습니다.

<?php

$kafkaConf = new RdKafkaConf();
$kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息
$kafkaProducer = new RdKafkaProducer($kafkaConf);
$kafkaConsumer = new RdKafkaConsumer($kafkaConf);
$topic = $kafkaProducer->newTopic('sample');

?>

3단계: 데이터 읽기

KafkaConsumer 클래스를 사용하여 실시간 데이터 스트림을 얻을 수 있습니다. Kafka에는 데이터 흐름을 하나 이상의 파티션으로 나누는 스트림 개념이 있으며, 각 파티션은 마스터 파티션과 0개 이상의 슬레이브 파티션으로 구성됩니다. PHP에서는 KafkaConsumer 클래스를 사용하여 소비자 개체를 인스턴스화하고 하나 이상의 파티션을 구독하여 데이터를 읽을 수 있습니다.

샘플 코드는 다음과 같습니다.

<?php

$kafkaConf = new RdKafkaConf();
$kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息
$kafkaConsumer = new RdKafkaConsumer($kafkaConf);

$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $kafkaConsumer->newTopic('sample', $topicConf);

var_dump($topic->getMetadata(true, 10000));

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
    $message = $topic->consume(0, 1000);
    if (null !== $message) {
        print_r($message->payload);
    }
}

?>

4단계: 데이터 처리

데이터를 받은 후 데이터를 처리하여 메모리에 저장할 수 있습니다. Redis를 사용하면 적절한 시간에 정기적으로 데이터베이스에 데이터를 새로 고쳐 데이터를 저장하고 안전하게 유지할 수 있습니다.

샘플 코드는 다음과 같습니다.

<?php

$kafkaConf = new RdKafkaConf();
$kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息
$kafkaConsumer = new RdKafkaConsumer($kafkaConf);

$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $kafkaConsumer->newTopic('sample', $topicConf);

$redisClient = new Redis();
$redisClient->connect('127.0.0.1', 6379);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
    $message = $topic->consume(0, 1000);
    if (null !== $message) {
        $data = json_decode($message->payload);
        $redisClient->hMSet('my_data', [
            $data->key1 => $data->value1,
            $data->key2 => $data->value2,
        ]);
    }
}

?>

5단계: 데이터 동기화

마지막으로 실시간 데이터 스트림을 데이터베이스로 다시 플러시해야 합니다. 타이머와 PHP 프로세스를 사용하여 정기적으로 Redis 캐시를 데이터베이스로 다시 플러시할 수 있습니다.

샘플 코드는 다음과 같습니다.

<?php

$kafkaConf = new RdKafkaConf();
$kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息
$kafkaConsumer = new RdKafkaConsumer($kafkaConf);

$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $kafkaConsumer->newTopic('sample', $topicConf);

$redisClient = new Redis();
$redisClient->connect('127.0.0.1', 6379);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

$count = 0;
while (true) {
    $message = $topic->consume(0, 1000);
    if (null !== $message) {
        $data = json_decode($message->payload);
        $redisClient->hMSet('my_data', [
            $data->key1 => $data->value1,
            $data->key2 => $data->value2,
        ]);
        $count++;
        if ($count == 5) {
            $count = 0;
            $allData = $redisClient->hGetAll('my_data');
            //将数据更新到数据库中
            //...
        }
    }
}

?>

결론

이번 글에서는 PHP와 Kafka를 이용하여 실시간 데이터 처리를 구현하는 방법을 소개했습니다. Kafka를 사용하면 실시간 데이터를 데이터 파이프라인으로 쉽게 스트리밍하고, PHP를 사용하여 데이터를 처리하고 저장할 수 있습니다. 또한 Redis를 캐시 및 인메모리 스토리지로 사용하여 실시간 데이터를 처리합니다. 이 접근 방식은 캐싱 및 메시징 솔루션을 쉽게 대체하는 동시에 더 뛰어난 성능과 확장성을 제공할 수 있습니다.

위 내용은 PHP와 Kafka를 사용하여 실시간 데이터 처리를 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.