Maison  >  Article  >  développement back-end  >  Un système de tâches asynchrones complet basé sur RabbitMQ et Swoole

Un système de tâches asynchrones complet basé sur RabbitMQ et Swoole

PHPz
PHPzoriginal
2017-04-04 16:04:133764parcourir

À partir du système de tâches asynchrones initial de consommation à processus unique implémenté à l'aide de redis Avec l'ajout Grâce au modèle de consommation multi-processus de swoole, notre système de tâches asynchrones peut enfin faire un pas en avant

Grâce à l'expérience des deux systèmes simples précédents, cette fois le système de tâches asynchrones basé sur RabbitMQ est mieux conçu. , y compris la consommation multi-processus, les nouvelles tentatives d'exception, etc.

Introduction au système

Un système de tâches asynchrones complet basé sur RabbitMQ et Swoole

Côté consommateurArchitectureFigure

Comme vous pouvez le voir sur l'image, notre système est un système de tâches asynchrones basé sur des événements C'est-à-dire que lorsqu'un événement se produit, le producteur le renvoie au planificateur, et au planificateur. le planificateur est responsable de<.>Requérez quelles tâches se trouvent sous l'événement , puis jetez ces tâches dans la file d'attente correspondante, et enfin le consommateur consomme les tâches dans la file d'attente des tâches

dans tout le système. Principalement divisé en trois parties

1. Producteur d'événements, qui est la partie qui génère les événements de message

2. .
3. Le consommateur est responsable de la consommation des tâches dans la file d'attente des tâches

Producteur d'événements

Le producteur d'événements peut être appelé directement dans le système métier. 🎜>

Planificateur de tâches

<?php
require_once DIR.&#39;/../autoload.php&#39;;
use Asynclib\Ebats\Event;
try{
    $event = new Event(&#39;order_paied&#39;);  //定义事件
    $event->setOptions(['order_id' => 'FB138020392193312']); //事件产生的参数
    $event->publish();
}catch (Exception $exc){
    echo $exc->getMessage();
}
Le planificateur fait principalement deux choses, l'une consiste à enregistrer des événements et l'autre à planifier des tâches

Le code d'événement d'enregistrement est le suivant :

De cette façon, deux événements sont enregistrés, chacun avec une tâche

//注册事件
EventManager::register('order_create', 'closeOrder', 'demo', 10);//关闭未付款订单(延迟任务)
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虚拟商品自动发货
Le code de programmation spécifique est très simple, je n'entrerai donc pas dans les détails. Si vous êtes intéressé, vous pouvez jeter un œil au code. Consommateur

L'événement principal est ici. La chose la plus importante dans un système de tâches asynchrone est le consommateur. Jetons maintenant un œil à l'organigramme de Worker.

Un processus de consommation complet

Un système de tâches asynchrones complet basé sur RabbitMQ et SwooleComme vous pouvez le constater, nous utilisons ici deux commutateurs et deux files d'attente, l'une est chargée du traitement des tâches normales, à savoir ntask, et l'autre est responsable du traitement des tâches qui doivent être retardées. C'est-à-dire dtâche Décrivez brièvement le
cycle de vie

de la tâche suivante

Tâche normale

1. . La tâche est générée et entre dans l'échangeur Exchange[ebats_core_ntask] de la tâche normale. 2. L'échangeur distribue les tâches dans les files d'attente correspondantes selon le sujet

3. Le sous-processus ntask se bloque et attend le. la tâche doit être obtenue avec succès et exécute la tâche
4. L'exécution échoue et une RetryException est levée lorsqu'une nouvelle tentative est requise. Une TaskException est levée lorsqu'une nouvelle tentative est requise
5. La tâche du sous-processus capture l'exception de nouvelle tentative et lance la tâche à l'échangeur de tâches retardées Exchange [ebats_core_dtask]

6. Rappelle les informations d'exécution de la tâche au développeur de niveau supérieur pour les enregistrer et les visualiser

Tâche retardée

1. la tâche du sous-processus se bloque et attend que la tâche soit obtenue avec succès et exécute la tâche
2. L'exécution échoue et une RetryException est levée lorsqu'une nouvelle tentative est requise. Aucune nouvelle tentative n'est requise. la tâche de sous-processus capture l'exception de nouvelle tentative et renvoie la tâche à l'échangeur de tâches retardé Exchange[ebats_core_dtask]

4 Rappelle les informations d'exécution de la tâche au développeur de niveau supérieur pour les enregistrer et les visualiser
Le consommateur. le code est le suivant :



Planificateur personnalisé

De manière générale, il s'agit d'un système de tâches basé sur des événements, les tâches peuvent-elles donc être générées directement ? La réponse est oui.

Il vous suffit de créer un planificateur personnalisé, de mettre en œuvre vous-même la logique de planification et enfin de générer une tâche. Le code est le suivant :
require_once DIR.'/../autoload.php';
require_once DIR.'/task/TaskDemoModel.php';
use Asynclib\Ebats\Worker;

//执行结果回调函数
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){

};
$worker = new Worker($callback);  //支持多进程消费默认为1
$worker->setQueue('demo');  //队列名和事件的topic一一对应
$worker->run();

De cette façon, une tâche orderAsync sera générée lorsqu'un message est reçu. Il vous suffit de démarrer un Worker pour consommer ce sujet.

Peut-être penserez-vous qu'il suffit d'écrire le code de la logique métier directement ici, et en fait c'est effectivement possible. Vous pouvez le faire lorsque vous pouvez tolérer un processus qui se consomme lentement. Mais dans la plupart des cas, nous souhaitons toujours qu'il soit consommé le plus rapidement possible, il est donc recommandé de créer uniquement des tâches ici, et la logique métier des tâches spécifiques est exécutée par les travailleurs.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn