Maison  >  Article  >  cadre php  >  Vous emmène apprendre swoole_process

Vous emmène apprendre swoole_process

coldplay.xixi
coldplay.xixiavant
2021-02-25 09:51:531527parcourir

Vous emmène apprendre swoole_process

Recommandé (gratuit) : swoole

Il existe deux méthodes de communication entre les processus swoole, l'une est la file d'attente des messages ( file d'attente), l'autre est pipe (pipe), la recherche sur swoole_process est particulièrement importante en swoole.

Connaissances préliminaires

Multiplexage IO

Le multiplexage IO dans swoole est représenté par le modèle de processus epoll sous-jacent, qui est représenté par la fonction epoll en langage C.

Le modèle epoll continuera à surveiller le descripteur de socket fd sous son propre nom
Lorsqu'un événement surveillé par le socket est déclenché, la fonction epoll répondra et renverra une collection de toutes les sockets en écoute à ce moment-là
L'essence d'epoll est de bloquer les E/S, son avantage est qu'il peut gérer un grand nombre de connexions socket en même temps
Boucle d'événement de boucle d'événement

swoole implémente une encapsulation de modèle de thread Reactor d'epoll , et configure les événements de lecture et d'écriture. La fonction de rappel d'écoute. (Voir swoole_event_add pour plus de détails)

La boucle d'événement est un thread Reactor dans lequel une instance epoll est en cours d'exécution.
Ajoutez un événement du descripteur de socket à l'écouteur epoll via swoole_event_add. Lorsque l'événement se produit, la fonction de rappel sera exécutée
Elle ne peut pas être utilisée dans l'environnement fpm car fpm peut arrêter le processus à la fin. de la tâche.

swoole_process

Module de gestion de processus encapsulé en langage C, pratique à appeler par PHP
Pipeline intégré et interface de file d'attente de messages pour faciliter la communication inter-processus
Nous sommes chez php -fpm. Il a été trouvé dans le fichier de configuration de conf qu'il existe deux paramètres de gestion du pool de processus dans php-fpm.

Le mode statique consiste à initialiser un nombre fixe de processus. Lorsqu'une demande arrive, un processus est sélectionné pour la gérer.
Le mode dynamique spécifie le nombre minimum et maximum de processus. Lorsque le volume de requêtes est trop important et que le nombre de processus ne dépasse pas la limite maximale, un nouveau thread est ajouté pour traiter la requête

Ensuite, utilisez le code swoole pour l'implémenter. Ceci est juste pour comprendre swoole_process, la communication inter-processus, les minuteries, etc. Dans les situations réelles, il est plus pratique d'utiliser le swoole_server encapsulé pour implémenter le pool de files d'attente de tâches.

S'il y a une file d'attente de tâches pour une livraison planifiée :

<?php/**
 * 动态进程池,类似fpm
 * 动态新建进程
 * 有初始进程数,最小进程数,进程不够处理时候新建进程,不超过最大进程数
 */// 一个进程定时投递任务/**
 * 1. tick
 * 2. process及其管道通讯
 * 3. event loop 事件循环
 */class processPool{
  private $pool;

  /**
   * @var swoole_process[] 记录所有worker的process对象
   */
  private $workers = [];

  /**
   * @var array 记录worker工作状态
   */
  private $used_workers = [];

  /**
   * @var int 最小进程数
   */
  private $min_woker_num = 5;

  /**
   * @var int 初始进程数
   */
  private $start_worker_num = 10;

  /**
   * @var int 最大进程数
   */
  private $max_woker_num = 20;

  /**
   * 进程闲置销毁秒数
   * @var int
   */
  private $idle_seconds = 5;

  /**
   * @var int 当前进程数
   */
  private $curr_num;

  /**
   * 闲置进程时间戳
   * @var array
   */
  private $active_time = [];

  public function __construct()
  {
    $this->pool = new swoole_process(function () {
      // 循环建立worker进程
      for ($i = 0; $i < $this->start_worker_num; $i++) {
        $this->createWorker();
      }
      echo &#39;初始化进程数:&#39; . $this->curr_num . PHP_EOL;
      // 每秒定时往闲置的worker的管道中投递任务
      swoole_timer_tick(1000, function ($timer_id) {
        static $count = 0;
        $count++;
        $need_create = true;
        foreach ($this->used_workers as $pid => $used) {
          if ($used == 0) {
            $need_create = false;
            $this->workers[$pid]->write($count . &#39; job&#39;);
            // 标记使用中
            $this->used_workers[$pid] = 1;
            $this->active_time[$pid] = time();
            break;
          }
        }
        foreach ($this->used_workers as $pid => $used)
          // 如果所有worker队列都没有闲置的,则新建一个worker来处理
          if ($need_create && $this->curr_num < $this->max_woker_num) {
            $new_pid = $this->createWorker();
            $this->workers[$new_pid]->write($count . &#39; job&#39;);
            $this->used_workers[$new_pid] = 1;
            $this->active_time[$new_pid] = time();
          }

        // 闲置超过一段时间则销毁进程
        foreach ($this->active_time as $pid => $timestamp) {
          if ((time() - $timestamp) > $this->idle_seconds && $this->curr_num > $this->min_woker_num) {
            // 销毁该进程
            if (isset($this->workers[$pid]) && $this->workers[$pid] instanceof swoole_process) {
              $this->workers[$pid]->write(&#39;exit&#39;);
              unset($this->workers[$pid]);
              $this->curr_num = count($this->workers);
              unset($this->used_workers[$pid]);
              unset($this->active_time[$pid]);
              echo "{$pid} destroyed\n";
              break;
            }
          }
        }

        echo "任务{$count}/{$this->curr_num}\n";

        if ($count == 20) {
          foreach ($this->workers as $pid => $worker) {
            $worker->write(&#39;exit&#39;);
          }
          // 关闭定时器
          swoole_timer_clear($timer_id);
          // 退出进程池
          $this->pool->exit(0);
          exit();
        }
      });

    });

    $master_pid = $this->pool->start();
    echo "Master $master_pid start\n";

    while ($ret = swoole_process::wait()) {
      $pid = $ret[&#39;pid&#39;];
      echo "process {$pid} existed\n";
    }
  }

  /**
   * 创建一个新进程
   * @return int 新进程的pid
   */
  public function createWorker()
  {
    $worker_process = new swoole_process(function (swoole_process $worker) {
      // 给子进程管道绑定事件
      swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
        $data = trim($worker->read());
        if ($data == &#39;exit&#39;) {
          $worker->exit(0);
          exit();
        }
        echo "{$worker->pid} 正在处理 {$data}\n";
        sleep(5);
        // 返回结果,表示空闲
        $worker->write("complete");
      });
    });

    $worker_pid = $worker_process->start();

    // 给父进程管道绑定事件
    swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
      $data = trim($worker_process->read());
      if ($data == &#39;complete&#39;) {
        // 标记为空闲//        echo "{$worker_process->pid} 空闲了\n";
        $this->used_workers[$worker_process->pid] = 0;
      }
    });

    // 保存process对象
    $this->workers[$worker_pid] = $worker_process;
    // 标记为空闲
    $this->used_workers[$worker_pid] = 0;
    $this->active_time[$worker_pid] = time();
    $this->curr_num = count($this->workers);
    return $worker_pid;
  }}new processPool();

J'espère que le contenu ci-dessus aidera tout le monde

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer