>  기사  >  백엔드 개발  >  효율적인 메시지 대기열 및 배포를 위한 PHP 및 Apache Kafka 통합

효율적인 메시지 대기열 및 배포를 위한 PHP 및 Apache Kafka 통합

WBOY
WBOY원래의
2023-06-25 09:48:441709검색

최신 인터넷 애플리케이션의 지속적인 개발로 인해 점점 더 많은 애플리케이션에서 대량의 데이터 통신을 처리해야 합니다. 이러한 데이터 통신을 처리하는 전통적인 방법은 폴링 또는 차단 I/O를 사용하는 것이지만 이러한 방법은 매우 비효율적이므로 더 이상 최신 애플리케이션의 요구 사항을 충족할 수 없습니다. 이러한 문제를 해결하기 위해 업계에서는 메시지 큐 및 분산 시스템이라는 기술을 개발해 왔습니다.

메시지 대기열 및 배포 시스템에서 메시지 생산자는 메시지를 대기열로 보내고, 메시지 소비자는 대기열에서 메시지를 가져와 해당 작업을 수행합니다. 이 접근 방식은 I/O 폴링 및 차단과 같은 문제를 피할 수 있으므로 데이터 통신의 효율성을 크게 향상시킬 수 있습니다.

이 기사에서는 PHP와 Apache Kafka 통합을 사용하여 효율적인 메시지 대기열 및 배포를 달성하는 방법에 대해 설명합니다.

Apache Kafka 소개

Apache Kafka는 처리량이 높고 지연 시간이 짧으며 확장 가능한 분산 메시징 시스템입니다. 대량의 메시지를 처리하고 더 높은 로드를 수용할 수 있도록 수평으로 확장할 수 있습니다. Apache Kafka의 주요 구성 요소는 다음과 같습니다.

  1. Broker: Kafka 클러스터의 각 노드는 브로커이며 메시지 저장 및 전달을 담당합니다.
  2. 주제: 각 메시지는 메시지 생산 및 소비의 논리적 개념인 주제에 할당되어야 합니다.
  3. 파티션: 각 주제는 여러 파티션으로 나눌 수 있으며, 각 파티션에는 순서가 지정된 여러 메시지가 포함되어 있습니다.
  4. Producer: 메시지 생산자, 브로커에게 메시지를 보냅니다.
  5. Consumer: 메시지 소비자, 브로커의 메시지를 읽습니다.
  6. 소비자 그룹: 소비자 그룹은 하나 이상의 파티션에서 메시지를 공동으로 소비합니다.
  7. 오프셋: 메시지를 고유하게 식별하는 데 사용되는 메시지 번호입니다.

Apache Kafka와 통합된 PHP

Apache Kafka를 사용하려면 PHP용 Kafka 확장 프로그램을 사용해야 합니다. 이 확장은 PHP가 Kafka를 작동하는 데 필요한 모든 API를 제공합니다.

먼저 PECL에서 설치할 수 있는 Kafka 확장을 설치해야 합니다.

pecl install kafka

확장을 설치한 후 사용을 시작할 수 있습니다. 다음은 PHP와 Apache Kafka를 사용한 메시지 생성 및 소비의 간단한 예입니다.

<?php
$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka生产者
$producer = new RdKafkaProducer();
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers($brokers);

// 创建一个Kafka消费者
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaConsumer($conf);
$consumer->addBrokers($brokers);

// 生产消息
$topicProducer = $producer->newTopic($topic);
for ($i = 0; $i < 10; $i++) {
    $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, 'Message ' . $i);
}

// 消费消息
$topicConsumer = $consumer->newTopic($topic);
$topicConsumer->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
    $message = $topicConsumer->consume(0, 1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }
    echo $message->payload . PHP_EOL;
}

이 예에서는 먼저 Kafka 생산자와 Kafka 소비자를 만듭니다. 그런 다음 생산자에서는 지정된 주제로 10개의 메시지를 보냈고 소비자에서는 지정된 주제의 메시지를 소비하고 해당 내용을 출력했습니다.

이 시점에서 우리는 PHP와 Apache Kafka를 사용하여 간단한 메시지 생성 및 소비를 성공적으로 구현했습니다. 다음으로 PHP와 Apache Kafka를 사용하여 고급 기능을 구현하는 방법에 대해 설명하겠습니다.

고급 애플리케이션 예시

실제 애플리케이션에서는 일반적으로 다음과 같은 일부 고급 기능을 구현해야 합니다.

  1. 메시지 배포: 지정된 소비자에게 메시지를 보냅니다.
  2. 소비자 그룹: 여러 소비자가 하나 이상의 주제에 대한 메시지를 공동으로 소비할 수 있습니다.
  3. 오프셋 구성: 메시지를 읽는 위치를 제어할 수 있습니다.

여기에서는 이러한 기능을 구현하는 방법에 대해 설명합니다.

메시지 배포

실제 애플리케이션에서는 일반적으로 메시지 흐름을 제어해야 합니다. 예를 들어 특정 소비자만 특정 메시지를 소비하기를 원할 수 있습니다. 이 기능을 달성하기 위해 각 소비자에 대한 대기열을 만든 다음 특정 메시지를 특정 대기열에 할당할 수 있습니다.

다음은 두 명의 소비자를 사용하여 두 가지 다른 작업을 소비하는 예입니다.

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者组
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaKafkaConsumer($conf);
$consumer->subscribe([$topic]);

// 创建两个Kafka生产者,一个生产者用于向消费者1发送消息,另一个生产者用于向消费者2发送消息
$producer1 = new RdKafkaProducer();
$producer1->addBrokers($brokers);
$producer1Topic = $producer1->newTopic($topic . '_1');

$producer2 = new RdKafkaProducer();
$producer2->addBrokers($brokers);
$producer2Topic = $producer2->newTopic($topic . '_2');

// 消费消息
while (true) {
    $message = $consumer->consume(1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;

    // 根据消息内容分配给不同的生产者
    if ($message->payload === 'task1') {
        $producer1Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload);
    } elseif ($message->payload === 'task2') {
        $producer2Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload);
    }
}

이 예에서는 두 명의 생산자를 사용하여 두 명의 다른 소비자에게 메시지를 배포합니다. 소비자가 메시지를 받으면 메시지 내용을 기반으로 특정 생산자에게 메시지를 할당할 수 있습니다. 이 방법을 사용하면 메시지 흐름을 제어하고 메시지의 중복 처리를 방지할 수 있습니다.

Consumer Group

일반적인 Kafka Consumer에서는 같은 그룹에 속한 여러 Consumer가 동일한 주제를 함께 소비하고 동일한 메시지를 받게 됩니다. 이는 Kafka가 자동으로 파티션의 균형을 맞추고 각 파티션이 한 명의 소비자에 의해서만 처리되도록 하기 때문입니다.

PHP에서는 group.id를 사용하여 소비자를 그룹화하여 소비자 그룹의 기능을 구현할 수 있습니다.

다음은 동일한 그룹 내의 메시지를 병렬로 처리할 수 있는 Kafka 소비자 그룹의 예입니다.

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者组
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$conf->set('metadata.broker.list', $brokers);
$conf->set('enable.auto.commit', 'false');
$consumer = new RdKafkaKafkaConsumer($conf);

// 添加需要订阅的topic
$consumer->subscribe([$topic]);

// 处理消息
while (true) {
    $message = $consumer->consume(1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;

    // 处理完消息后手动提交offset
    $consumer->commit();
}

이 예에서는 Kafka 소비자 그룹을 생성하고 이를 구독해야 하는 주제를 추가합니다. 그런 다음 동일한 그룹 내의 메시지를 병렬로 처리할 수 있습니다.

참고: 소비자 그룹에서는 여러 소비자가 하나 이상의 파티션을 함께 소비합니다. 데이터를 소비할 때 동일한 데이터를 처리하는 멀티스레딩 문제에 주의해야 합니다.

오프셋 구성

Kafka에서는 각 파티션이 독립적인 오프셋을 갖습니다. 소비자는 파티션에서 읽는 위치와 읽는 메시지를 제어할 수 있습니다. 소비자는 마지막 메시지나 최신 메시지부터 읽기 시작할 수 있습니다.

PHP에서는 오프셋을 사용하여 메시지 읽기 위치를 제어할 수 있습니다. 다음은 오프셋 구성의 예입니다.

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaKafkaConsumer($conf);

// 订阅topic
$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $consumer->newTopic($topic, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

// 消费消息
while (true) {
    $message = $topic->consume(0, 1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;
}

이 예에서는 auto.offset.reset을 사용하여 오프셋 구성을 설정합니다. 이 구성은 소비자에게 가장 빠른 오프셋부터 메시지 소비를 시작하라고 지시합니다.

실제 응용 분야에서는 필요에 따라 다양한 오프셋을 구성할 수 있습니다. 예를 들어 생산자가 일부 메시지를 처리하지 못한 후 실패한 메시지가 이전에 처리된 지점부터 메시지 읽기를 다시 시작해야 할 수도 있습니다.

결론

이 기사에서는 PHP와 Apache Kafka 통합을 사용하여 효율적인 메시지 대기열 및 배포를 달성하는 방법에 대해 논의했습니다. 먼저 Apache Kafka의 기본 사항을 소개한 다음 PHP용 Kafka 확장을 사용하여 메시지 생성 및 소비를 구현하는 방법을 논의했습니다. 마지막으로 메시지 배포, 소비자 그룹, 오프셋 구성과 같은 일부 고급 기능을 구현하는 방법을 논의했습니다.

PHP와 Apache Kafka 통합을 사용하면 효율적인 메시지 대기열 및 배포를 구현하여 애플리케이션 응답 속도와 처리량을 향상시킬 수 있습니다. 대량의 데이터 통신을 처리해야 하는 애플리케이션을 개발하는 경우 Apache Kafka 및 PHP용 Kafka 확장이 좋은 선택일 수 있습니다.

위 내용은 효율적인 메시지 대기열 및 배포를 위한 PHP 및 Apache Kafka 통합의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.