ホームページ  >  記事  >  バックエンド開発  >  RabbitMQ と Swoole に基づく完全な非同期タスク システム

RabbitMQ と Swoole に基づく完全な非同期タスク システム

PHPz
PHPzオリジナル
2017-04-04 16:04:133753ブラウズ

httpredisを使用して実装された初期の単一プロセス消費の非同期タスクシステムから、swooleを追加したマルチプロセス消費モデルまで、これで、私たちの非同期タスク システムはついに新たな一歩を踏み出すことができます

前の 2 つの単純なシステムの経験のおかげで、今回は、RabbitMQ に基づく非同期タスク システムが、マルチプロセスの消費、例外の再試行などを含めてより完全になりました。

システム紹介

RabbitMQ と Swoole に基づく完全な非同期タスク システム

コンシューマ

アーキテクチャ写真

写真からわかるように、私たちのシステムは

イベントに基づいた非同期タスクシステムです。つまり、イベントが発生するとプロデューサーが起動します。イベントはスケジューラにスローされ、スケジューラはイベントの下にどのようなタスクがあるかをクエリし、これらのタスクを対応するキューにスローし、最後にタスクキュー内のタスクがコンシューマによって消費されます

システム全体は主に 3 つの部分で構成されます。 1. メッセージ イベントを生成する側であるイベント プロデューサー。 2. イベントの登録とタスクのスケジュールを担当するタスク スケジューラー (スケジューラー)。 )、タスク キューの消費を担当します。

イベント プロデューサー


イベント プロデューサーは、次のとおりです。

<?php
require_once DIR.&#39;/../autoload.php&#39;;
use Asynclib\Ebats\Event;
try{
    $event = new Event(&#39;order_paied&#39;);  //定义事件
    $event->setOptions(['order_id' => 'FB138020392193312']); //事件产生的参数
    $event->publish();
}catch (Exception $exc){
    echo $exc->getMessage();
}
タスク スケジューラー

。は主に 2 つのことを行います。1 つはイベントの登録で、もう 1 つはタスクのスケジュールです。

登録イベントのコードは次のとおりです:

//注册事件
EventManager::register('order_create', 'closeOrder', 'demo', 10);//关闭未付款订单(延迟任务)
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虚拟商品自动发货

このようにして、2 つのイベントがそれぞれ 1 つのタスクで登録されます。コードの具体的なスケジュール部分は非常に簡単なので、詳しくは説明しません。興味があれば読んでください。

コンシューマ

非同期について最も重要なことはここです。タスク システムはコンシューマーです。ここでワーカーのフローチャートを見てみましょう

ここでは 2 つのスイッチと 2 つのキューを使用しており、1 つは通常のタスク、つまり ntask の処理を​​担当します。もう 1 つは、遅延する必要があるタスクの処理を担当します。つまり、次のタスク

のライフ サイクル

を簡単に記述します。 タスクが生成され、通常のタスク

の交換 [ebats_core_ntask] に入ります。 2. エクスチェンジャーは、トピックに従ってタスクを対応するキューに分配します

3. サブプロセス ntask は、タスクが正常に取得されるのを待ち、実行します。再試行すると、RetryException がスローされます。 5. サブプロセス ntask が再試行例外をキャッチし、遅延タスクの Exchange [ebats_core_dtask] にタスクをスローします。保存および表示用のレベル開発者

遅延タスクRabbitMQ と Swoole に基づく完全な非同期タスク システム
1. サブプロセス dtask はブロックされ、タスクが正常に取得されるまで待機し、タスクを実行します

2. 再試行が必要な場合は、実行が失敗し、RetryException がスローされます。再試行時に TaskException がスローされます

3. サブプロセス dtask が再試行例外をキャプチャし、タスクを遅延タスク エクスチェンジャー Exchange [ebats_core_dtask] にスローします。

4. タスク実行情報を保存および表示するために上位レベルの開発者にコールバックします。

コンシューマコードは次のとおりです:

require_once DIR.'/../autoload.php';
require_once DIR.'/task/TaskDemoModel.php';
use Asynclib\Ebats\Worker;

//执行结果回调函数
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){

};
$worker = new Worker($callback);  //支持多进程消费默认为1
$worker->setQueue('demo');  //队列名和事件的topic一一对应
$worker->run();
カスタムスケジューラ
一般的に言えば、これはイベントベースのタスクシステムなので、タスクを直接生成できますか?答えは「はい」です。


カスタム スケジューラを作成し、自分でスケジュール ロジックを実装し、最後にタスクを生成するだけです。コードは次のとおりです:

<?php
require_once DIR.&#39;/../autoload.php&#39;;
use Asynclib\Ebats\Task;
use Asynclib\Core\Consumer;
use Asynclib\Amq\ExchangeTypes;
use Asynclib\Exception\ExceptionInterface;

/** 
 * 本示例演示了如何创建一个自定义调度器,开发者可以根据自身需求开发自己的任务调度器
 */
try{
    $worker = new Consumer();
    $worker->setExchange('order_fanout', ExchangeTypes::TOPIC);
    $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']);
    $worker->run(function($key, $msg){
        $order_data = json_encode($msg);
        echo " [$key] $order_data \n";
        Task::create('demo', 'orderAsync', $msg);//创建任务,之后消息将作为参数由任务接管处理
    });
}catch (ExceptionInterface $exc){
    echo $exc->getMessage();
}

このようにして、メッセージを受信したときに orderAsync タスクが生成されます。このトピックを使用するには、ワーカーを開始するだけです。

おそらくここにビジネス ロジック コードを直接記述するだけで十分だと思われるでしょうが、実際にそれは可能です。プロセスの消費が遅いことを許容できる場合は、これを行うことができます。ただし、ほとんどの場合、できるだけ早く消費する必要があるため、ここではタスクのみを作成し、特定のタスクのビジネス ロジックはワーカーによって実行されることをお勧めします。

以上がRabbitMQ と Swoole に基づく完全な非同期タスク システムの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。