>  기사  >  백엔드 개발  >  PHP는 오픈 소스 Kafka Stream 실시간 데이터 처리를 구현합니다.

PHP는 오픈 소스 Kafka Stream 실시간 데이터 처리를 구현합니다.

王林
王林원래의
2023-06-18 09:09:101451검색

Kafka Stream은 스트림 컴퓨팅 엔진으로서 실시간 데이터를 빠르게 처리하고 즉시 사용 가능한 분산 처리 기능을 제공할 수 있습니다. 널리 사용되는 개발 언어인 PHP는 우수한 언어 기능과 확장 라이브러리를 사용하여 Kafka Stream 데이터 처리를 구현할 수도 있습니다.

이 기사에서는 PHP를 사용하여 Kafka Stream의 실시간 데이터 처리를 개발하는 방법을 소개하고, 예제를 사용하여 PHP를 사용하여 관찰자 모드에서 생성된 실시간 데이터를 분석하는 방법을 보여줍니다.

  1. Kafka Stream 소개

Kafka Stream은 실시간 데이터를 안정적으로 처리하고 기본적으로 분산 처리 기능을 제공할 수 있는 빠르고 안정적인 스트림 컴퓨팅 엔진입니다. Kafka Stream은 Kafka 주제의 메시지를 소비하고 처리를 위해 애플리케이션으로 보낸 다음 처리된 결과를 다시 Kafka 주제로 보내는 효율적이고 유연한 데이터 처리 방법입니다.

  1. PHP와 Kafka Stream의 통합

PHP에서는 Kafka Stream에서 공식적으로 제공하는 Kafka-PHP 라이브러리를 통해 PHP 애플리케이션을 Kafka Stream과 쉽게 통합할 수 있습니다. 다음은 Kafka-PHP 라이브러리에서 지원하는 Kafka Stream 버전입니다.

  • Kafka 0.10.x
  • Kafka 0.11.x
  • Kafka 1.0.x
  • Kafka 1.1.x
  • Kafka 2.0.x
  • Kafka 2 .1.x
  • Kafka 2.2.x

Kafka-PHP 라이브러리는 다음과 같은 핵심 기능을 제공합니다.

  • Producer: Kafka 메시지를 생성하여 지정된 주제로 보내는 기능을 제공합니다.
  • Consumer: Kafka 메시지를 소비하는 기능을 제공하고 자동 제출 및 수동 제출을 지원합니다.
  • Manager: Kafka 주제 및 파티션을 생성하고 삭제하는 기능을 제공합니다.

또한 Kafka-PHP 라이브러리는 PHP의 Swoole 확장에 대한 지원도 제공합니다. Swoole 확장을 사용하면 PHP 애플리케이션의 성능을 더욱 향상시킬 수 있습니다.

  1. 관찰자 패턴

관찰자 패턴은 객체 간의 일대다 종속 관계를 정의하는 동작 설계 패턴입니다. 객체의 상태가 변경되면 이에 의존하는 모든 객체가 알림을 받고 자동으로 업데이트됩니다. 관찰자 패턴은 이벤트 모니터링, UI 프로그래밍 및 기타 분야에서 널리 사용되며 효율적인 메시지 전달 및 처리를 달성할 수 있습니다.

  1. Kafka Stream의 관찰자 모드 데이터 처리 구현

다음에서는 샘플 코드를 사용하여 PHP를 사용하여 Kafka Stream의 실시간 데이터 처리를 개발하고 관찰자 모드를 데이터 분석에 적용하는 방법을 보여줍니다.

4.1 Kafka 생산자 구현

먼저 Kafka 주제에 메시지를 보낼 생산자를 만들어야 합니다. 다음은 간단한 Kafka 생산자 샘플 코드입니다.

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaProducer;
use RdKafkaProducerTopic;

$conf = new Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$producer = new Producer($conf);
$topic = $producer->newTopic('topic1');
for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
}
?>

위 코드에서는 RdKafka 확장 라이브러리에서 제공하는 Producer 클래스를 사용하여 Kafka 생산자를 구현하고 'topic1'이라는 Kafka 주제에 메시지를 보냅니다. Kafka 생산자를 구현할 때 Kafka 클러스터가 올바르게 연결될 수 있도록 Kafka 클러스터의 연결 구성 설정에 주의해야 합니다.

4.2 Kafka 소비자 구현

다음으로 Kafka 주제의 데이터를 소비하기 위해 Kafka 소비자를 생성해야 합니다. 다음은 간단한 Kafka 소비자 샘플 코드입니다.

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaConsumer;
use RdKafkaTopicPartition;

$conf = new Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$consumer = new Consumer($conf);
$consumer->addBrokers('kafka:9092');
$topic = $consumer->newTopic('topic1');
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 1000);
    if ($message === null) {
        continue;
    }
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        echo "Received message: {$message->payload}
";
    }
}

$consumer->close();
?>

위 코드에서는 RdKafka 확장 라이브러리에서 제공하는 Consumer 클래스를 사용하여 Kafka 소비자를 구현하고 'topic1'이라는 Kafka 주제의 데이터를 소비하며 데이터가 인쇄됩니다. 콘솔에. Kafka 소비자를 구현할 때 소비 주제와 소비 시작 오프셋을 설정해야 합니다.

4.3 관찰자 패턴 구현

