httpredisを使用して実装された初期の単一プロセス消費の非同期タスクシステムから、swooleを追加したマルチプロセス消費モデルまで、これで、私たちの非同期タスク システムはついに新たな一歩を踏み出すことができます
前の 2 つの単純なシステムの経験のおかげで、今回は、RabbitMQ に基づく非同期タスク システムが、マルチプロセスの消費、例外の再試行などを含めてより完全になりました。 システム紹介イベントに基づいた非同期タスクシステムです。つまり、イベントが発生するとプロデューサーが起動します。イベントはスケジューラにスローされ、スケジューラはイベントの下にどのようなタスクがあるかをクエリし、これらのタスクを対応するキューにスローし、最後にタスクキュー内のタスクがコンシューマによって消費されます
システム全体は主に 3 つの部分で構成されます。 1. メッセージ イベントを生成する側であるイベント プロデューサー。 2. イベントの登録とタスクのスケジュールを担当するタスク スケジューラー (スケジューラー)。 )、タスク キューの消費を担当します。イベント プロデューサー
イベント プロデューサーは、次のとおりです。
<?php require_once DIR.'/../autoload.php'; use Asynclib\Ebats\Event; try{ $event = new Event('order_paied'); //定义事件 $event->setOptions(['order_id' => 'FB138020392193312']); //事件产生的参数 $event->publish(); }catch (Exception $exc){ echo $exc->getMessage(); }タスク スケジューラー
登録イベントのコードは次のとおりです:
//注册事件 EventManager::register('order_create', 'closeOrder', 'demo', 10);//关闭未付款订单(延迟任务) EventManager::register('order_paied', 'virtualShipping', 'demo'); //虚拟商品自动发货
コンシューマ
非同期について最も重要なことはここです。タスク システムはコンシューマーです。ここでワーカーのフローチャートを見てみましょう
ここでは 2 つのスイッチと 2 つのキューを使用しており、1 つは通常のタスク、つまり ntask の処理を担当します。もう 1 つは、遅延する必要があるタスクの処理を担当します。つまり、次のタスク
のライフ サイクル3. サブプロセス ntask は、タスクが正常に取得されるのを待ち、実行します。再試行すると、RetryException がスローされます。 5. サブプロセス ntask が再試行例外をキャッチし、遅延タスクの Exchange [ebats_core_dtask] にタスクをスローします。保存および表示用のレベル開発者
遅延タスク
1. サブプロセス dtask はブロックされ、タスクが正常に取得されるまで待機し、タスクを実行します
require_once DIR.'/../autoload.php'; require_once DIR.'/task/TaskDemoModel.php'; use Asynclib\Ebats\Worker; //执行结果回调函数 $callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){ }; $worker = new Worker($callback); //支持多进程消费默认为1 $worker->setQueue('demo'); //队列名和事件的topic一一对应 $worker->run();カスタムスケジューラ 一般的に言えば、これはイベントベースのタスクシステムなので、タスクを直接生成できますか?答えは「はい」です。
カスタム スケジューラを作成し、自分でスケジュール ロジックを実装し、最後にタスクを生成するだけです。コードは次のとおりです:
<?php require_once DIR.'/../autoload.php'; use Asynclib\Ebats\Task; use Asynclib\Core\Consumer; use Asynclib\Amq\ExchangeTypes; use Asynclib\Exception\ExceptionInterface; /** * 本示例演示了如何创建一个自定义调度器,开发者可以根据自身需求开发自己的任务调度器 */ try{ $worker = new Consumer(); $worker->setExchange('order_fanout', ExchangeTypes::TOPIC); $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']); $worker->run(function($key, $msg){ $order_data = json_encode($msg); echo " [$key] $order_data \n"; Task::create('demo', 'orderAsync', $msg);//创建任务,之后消息将作为参数由任务接管处理 }); }catch (ExceptionInterface $exc){ echo $exc->getMessage(); }
以上がRabbitMQ と Swoole に基づく完全な非同期タスク システムの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。