Maison > Questions et réponses > le corps du texte
1.PHP popen如何实现多进程并发执行,循环里的pclose会等待进程完毕再进行下一次循环
2.假设有17个进程要开启,如何实现每次启动5个进程,并且每完成一个进程就关闭一个进程,同时开启下一个进程,也就是说最多只有5个进程同时执行
//启动2个进程
for($i = 0;$i < 2;$i++){
$command = "$phpPath $destPHPFile >> $logFile$i";
echo "进程开启时间".date('Y-m-d H:i:s')."\n";
$resource = popen($command,'r');
if(is_resource($resource)){
$success++;
pclose($resource);//下一次循环会等待上一个进程执行完毕,pclose才会释放资源
echo date('Y-m-d H:i:s')." 进程:".$i."启动完毕,执行完毕并关闭,开启下一个进程\n";
}else{
$failure++;
}
}
这样的做法相当于每次启动一个进程,循环执行,相当于单进程处理任务,如何做到多进程
怪我咯2017-04-10 17:24:18
谢邀.
如果你们有RabbitMQ的构架和经验, 实现这个很方便
rabbitmq 消息的消费端 channel 可以设置 prefetch=5, 即最多同时处理5条消息
rabbitmq 还有完善的ack机制, 即消息回执(该消息已正确处理完毕, 给我下条消息吧)
而且, 不会堵塞你当前PHP的进程, 后台(消费worker)会以5个并发的情形, 慢慢处理完这些任务
简单的可从Redis的List入手, 堵塞时判断队列的长度, 小于5时才开始popen并追加到队列
test.php
$redis = new \Redis();
$redis->connect("127.0.0.1", 6379);
$queue_name = "myqueue";
$queue_prefetch = 5;
$redis->delete($queue_name); // delete fro reseting.
for($i=0; $i<17; $i++){
while(1){
$count = count($redis->lRange($queue_name, 0, -1));
if($count <= $queue_prefetch) break;
usleep(20); // 堵塞20ms, 视情况而定, 太低redis I/O 会比较频繁
}
$redis->rPush($queue_name, $i);
$pid = pcntl_fork(); // 开启子进程, 需要pcntl模块
if($pid){
pclose(popen("/usr/bin/php /diandao/script.php $i $pid $queue_name", "w"));
pcntl_wait($status);
}
}
/diandao/script.php
<?php
sleep(1); // 模拟处理超过1秒
$index = $argv[1];
$pid = $argv[2];
$queue = $argv[3];
$redis = new Redis;
$redis->connect("127.0.0.1", 6379);
$f = fopen("/diandao/script.log", "a+");
$date = date("Y-m-d H:i:s");
fwrite($f, "[$date] ".json_encode($argv).PHP_EOL );
fclose($f);
$redis->lRem($queue, $index);
/diandao/script.log 结果
数量判断那多写了个=号, 所以并发变成了6个
迷茫2017-04-10 17:24:18
看到个帖子说到有个扩展可以实现子进程:Thread
大概描述了一下我的思路,这部分接触的不多,请多指教。
<?php
class Threads {
protected $threads;
protected $jobs;
/**
* Init Threads.
*
*/
public function __constructor($threadnum,$jobs)
{
$this->jobs = $jobs;
for($i = 0;$i < $threadnum;$i++){
$this->threads[$i] = new childThread();
}
$this->doJob();
}
/**
* scan the thread.
*
* @return bool|@thread
*/
public function scan()
{
if(count($this->jobs) <=0)
return 'no jobs!';
foreach($this->threads as $key => $obj)
{
if($obj->busy === false)
{
$idleThreads[] = $obj;
}
}
if(count($idleThreads)>0)
{
$this->doJob($idleThreads);
return false;
}
else
{
return false;
}
}
protected function doJob($idleThreads){
foreach($idleThreads as $id => $thread){
$thread->job = $this->getJob();
$thread->start();
}
}
protected function getJob(){
$r = $this->jobs[0];
unset($this->jobs[0]);
return $r;
}
}
class childThread extends Thread{
public $job;
public $busy=false;
public function run(){
//set busy=true
$this->busy = true;
//do job!
//set busy=false
$this->busy = false;
}
}
//script:
$pool = new Threads(5,array(1,2,3...17));
while(1){
if($pool->scan() == 'no jobs!'){
exit();
}else{
sleep(1000);
}
}