이제 Kafka 주제의 데이터를 사용할 수 있지만 관찰자 패턴을 사용하여 데이터를 분석하는 방법은 무엇입니까? 다음은 간단한 관찰자 패턴 샘플 코드입니다.

<?php
require_once __DIR__ . '/vendor/autoload.php';

use SplObserver;
use SplSubject;

class Producer implements SplSubject
{
    private array $observers = [];

    public function attach(SplObserver $observer):void
    {
        array_push($this->observers, $observer);
    }

    public function detach(SplObserver $observer):void
    {
        if (($key = array_search($observer, $this->observers, true)) !== false) {
            array_splice($this->observers, $key, 1);
        }
    }

    public function notify():void
    {
        foreach ($this->observers as $observer) {
            $observer->update($this);
        }
    }

    public function produce(string $message):void
    {
        echo "Producing message: {$message}
";
        $this->notify();
    }
}

class Consumer implements SplObserver
{
    public function update(SplSubject $subject):void
    {
        echo "Consuming message: {$subject}
";
    }
}

$producer = new Producer();
$producer->attach(new Consumer());
$producer->produce('Message 1');
?>

위 코드에서는 Producer라는 메인 클래스를 정의하고 SplSubject 인터페이스를 구현하며 관찰자 관리 메서드인 Attach, Detach, Notify 및 Production을 제공합니다. 또한 Consumer라는 관찰자 클래스를 정의하고, SplObserver 인터페이스를 구현하고, 메시지 처리를 위한 업데이트 방법을 제공했습니다. 마지막으로 Producer 인스턴스를 생성하고 Consumer 인스턴스를 관찰자로 연결하고 생성 메서드를 한 번 실행하고 Consumer의 업데이트 메서드를 트리거했습니다.

4.4 Kafka Stream의 관찰자 모드 데이터 처리 구현

마지막으로 이전 세 단계의 코드를 결합하여 Kafka Stream의 관찰자 모드 데이터 처리를 구현합니다. 다음은 간단한 Kafka Stream 데이터 처리 샘플 코드입니다.

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaConsumer;
use RdKafkaProducer;
use RdKafkaTopicPartition;
use SplSubject;
use SplObserver;

class KafkaStream implements SplSubject
{
    private array $observers;
    private Conf $conf;
    private Producer $producer;
    private Consumer $consumer;

    public function __construct(string $bootstrap_servers)
    {
        $this->conf = new Conf();
        $this->conf->set('metadata.broker.list', $bootstrap_servers);
        $this->producer = new Producer($this->conf);
        $this->consumer = new Consumer($this->conf);
        $this->observers = [];
    }

    public function attach(SplObserver $observer):void
    {
        array_push($this->observers, $observer);
    }

    public function detach(SplObserver $observer):void
    {
        if (($key = array_search($observer, $this->observers, true)) !== false) {
            array_splice($this->observers, $key, 1);
        }
    }

    public function notify():void
    {
        foreach ($this->observers as $observer) {
            $observer->update($this);
        }
    }

    public function produce(string $message, string $topic):void
    {
        echo "Producing message: {$message}
";
        $this->producer->newTopic($topic)->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        $this->notify();
    }

    public function consume(string $topic):void
    {
        $topic_partition = new TopicPartition($topic, 0);
        $this->consumer->assign([$topic_partition]);
        $this->consumer->seek($topic_partition, 0);

        while (true) {
            $message = $this->consumer->consume(0, 1000);
            if ($message === null) {
                continue;
            }
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                echo "Error: {$message->errstr()}, exiting.
";
                break;
            }
            echo "Consuming message: {$message->payload}
";
        }

        $this->consumer->close();
    }
}

class Consumer implements SplObserver
{
    public function update(SplSubject $subject):void
    {
        echo "Processing message: {$subject}
";
    }
}

$bootstrap_servers = 'kafka:9092';
$kafka_stream = new KafkaStream($bootstrap_servers);
$kafka_stream->attach(new Consumer());
$kafka_stream->produce('Message 1', 'topic1');
$kafka_stream->consume('topic1');
?>

위 코드에서는 KafkaStream이라는 클래스를 정의하고 SplSubject 인터페이스를 구현하며 Kafka Stream 처리 핵심 메서드인 생산 및 소비와 관찰자 관리 메서드를 제공합니다. 연결, 분리, 알림. 또한 Consumer라는 관찰자 클래스를 정의하고, SplObserver 인터페이스를 구현하고, 메시지 처리를 위한 업데이트 방법을 제공했습니다. 마지막으로 KafkaStream 인스턴스를 생성하고 Consumer 인스턴스를 관찰자로 연결한 후 생성 메서드를 한 번 실행하여 메시지를 생성하고 소비 메서드에서 메시지를 소비하고 처리합니다.

  1. Summary

이 기사에서는 PHP를 사용하여 Kafka Stream의 실시간 데이터 처리를 개발하는 방법을 소개하고 관찰자 패턴을 사용하여 실시간 데이터를 분석하는 방법을 보여줍니다. Kafka Stream과 Observer 패턴은 대규모 실시간 데이터를 빠르게 처리하고 효율적인 메시지 전달 및 처리를 달성하는 데 도움이 되는 강력한 도구 조합입니다.

위 내용은 PHP는 오픈 소스 Kafka Stream 실시간 데이터 처리를 구현합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.