search

Home  >  Q&A  >  body text

php - popen如何实现多进程并发执行,循环里的pclose会等待进程完毕再进行下一次循环

1.PHP popen如何实现多进程并发执行,循环里的pclose会等待进程完毕再进行下一次循环

2.假设有17个进程要开启,如何实现每次启动5个进程,并且每完成一个进程就关闭一个进程,同时开启下一个进程,也就是说最多只有5个进程同时执行

//启动2个进程
for($i = 0;$i < 2;$i++){
    $command = "$phpPath $destPHPFile >> $logFile$i";
    echo "进程开启时间".date('Y-m-d H:i:s')."\n";
    $resource = popen($command,'r');
    if(is_resource($resource)){
        $success++;
        pclose($resource);//下一次循环会等待上一个进程执行完毕,pclose才会释放资源
        echo date('Y-m-d H:i:s')." 进程:".$i."启动完毕,执行完毕并关闭,开启下一个进程\n";
    }else{
        $failure++;
    }
}

这样的做法相当于每次启动一个进程,循环执行,相当于单进程处理任务,如何做到多进程

ringa_leeringa_lee2802 days ago1568

reply all(4)I'll reply

  • 怪我咯

    怪我咯2017-04-10 17:24:18

    谢邀.

    如果你们有RabbitMQ的构架和经验, 实现这个很方便
    rabbitmq 消息的消费端 channel 可以设置 prefetch=5, 即最多同时处理5条消息
    rabbitmq 还有完善的ack机制, 即消息回执(该消息已正确处理完毕, 给我下条消息吧)
    而且, 不会堵塞你当前PHP的进程, 后台(消费worker)会以5个并发的情形, 慢慢处理完这些任务

    简单的可从Redis的List入手, 堵塞时判断队列的长度, 小于5时才开始popen并追加到队列

    test.php

    $redis = new \Redis();
    $redis->connect("127.0.0.1", 6379);
    
    $queue_name = "myqueue";
    $queue_prefetch = 5;
    $redis->delete($queue_name); // delete fro reseting.
    for($i=0; $i<17; $i++){
        while(1){
            $count = count($redis->lRange($queue_name, 0, -1));
            if($count <= $queue_prefetch) break;
            usleep(20); // 堵塞20ms, 视情况而定, 太低redis I/O 会比较频繁
        }
        $redis->rPush($queue_name, $i);
        $pid = pcntl_fork(); // 开启子进程, 需要pcntl模块
        if($pid){
            pclose(popen("/usr/bin/php /diandao/script.php $i $pid $queue_name", "w"));
            pcntl_wait($status);
        }
    }

    /diandao/script.php

    <?php
    sleep(1); // 模拟处理超过1秒
    
    $index = $argv[1];
    $pid = $argv[2];
    $queue = $argv[3];
    $redis = new Redis;
    $redis->connect("127.0.0.1", 6379);
    
    $f = fopen("/diandao/script.log", "a+");
    $date = date("Y-m-d H:i:s");
    fwrite($f, "[$date] ".json_encode($argv).PHP_EOL );
    fclose($f);
    $redis->lRem($queue, $index);

    /diandao/script.log 结果

    数量判断那多写了个=号, 所以并发变成了6个

    reply
    0
  • 迷茫

    迷茫2017-04-10 17:24:18

    看到个帖子说到有个扩展可以实现子进程:Thread

    大概描述了一下我的思路,这部分接触的不多,请多指教。

    <?php
    class Threads {
      protected $threads;
      protected $jobs;
      /**
       * Init Threads.
       *
       */
      public function __constructor($threadnum,$jobs)
      {
        $this->jobs = $jobs;
        for($i = 0;$i < $threadnum;$i++){
          $this->threads[$i] = new childThread();
        }
        $this->doJob();
      }
      /**
       * scan the thread.
       *
       * @return bool|@thread
       */
      public function scan()
      {
        if(count($this->jobs) <=0)
          return 'no jobs!';
        foreach($this->threads as $key => $obj)
        {
          if($obj->busy === false)
          {
            $idleThreads[] = $obj;
          }
        }
        if(count($idleThreads)>0)
        {
          $this->doJob($idleThreads);
          return false;
        }
        else
        { 
          return false;
        }
      }
      protected function doJob($idleThreads){
        foreach($idleThreads as $id => $thread){
          $thread->job = $this->getJob();
          $thread->start();
        }
      }
      protected function getJob(){
        $r = $this->jobs[0];
        unset($this->jobs[0]);
        return $r;
      }
      
    }
    class childThread extends Thread{
      public $job;
      public $busy=false;
      public function run(){
        //set busy=true
        $this->busy = true;
        //do job!
    
        //set busy=false
        $this->busy = false;
      }
    }
    //script:
    
    $pool = new Threads(5,array(1,2,3...17));
    while(1){
      if($pool->scan() == 'no jobs!'){
        exit();
      }else{
        sleep(1000);
      }
    }

    reply
    0
  • 高洛峰

    高洛峰2017-04-10 17:24:18

    谢邀,这个感觉是进程池的概念吧。

    reply
    0
  • 天蓬老师

    天蓬老师2017-04-10 17:24:18

    谢邀。

    popen是异步的,不会出现阻塞啊。理论上不会等一个线程结束才开始另一个线程的。

    reply
    0
  • Cancelreply