Maison >cadre php >Swoole >Comment utiliser le processus de tâches dans Swoole pour gérer des tâches chronophages ?

Comment utiliser le processus de tâches dans Swoole pour gérer des tâches chronophages ?

angryTom
angryTomavant
2020-01-27 21:49:592320parcourir

Cet article présente la méthode d'utilisation du processus de tâches dans swoole pour gérer des tâches chronophages. J'espère qu'il sera utile aux étudiants qui apprennent le framework swoole !

Comment utiliser le processus de tâches dans Swoole pour gérer des tâches chronophages ?

Comment utiliser le processus de tâches dans swoole pour gérer des tâches chronophages ?

Nous savons qu'il existe deux processus majeurs dans swoole, à savoir le maître processus principal et le manager processus de gestion.

Le processus principal principal aura un thread de réacteur principal et plusieurs threads de réacteur. La fonction principale est de maintenir les connexions TCP, de traiter les E/S du réseau et d'envoyer et de recevoir des données.

Le manager gère les processus et son rôle est de créer et de gérer les processus des travailleurs et des tâches.

La fonction du processus de travail est de recevoir les données transmises par le thread du réacteur, de traiter les données et de renvoyer les résultats du traitement au thread du réacteur.

Le rôle du processus de tâche est de gérer certaines tâches relativement fastidieuses. Le processus de tâche est indépendant du processus de travail et n'affectera pas le traitement des demandes des clients par le processus de travail.

1. Scénarios d'application du processus de tâche :

1. Envoi de masse relativement long, comme un certain événement, qui nécessite l'envoi d'e-mails d'événement à 1 million d'utilisateurs. .

2. Push les mises à jour de certains grands V. Par exemple, si un grand V publie un nouveau message, les fans doivent recevoir les mises à jour à temps.

Apprentissage recommandé : tutoriel swoole

2. La relation entre les travailleurs et les tâches :

1. dans le processus de travail La tâche est livrée en appelant task() et le processus de tâche répond à la tâche livrée via l'événement onTask.

2. Dans le processus de tâche, vous pouvez indiquer au processus de travail que la tâche est terminée en retournant directement ou en appelant finish(). Dans le processus de travail, vous pouvez répondre à l'achèvement de la tâche via l'événement onFinish.

3. Conditions préalables à l'utilisation de la tâche :

1. Configurez le nombre de task_worker_num dans le serveur.

2. Définissez les fonctions de rappel d'événement onTask et onFinish du serveur.

4. Un exemple simple d'utilisation de task pour calculer la somme cumulée

<?php
 
$server = new swoole_server(&#39;0.0.0.0&#39;, 6666);
 
$server->set([
    &#39;worker_num&#39; => 2,
    &#39;task_worker_num&#39; => 16,
]);
 
$server->on(&#39;WorkerStart&#39;, function ($server, $worker_id) {
    //注意这里,我们通过taskworker来判断是task进程还是worker进程
    //需要在worker进程中调用task(),不然会报出警告
    //这里会执行两遍,因为我们设置了worker_num数为2
    if (!$server->taskworker) {
        echo &#39;投递任务开始...&#39;, PHP_EOL;
        //投递32个累加计算任务给16个task进程
        for ($ix = 0; $ix < 32; $ix++) {
            //注意这里的投递是异步的
            $server->task([mt_rand(1, 100), mt_rand(1000, 9999)]);
        }
        echo &#39;投递任务结束...&#39;, PHP_EOL;
    }
});
 
//server服务必须要有onReceive回调
$server->on(&#39;Receive&#39;, function ($server, $fd, $reactor_id, $data) {
 
});
 
//注意,task进程完全是同步阻塞模式的
$server->on(&#39;Task&#39;, function ($server, $task_id, $src_worker_id, $data) {
    echo "task {$task_id} 进程正在工作...", PHP_EOL;
    $start = $data[0];
    $end = $data[1];
    $total = 0;
    for (; $start <= $end; $start++) {
        $total += $start;
    }
    echo "task {$task_id} 进程完成工作...", PHP_EOL;
    return $total;
});
 
$server->on(&#39;Finish&#39;, function ($server, $task_id, $data) {
    echo "task {$task_id} 进程处理完成, 结果为 {$data}", PHP_EOL;
});
 
$server->start();

Notez que nous livrons des tâches au pool de tâches en appelant task() et le swoole la couche inférieure interrogera à tour de rôle les tâches de livraison des requêtes pour chaque processus de tâche.

Lorsque le nombre de tâches que vous livrez dépasse la vitesse de traitement de onTask, cela entraînera le remplissage du pool de tâches, ce qui entraînera le blocage du processus de travail, donc la relation entre le nombre de task_worker_num et la vitesse de traitement doit être réglée de manière appropriée.

Bien sûr, nous pouvons également livrer manuellement la tâche au processus de tâche spécifié. Le deuxième paramètre de la fonction task() peut spécifier l'ID du processus de tâche à livrer, et la plage d'ID est comprise entre 0 et (task_worker_num - 1).

5. Segmentez la tâche et contrôlez manuellement la livraison au processus de tâche

<?php
 
$server = new swoole_server(&#39;0.0.0.0&#39;, 6666);
 
$server->set([
    &#39;worker_num&#39; => 1,
    &#39;task_worker_num&#39; => 10,
]);
 
$server->on(&#39;WorkerStart&#39;, function ($server, $worker_id) {
    //为了方便演示,把worker_num设置为1,这里只会执行一次
    if (!$server->taskworker) {
        //通过swoole_table共享内存,在不同进程中共享数据
        $server->result = new swoole_table(10240);
        //用于保存task进程完成数量
        $server->result->column(&#39;finish_nums&#39;, swoole_table::TYPE_INT);
        //用于保存最终计算结果
        $server->result->column(&#39;result&#39;, swoole_table::TYPE_INT);
        $server->result->create();
        //计算1000的累加和,并把计算任务分配到10个task进程上
        $num = 1000;
        $step = $num / $server->setting[&#39;task_worker_num&#39;];
        for ($ix = 0; $ix < $server->setting[&#39;task_worker_num&#39;]; $ix++) {
            $start = $ix * $step;
            $server->task([$start, $start + $step], $ix);
        }
    }
});
 
$server->on(&#39;Receive&#39;, function ($server, $fd, $reactor_id, $data) {
 
});
 
//注意,task进程完全是同步阻塞模式的
$server->on(&#39;Task&#39;, function ($server, $task_id, $src_worker_id, $data) {
    echo "task {$task_id} 进程正在工作... 计算 {$data[0]} - {$data[1]} ", PHP_EOL;
    $start = ++$data[0];
    $end = $data[1];
    $total = 0;
    for (; $start <= $end; $start++) {
        $total += $start;
    }
    echo "task {$task_id} 进程完成工作...", PHP_EOL;
    return $total;
});
 
$server->on(&#39;Finish&#39;, function ($server, $task_id, $data) {
    echo "task {$task_id} 进程处理完成, 结果为 {$data}", PHP_EOL;
    $server->result->incr(&#39;finish_nums&#39;, &#39;finish_nums&#39;);
    $server->result->set(&#39;result&#39;, [&#39;result&#39; => $data + $server->result->get(&#39;result&#39;, &#39;result&#39;)]);
 
    if ($server->result->get(&#39;finish_nums&#39;, &#39;finish_nums&#39;) == $server->setting[&#39;task_worker_num&#39;]) {
        echo "最终计算结果:{$server->result->get(&#39;result&#39;, &#39;result&#39;)}", PHP_EOL;
    }
});
 
$server->s
tart();

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer