Heim  >  Artikel  >  Backend-Entwicklung  >  Ein vollständiges asynchrones Aufgabensystem basierend auf RabbitMQ und Swoole

Ein vollständiges asynchrones Aufgabensystem basierend auf RabbitMQ und Swoole

PHPz
PHPzOriginal
2017-04-04 16:04:133753Durchsuche

Ausgehend vom anfänglichen asynchronen Aufgabensystem mit Einzelprozessverbrauch, implementiert mit redis Mit dem Zusatz Dank des Multiprozess-Verbrauchsmodells von Swoole kann unser asynchrones Aufgabensystem endlich einen weiteren Schritt nach vorne machen.

Aufgrund der Erfahrung der beiden vorherigen einfachen Systeme ist das asynchrone Aufgabensystem, das auf RabbitMQ basiert, weiter entwickelt , einschließlich Mehrprozessverbrauch, Ausnahmewiederholung usw.

Systemeinführung

Ein vollständiges asynchrones Aufgabensystem basierend auf RabbitMQ und Swoole

VerbraucherseiteArchitekturAbbildung

Wie Sie auf dem Bild sehen können, ist unser System ein asynchrones Aufgabensystem, das auf Ereignissen basiert. Das heißt, wenn ein Ereignis eintritt, wirft der Produzent das Ereignis an den Planer Der Scheduler ist dafür verantwortlich<.>Fragen Sie ab, welche Aufgaben unter dem -Ereignis stehen, werfen Sie diese Aufgaben dann in die entsprechende Warteschlange, und schließlich verbraucht der Verbraucher die Aufgaben in der Aufgabenwarteschlange

im gesamten System.

Ereignisproduzent, der die Nachrichtenereignisse generiert.

2 .
Consumer ist für die Verarbeitung von Aufgaben in der Aufgabenwarteschlange verantwortlich.

Der Ereignisproduzent kann direkt im Geschäftssystem aufgerufen werden 🎜>

Aufgabenplaner

Der Planer erledigt hauptsächlich zwei Dinge: Zum einen werden Ereignisse registriert, zum anderen werden Aufgaben geplant.

<?php
require_once DIR.&#39;/../autoload.php&#39;;
use Asynclib\Ebats\Event;
try{
    $event = new Event(&#39;order_paied&#39;);  //定义事件
    $event->setOptions(['order_id' => 'FB138020392193312']); //事件产生的参数
    $event->publish();
}catch (Exception $exc){
    echo $exc->getMessage();
}
Der Registrierungsereigniscode lautet wie folgt:

Auf diese Weise werden zwei Ereignisse mit jeweils einer Aufgabe registriert

Der spezifische Planungscode ist sehr einfach, daher werde ich nicht auf Details eingehen. Sie können einen Blick auf den Code werfen.

//注册事件
EventManager::register('order_create', 'closeOrder', 'demo', 10);//关闭未付款订单(延迟任务)
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虚拟商品自动发货
Das Wichtigste in einem asynchronen Aufgabensystem ist der Verbraucher.

Ein vollständiger Verbrauchsprozess

Wie Sie sehen, verwenden wir hier zwei Schalter und zwei Warteschlangen, eine ist für die Verarbeitung normaler Aufgaben verantwortlich, nämlich ntask, und der andere ist für die Verarbeitung von Aufgaben verantwortlich, die verzögert werden müssen. Das heißt, dtask Beschreibe kurz den

LebenszyklusEin vollständiges asynchrones Aufgabensystem basierend auf RabbitMQ und Swooleder nächsten Aufgabe

1 . Aufgabe wird generiert und gelangt in den Exchanger Exchange[ebats_core_ntask]

der normalen Task 2. Der Exchanger verteilt die Aufgaben entsprechend dem Thema auf die entsprechenden Warteschlangen

3. Der Unterprozess ntask blockiert und wartet auf die Die Aufgabe muss erfolgreich abgerufen werden und führt die Aufgabe aus.

4. Die Ausführung schlägt fehl und eine RetryException wird ausgelöst, wenn ein erneuter Versuch erforderlich ist. Eine TaskException wird ausgelöst, wenn ein erneuter Versuch erforderlich ist. 5. Der Unterprozess ntask erfasst die Wiederholungsausnahme und wirft die Aufgabe an den Austauscher für verzögerte Aufgaben Exchange [ebats_core_dtask] 6. Ruft die Informationen zur Aufgabenausführung zum Speichern und Anzeigen zurück

Verzögerte Aufgabe

1 Der untergeordnete Prozess dtask blockiert und wartet darauf, dass die Aufgabe erfolgreich abgerufen wird.
2 Die Ausführung schlägt fehl und eine RetryException wird ausgelöst. Es ist kein erneuter Versuch erforderlich.
3 -process dtask erfasst die Wiederholungsausnahme und sendet die Aufgabe an den verzögerten Aufgabenaustauscher Exchange[ebats_core_dtask]
4. Ruft die Aufgabenausführungsinformationen zum Speichern und Anzeigen an den übergeordneten Entwickler zurück

Der Verbrauchercode lautet wie folgt:

Benutzerdefinierter Planer


Im Allgemeinen handelt es sich um ein ereignisbasiertes Aufgabensystem. Können Aufgaben also direkt generiert werden? Die Antwort ist ja.

Sie müssen lediglich einen benutzerdefinierten Planer erstellen, die Planungslogik selbst implementieren und schließlich eine Aufgabe generieren. Der Code lautet wie folgt:

Auf diese Weise wird eine orderAsync-Aufgabe generiert, wenn eine Nachricht empfangen wird. Sie müssen nur einen Worker starten, um dieses Thema zu nutzen.
require_once DIR.'/../autoload.php';
require_once DIR.'/task/TaskDemoModel.php';
use Asynclib\Ebats\Worker;

//执行结果回调函数
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){

};
$worker = new Worker($callback);  //支持多进程消费默认为1
$worker->setQueue('demo');  //队列名和事件的topic一一对应
$worker->run();

Vielleicht denken Sie, dass Sie den Geschäftslogikcode einfach hier schreiben können, und das ist tatsächlich möglich. Sie können dies tun, wenn Sie einen langsamen Prozess tolerieren können. In den meisten Fällen möchten wir jedoch, dass es so schnell wie möglich verbraucht wird. Daher wird empfohlen, hier nur Aufgaben zu erstellen und die Geschäftslogik bestimmter Aufgaben von Mitarbeitern auszuführen.

Das obige ist der detaillierte Inhalt vonEin vollständiges asynchrones Aufgabensystem basierend auf RabbitMQ und Swoole. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn