这几天,由于工作原因,需要将一个php写的后台程序重串行执行改成并行执行。这样必然涉及到多线程/多进程相关技术。PHP貌似没有成熟的多线程机制,而有相对成熟的多进程机制。多进程编程,必然涉及到IPC(Inter-Process Communication)。PHP的IPC技术还是相当多的,可以在php源代码目录下(PHP_SRC/ext)看到sysvshm,sysvsem,sysvmsg,shmop等扩展,这些都可以用于IPC,下面简单介绍一下:

pcntl - 提供进程相关操作函数 sysvsem - 信号量,全称为system V semphore,可以用于限制访问共享资源的进程数目,一般设置为1,当作互斥锁使用 sysvshm - 共享内存,全称system V shared memory,用于提供共享内存机制,特点是省去了客户代码序列化和反序列化的麻烦,变量级别的访问控制,十分好用,属于菜鸟使用的方式。 shmop - 共享内存操作扩展,与sysvshm的区别在于是比特级别的控制,需要手动序列化和反序列化,但是如果紧缺使用,内存使用率比sysvshm高,属于高手进阶方式。 sysvmsg - 消息队列,一种特殊的共享内存,具体使用可以参见这里。



shm_attach和sem_get的方法的key需要与主进程ID关联,避免两个组件同时运行打搅,因为共享内存和信号量是系统级别的资源,进程之间会公用,如果不处理,会导致多个实例之间出现混乱。 父进程在等待子进程时,忙等会消耗大量CPU,采用sleep的方式,可以避免忙等。 父进程等待子进程最好有超时机制,避免无限制等待 子进程逻辑必须在规定的地方退出,不能执行父进程的代码,否则会混乱。常见的方式是用try catch包裹子进程的逻辑,抓到异常后,退出。



<?phpdefine('PROC_NUM', 12);define('SRC_NUM', 20);define('TIME_OUT', 60); // 父进程等待超时,单位:秒$arr = array();for ($i = 0; $i < SRC_NUM; ++$i) {    $arr[] = $i;}$sArr = serialize($arr); echo "parent id: " . posix_getpid() . "\n"; // create share memory$nShmID = shm_attach(ftok(__FILE__, 'i') . + posix_getpid(), strlen($sArr) * 2); // 采用主进程id作为key的部分,避免主进程之间干扰if ($nShmID == FALSE) {    die("Failed to create shm\n");} // write the array to the shared memory$nArrKey = 1;if (FALSE == shm_put_var($nShmID, $nArrKey, $arr)) {    die('Failed to write array to shm');} // create semphore$nSemID = sem_get(posix_getpid(), 1); // 采用主进程id作为key,避免主进程之间干扰 // child process consume the data in the shmfor($i = 0; $i < PROC_NUM; ++$i) {    $nPID = pcntl_fork();         if ($nPID == 0) {        // child        $arrProcess = array();        while (true) {            sem_acquire($nSemID);                         // get the value            $arrCur = shm_get_var($nShmID, $nArrKey);                         if (0 == count($arrCur) || $arrCur == FALSE) {                // value out                sem_release($nSemID);                break;            }            $nVal = array_pop($arrCur);            if (FALSE == shm_put_var($nShmID, $nArrKey, $arrCur)) {                die('Failed to write array to shm');            }            sem_release($nSemID);                         // simulate process value            sleep(rand(1,3));            $arrProcess[] = $nVal;        }                 echo "Child process " . posix_getpid() . " consume: ";        echo implode(' ',$arrProcess) . "\n";        exit(0); // 子进程逻辑一定要在这里退出,不能执行父进程代码    }} // wait for childrenecho "wait children\n";$n = 0;$nStart = time();while ($n < PROC_NUM && (time() - $nStart < TIME_OUT)) { // 父进程等待有超时机制,避免无限等待    //echo "wait ... \n";    $nStatus = -1;    $nPID = pcntl_wait($nStatus, WNOHANG);    if ($nPID > 0) {        echo "{$nPID} exit\n";        ++$n;    }    sleep(1); // 避免忙等} // clear shm//shm_detach($nShmID);sem_remove($nSemID);shm_remove($nShmID);echo "finished\n";?>


