Maison  >  Article  >  développement back-end  >  Intégration PHP et Apache Kafka pour une mise en file d'attente et une distribution efficaces des messages

Intégration PHP et Apache Kafka pour une mise en file d'attente et une distribution efficaces des messages

WBOY
WBOYoriginal
2023-06-25 09:48:441716parcourir

Avec le développement continu des applications Internet modernes, de plus en plus d'applications doivent gérer de grandes quantités de données. La manière traditionnelle de gérer ces communications de données consiste à utiliser l'interrogation ou le blocage des E/S, mais ces méthodes ne peuvent plus répondre aux besoins des applications modernes car elles sont très inefficaces. Afin de résoudre ce problème, l’industrie a développé une technologie appelée système de file d’attente et de distribution des messages.

Dans le système de file d'attente et de distribution des messages, le producteur du message envoie le message à la file d'attente, tandis que le consommateur du message obtient le message de la file d'attente et effectue les opérations correspondantes. Cette approche peut grandement améliorer l'efficacité de la communication des données, car elle permet d'éviter des problèmes tels que l'interrogation et le blocage des E/S.

Dans cet article, nous verrons comment obtenir une mise en file d'attente et une distribution efficaces des messages à l'aide de l'intégration PHP et Apache Kafka.

Introduction à Apache Kafka

Apache Kafka est un système de messagerie distribué évolutif à haut débit et à faible latence. Il peut gérer de gros volumes de messages et évoluer horizontalement pour s’adapter à des charges plus élevées. Les principaux composants d'Apache Kafka incluent :

  1. Broker : chaque nœud du cluster Kafka est un courtier, et il est responsable du stockage et du transfert des messages.
  2. Sujet : Chaque message doit être attribué à un sujet, qui est un concept logique de production et de consommation de messages.
  3. Partition : chaque sujet peut être divisé en plusieurs partitions et chaque partition contient plusieurs messages ordonnés.
  4. Producteur : Producteur de messages, envoie des messages au courtier.
  5. Consommateur : Consommateur de messages, lit les messages du courtier.
  6. Groupe de consommateurs : un groupe de consommateurs consomme conjointement des messages dans une ou plusieurs partitions.
  7. Offset : Le numéro du message, utilisé pour identifier de manière unique un message.

PHP intégré à Apache Kafka

Pour utiliser Apache Kafka, nous devons utiliser l'extension Kafka pour PHP. Cette extension fournit toutes les API dont PHP a besoin pour faire fonctionner Kafka.

Tout d'abord, nous devons installer l'extension Kafka, que nous pouvons installer depuis PECL :

pecl install kafka

Après avoir installé l'extension, vous pouvez commencer à l'utiliser. Voici un exemple simple de production et de consommation de messages à l'aide de PHP et Apache Kafka :

<?php
$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka生产者
$producer = new RdKafkaProducer();
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers($brokers);

// 创建一个Kafka消费者
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaConsumer($conf);
$consumer->addBrokers($brokers);

// 生产消息
$topicProducer = $producer->newTopic($topic);
for ($i = 0; $i < 10; $i++) {
    $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, 'Message ' . $i);
}

// 消费消息
$topicConsumer = $consumer->newTopic($topic);
$topicConsumer->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
    $message = $topicConsumer->consume(0, 1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }
    echo $message->payload . PHP_EOL;
}

Dans cet exemple, nous créons d'abord un producteur Kafka et un consommateur Kafka. Ensuite, chez le producteur, nous avons envoyé 10 messages au sujet spécifié ; chez le consommateur, nous avons consommé les messages du sujet spécifié et affiché leur contenu.

À ce stade, nous avons implémenté avec succès une production et une consommation de messages simples à l'aide de PHP et Apache Kafka. Nous verrons ensuite comment implémenter des fonctionnalités plus avancées à l'aide de PHP et Apache Kafka.

Exemples d'applications avancées

Dans les applications réelles, nous devons généralement implémenter certaines fonctions avancées, telles que :

  1. Distribution de messages : envoyer des messages à des consommateurs désignés.
  2. Groupe de consommateurs : permet à plusieurs consommateurs de consommer conjointement des messages dans un ou plusieurs sujets.
  3. configuration offset : permet de contrôler où les messages sont lus.

Ici, nous discuterons de la façon de mettre en œuvre ces fonctions.

Distribution de messages

Dans les applications pratiques, nous devons généralement contrôler le flux de messages. Par exemple, nous pouvons souhaiter que seuls certains consommateurs consomment certains messages. Pour réaliser cette fonctionnalité, nous pouvons créer une file d'attente pour chaque consommateur, puis attribuer des messages spécifiques à des files d'attente spécifiques.

Voici un exemple qui utilise deux consommateurs pour consommer deux tâches différentes.

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者组
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaKafkaConsumer($conf);
$consumer->subscribe([$topic]);

// 创建两个Kafka生产者,一个生产者用于向消费者1发送消息,另一个生产者用于向消费者2发送消息
$producer1 = new RdKafkaProducer();
$producer1->addBrokers($brokers);
$producer1Topic = $producer1->newTopic($topic . '_1');

$producer2 = new RdKafkaProducer();
$producer2->addBrokers($brokers);
$producer2Topic = $producer2->newTopic($topic . '_2');

// 消费消息
while (true) {
    $message = $consumer->consume(1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;

    // 根据消息内容分配给不同的生产者
    if ($message->payload === 'task1') {
        $producer1Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload);
    } elseif ($message->payload === 'task2') {
        $producer2Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload);
    }
}

Dans cet exemple, nous utilisons deux producteurs pour distribuer des messages à deux consommateurs différents. Lorsqu'un consommateur reçoit un message, nous pouvons l'attribuer à un producteur spécifique en fonction du contenu du message. Cette méthode peut nous aider à contrôler le flux des messages et à éviter un traitement redondant des messages.

Groupe de consommateurs

Chez les consommateurs Kafka ordinaires, différents consommateurs du même groupe consomment ensemble le même sujet et recevront le même message. En effet, Kafka équilibre automatiquement les partitions et garantit que chaque partition est traitée par un seul consommateur.

En PHP, nous pouvons utiliser group.id pour regrouper les consommateurs afin d'implémenter la fonction de groupes de consommateurs.

Ce qui suit est un exemple d'un groupe de consommateurs Kafka qui peut traiter les messages au sein du même groupe en parallèle :

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者组
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$conf->set('metadata.broker.list', $brokers);
$conf->set('enable.auto.commit', 'false');
$consumer = new RdKafkaKafkaConsumer($conf);

// 添加需要订阅的topic
$consumer->subscribe([$topic]);

// 处理消息
while (true) {
    $message = $consumer->consume(1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;

    // 处理完消息后手动提交offset
    $consumer->commit();
}

Dans cet exemple, nous créons un groupe de consommateurs Kafka et ajoutons des sujets auxquels il faut s'abonner. On peut alors traiter les messages au sein d’un même groupe en parallèle.

Remarque : dans un groupe de consommateurs, plusieurs consommateurs consomment une ou plusieurs partitions ensemble. Lors de la consommation de données, vous devez faire attention au problème du traitement multithread des mêmes données.

Configuration du décalage

Dans Kafka, chaque partition a un décalage indépendant. Le consommateur peut contrôler où il lit dans la partition et donc quels messages il lit. Le consommateur peut commencer la lecture à partir du dernier message ou du dernier message.

En PHP, on peut utiliser le offset pour contrôler la position de lecture des messages. Voici un exemple de configuration de décalage :

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaKafkaConsumer($conf);

// 订阅topic
$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $consumer->newTopic($topic, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

// 消费消息
while (true) {
    $message = $topic->consume(0, 1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;
}

Dans cet exemple, nous utilisons auto.offset.reset pour définir la configuration de décalage. Cette configuration indique au consommateur de commencer à consommer les messages dès le décalage le plus précoce.

Dans les applications pratiques, différents décalages peuvent être configurés en fonction des besoins. Par exemple, après que le producteur n'a pas réussi à traiter certains messages, nous devrons peut-être recommencer la lecture des messages à partir du point où le message ayant échoué a été traité précédemment.

Conclusion

Dans cet article, nous avons expliqué comment obtenir une mise en file d'attente et une distribution efficaces des messages à l'aide de l'intégration PHP et Apache Kafka. Nous avons d'abord présenté les bases d'Apache Kafka, puis expliqué comment utiliser l'extension Kafka pour PHP pour implémenter la production et la consommation de messages. Enfin, nous avons expliqué comment implémenter certaines fonctionnalités avancées telles que la distribution des messages, les groupes de consommateurs et la configuration du décalage.

L'utilisation de PHP et de l'intégration Apache Kafka nous permet de mettre en œuvre une mise en file d'attente et une distribution efficaces des messages, améliorant ainsi la vitesse de réponse et le débit des applications. Si vous développez une application devant gérer de grandes quantités de données, Apache Kafka et l'extension Kafka pour PHP peuvent constituer un bon choix.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn