ホームページ >PHPフレームワーク >Swoole >swoole でタスクプロセスを使用して時間のかかるタスクを処理するにはどうすればよいですか?

swoole でタスクプロセスを使用して時間のかかるタスクを処理するにはどうすればよいですか?

angryTom
angryTom転載
2020-01-27 21:49:592296ブラウズ

この記事では、swooleのタスクプロセスを利用して時間のかかるタスクを処理する方法を紹介しますが、一定の参考になるので、これからswooleのフレームワークを学習する学生の参考になれば幸いです。

swoole でタスクプロセスを使用して時間のかかるタスクを処理するにはどうすればよいですか?

#swoole でタスクプロセスを使用して時間のかかるタスクを処理するにはどうすればよいですか?

swoole には 2 つの主要なプロセス、つまり master main プロセスと manager management プロセスがあることがわかっています。

マスター メイン プロセスにはメイン リアクター スレッドと複数のリアクター スレッドがあり、主な機能は TCP 接続の維持、ネットワーク IO の処理、データの送受信です。

マネージャーはプロセスを管理し、その役割はワーカーとタスクのプロセスをフォークして管理することです。

ワーカープロセスの機能は、リアクタースレッドから渡されたデータを受け取り、データを処理し、処理結果をリアクタースレッドに返すことです。

タスク プロセスの役割は、比較的時間のかかるタスクを処理することです。タスク プロセスはワーカー プロセスから独立しており、クライアント リクエストのワーカー プロセスの処理には影響しません。

1. タスク プロセスの適用シナリオ:

1. 100 万人のユーザーにイベント メールを送信する必要がある特定のイベントなど、比較的時間のかかる大量メール送信。

2. 特定の大きな V の更新をプッシュします。たとえば、大きな V が新しいメッセージを投稿した場合、ファンは時間内に更新を取得する必要があります。

推奨学習: swoole チュートリアル

2. ワーカーとタスクの関係:

1. できることワーカー プロセス内 タスクは task() を呼び出すことによって配信され、タスク プロセスは onTask イベントを通じて配信されたタスクに応答します。

2. タスク プロセスでは、直接返すか、finish() を呼び出すことで、タスクが完了したことをワーカー プロセスに伝えることができます。ワーカー プロセスでは、onFinish イベントを通じてタスクの完了に応答できます。

3. タスクを使用するための前提条件:

1. Server で task_worker_num の数を設定します。

2. サーバーの onTask および onFinish イベント コールバック関数を設定します。

4. task を使用して累積合計を計算する簡単な例

<?php
 
$server = new swoole_server(&#39;0.0.0.0&#39;, 6666);
 
$server->set([
    &#39;worker_num&#39; => 2,
    &#39;task_worker_num&#39; => 16,
]);
 
$server->on(&#39;WorkerStart&#39;, function ($server, $worker_id) {
    //注意这里,我们通过taskworker来判断是task进程还是worker进程
    //需要在worker进程中调用task(),不然会报出警告
    //这里会执行两遍,因为我们设置了worker_num数为2
    if (!$server->taskworker) {
        echo &#39;投递任务开始...&#39;, PHP_EOL;
        //投递32个累加计算任务给16个task进程
        for ($ix = 0; $ix < 32; $ix++) {
            //注意这里的投递是异步的
            $server->task([mt_rand(1, 100), mt_rand(1000, 9999)]);
        }
        echo &#39;投递任务结束...&#39;, PHP_EOL;
    }
});
 
//server服务必须要有onReceive回调
$server->on(&#39;Receive&#39;, function ($server, $fd, $reactor_id, $data) {
 
});
 
//注意,task进程完全是同步阻塞模式的
$server->on(&#39;Task&#39;, function ($server, $task_id, $src_worker_id, $data) {
    echo "task {$task_id} 进程正在工作...", PHP_EOL;
    $start = $data[0];
    $end = $data[1];
    $total = 0;
    for (; $start <= $end; $start++) {
        $total += $start;
    }
    echo "task {$task_id} 进程完成工作...", PHP_EOL;
    return $total;
});
 
$server->on(&#39;Finish&#39;, function ($server, $task_id, $data) {
    echo "task {$task_id} 进程处理完成, 结果为 {$data}", PHP_EOL;
});
 
$server->start();

task() を呼び出してタスクをタスク プールに配信することに注意してください。最下層は順番に各タスクプロセスに配信タスクをクエリします。

配信するタスクの数が onTask の処理速度を超えると、タスク プールがいっぱいになり、ワーカー プロセスがブロックされてしまうため、task_worker_num の数と処理速度を適切に設定する必要があります。

もちろん、指定されたタスク プロセスにタスクを手動で配信することもできます。 task() 関数の第 2 引数には、配信するタスクのプロセス ID を指定できます。ID の範囲は 0 ~ (task_worker_num - 1) です。

5. タスクをセグメント化し、タスク プロセスへの配信を手動で制御します

<?php
 
$server = new swoole_server(&#39;0.0.0.0&#39;, 6666);
 
$server->set([
    &#39;worker_num&#39; => 1,
    &#39;task_worker_num&#39; => 10,
]);
 
$server->on(&#39;WorkerStart&#39;, function ($server, $worker_id) {
    //为了方便演示,把worker_num设置为1,这里只会执行一次
    if (!$server->taskworker) {
        //通过swoole_table共享内存,在不同进程中共享数据
        $server->result = new swoole_table(10240);
        //用于保存task进程完成数量
        $server->result->column(&#39;finish_nums&#39;, swoole_table::TYPE_INT);
        //用于保存最终计算结果
        $server->result->column(&#39;result&#39;, swoole_table::TYPE_INT);
        $server->result->create();
        //计算1000的累加和,并把计算任务分配到10个task进程上
        $num = 1000;
        $step = $num / $server->setting[&#39;task_worker_num&#39;];
        for ($ix = 0; $ix < $server->setting[&#39;task_worker_num&#39;]; $ix++) {
            $start = $ix * $step;
            $server->task([$start, $start + $step], $ix);
        }
    }
});
 
$server->on(&#39;Receive&#39;, function ($server, $fd, $reactor_id, $data) {
 
});
 
//注意,task进程完全是同步阻塞模式的
$server->on(&#39;Task&#39;, function ($server, $task_id, $src_worker_id, $data) {
    echo "task {$task_id} 进程正在工作... 计算 {$data[0]} - {$data[1]} ", PHP_EOL;
    $start = ++$data[0];
    $end = $data[1];
    $total = 0;
    for (; $start <= $end; $start++) {
        $total += $start;
    }
    echo "task {$task_id} 进程完成工作...", PHP_EOL;
    return $total;
});
 
$server->on(&#39;Finish&#39;, function ($server, $task_id, $data) {
    echo "task {$task_id} 进程处理完成, 结果为 {$data}", PHP_EOL;
    $server->result->incr(&#39;finish_nums&#39;, &#39;finish_nums&#39;);
    $server->result->set(&#39;result&#39;, [&#39;result&#39; => $data + $server->result->get(&#39;result&#39;, &#39;result&#39;)]);
 
    if ($server->result->get(&#39;finish_nums&#39;, &#39;finish_nums&#39;) == $server->setting[&#39;task_worker_num&#39;]) {
        echo "最终计算结果:{$server->result->get(&#39;result&#39;, &#39;result&#39;)}", PHP_EOL;
    }
});
 
$server->s
tart();

以上がswoole でタスクプロセスを使用して時間のかかるタスクを処理するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はcsdn.netで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。