首頁  >  文章  >  後端開發  >  基於RabbitMQ和Swoole實現的一個完整的非同步任務系統

基於RabbitMQ和Swoole實現的一個完整的非同步任務系統

PHPz
PHPz原創
2017-04-04 16:04:133753瀏覽

從最開始的使用redis實現的單一進程消費的非同步任務系統到加入swoole的多進程消費模式,現在,我們的非同步任務系統終於又能邁進一步。完善,包括多進程消費,異常重試等。

#從圖中可以看到,我們這個系統是一個基於

事件

的非同步任務系統。 ##查詢

事件下有哪些任務,然後將這些任務丟到對應的

佇列

中,最後由消費者消費任務佇列中的任務。主要分為三大部分基於RabbitMQ和Swoole實現的一個完整的非同步任務系統
1.事件生產者,即產生訊息事件的一方。者(Worker),負責消費任務佇列中的任務。

任務調度器

調度器主要做兩件事,一是註冊事件,另一個是調度任務。就註冊了兩個事件,事件下各有一個任務。 #重頭戲來了,一個非同步任務系統最重要的就是消費端了,現在讓我們來看下Worker的流程圖。完整的消費流程

可以看到,在這裡我們採用了兩個交換器和兩個佇列,一個負責處理正常的任務即ntask,另一個負責處理需要延遲執行的任務即dtask。描述下一個任務的生命週期。根據topic將任務分發到對應的佇列中3、子程序ntask阻塞等待成功取得到task,並執行該任務4、執行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException5、子程序ntask捕獲到重試異常將任務拋給延遲任務的交換器Exchange[ebats_core_dtask]6、將任務執行資訊回調給上層開發者以便保存查看

延遲任務

1、子程序dtask阻塞等待成功取得到task,並執行該任務
2、執行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
3、子程序dtask捕獲到重試異常將任務拋給延遲任務的交換器Exchange[ebats_core_dtask]

4、將任務執行資訊回調給上層開發者以便保存查看

#消費者程式碼如下:

<?php
require_once DIR.&#39;/../autoload.php&#39;;
use Asynclib\Ebats\Event;
try{
    $event = new Event(&#39;order_paied&#39;);  //定义事件
    $event->setOptions(['order_id' => 'FB138020392193312']); //事件产生的参数
    $event->publish();
}catch (Exception $exc){
    echo $exc->getMessage();
}
自訂調度器

一般來說這是一個基於事件的任務系統,那麼能不能直接產生任務呢。答案是肯定的。

只需要建立一個自訂調度器,由您自行實作調度邏輯,最終產生一個任務即可。程式碼如下:

//注册事件
EventManager::register('order_create', 'closeOrder', 'demo', 10);//关闭未付款订单(延迟任务)
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虚拟商品自动发货
這樣,當接收到訊息時就會產生一個orderAsync的任務,您只需要啟動一個用來消費這個Topic的Worker即可。

也許你會覺得這裡直接寫業務邏輯的程式碼就可以了,實際上也確實可以。當你可以忍受一個行程慢慢消費的時候是可以這樣做的。但大多數情況下我們還是希望它能夠盡快的消費掉,所以建議這裡只負責創建任務,具體任務的業務邏輯由worker去執行。

以上是基於RabbitMQ和Swoole實現的一個完整的非同步任務系統的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn