推薦(免費):swoole
本來計畫開發swoft 框架中的Process 模組, 所以需要對swoole 的Process 模組要有比較深入的了解才行. 不過根據swoole 官方wiki 的實踐過程中, 一直有未理解的部分. 之前雖然也做過多次多進程編程, 但是當真正需要進行框架開發的時候, 就會發現以前學到的知識不夠全面, 無法指導整體的設計. 好在一直在堅持, 奉上現在理解的程度.
內容一覽:
進程相關基礎操作
進程是什麼: 進程是運行者的程式
先來看看一個最簡單的範例:
<?phpecho posix_getpid(); // 获取当前进程的 pidswoole_set_process_name('swoole process master'); // 修改所在进程的进程名sleep(100); // 模拟一个持续运行 100s 的程序, 这样就可以在进程中查看到它, 而不是运行完了就结束
透過ps aux
檢視進程:
use Swoole\Process; $process = new Process(function (Process $worker) { if (Process::kill($worker->pid, 0)) { // kill操作常用来杀死进程, 传入 0 可以用来检测进程是否存在 $worker->exit(); // 退出子进程 } }); $process->start(); // 启动子进程Process::wait(); // 回收退出的子进程
#new Process(): 透過回呼函數來設定子程序將要執行的邏輯
$process->start(): 呼叫
fork() 系統呼叫, 來產生子程序
#Process::kill(): kill操作向進程發送訊號, 常用來殺死進程, 傳入0 可以用來偵測進程是否存在
Process::wait(): 呼叫
wait() 系統呼叫, 回收子程序, 如果不回收, 子程序會編程
殭屍程序, 浪費系統資源
$worker->exit(): 子程序主動退出
有這樣一個疑問也來自於我之前的思考慣性:理解一個事物時從事物的生命週期進行理解
. 結合進程是運行著的程式 來一起理解:
除此之外的程式碼都是在主行程中執行主程序退出子程序幹完活後也退出
<?phpuse Swoole\Process;class MyProcess1{ public $mpid = 0; // master pid, 即当前程序的进程ID public $works = []; // 记录子进程的 pid public $maxProcessNum = 1; public $newIndex = 0; public function __construct() { try { swoole_set_process_name(__CLASS__. ' : master'); $this->mpid = posix_getpid(); $this->run(); $this->processWait(); } catch (\Exception $e) { die('Error: '. $e->getMessage()); } } public function run() { for ($i=0; $i<$this->maxProcessNum; $i++) { $this->createProcess(); } } public function createProcess($index = null) { if (is_null($index)) { $index = $this->newIndex; $this->newIndex++; } $process = new Process(function (Process $worker) use($index) { // 子进程创建后需要执行的函数 swoole_set_process_name(__CLASS__. ": worker $index"); for ($j=0; $j<3; $j++) { // 模拟子进程执行耗时任务 $this->checkMpid($worker); echo "msg: {$j}\n"; sleep(1); } }, false, false); // 不重定向输入输出; 不使用管道 $pid = $process->start(); $this->works[$index] = $pid; return $pid; } // 主进程异常退出, 子进程工作完后退出 public function checkMpid(Process $worker) // demo中使用的引用, 引用表示传的参数可以被改变, 由于传入 $worker 是 \Swoole\Process 对象, 所以不用使用 & { if (!Process::kill($this->mpid, 0)) { // 0 可以用来检测进程是否存在 $worker->exit(); $msg = "master process exited, worker {$worker->pid} also quit\n"; // 需要写入到日志中 file_put_contents('process.log', $msg, FILE_APPEND); // todo: 这句话没有执行 } } // 重启子进程 public function rebootProcess($pid) { $index = array_search($pid, $this->works); if ($index !== false) { $newPid = $this->createProcess($index); echo "rebootProcess: {$index}={$pid}->{$newPid} Done\n"; return; } throw new \Exception("rebootProcess error: no pid {$pid}"); } // 自动重启子进程 public function processWait() { while (1) { if (count($this->works)) { $ret = Process::wait(); // 子进程退出 if ($ret) { $this->rebootProcess($ret['pid']); } } else { break; } } } }new MyProcess1();
關於函數參數傳管道的幾個關鍵字:
半雙工: 資料單向流動, 一端只讀, 一端只寫.
管道類型(資料格式):
SOCK_STREAM, 資料封包, 每次收發都是一次完整的封包(
DGRAM/STREAM)
#注意, swoole wiki - process->write() 中提到先來看一個簡單的例子, php從shell管道中讀取資料:
// get pip data$fp = fopen('php://stdin', 'r');if ($fp) { while ($line = fgets($fp, 4096)) { echo "php get pip data: ". $line; } fclose($fp); }從shell管道讀取資料swoole process中的管道很強大, 支援 以及
主程序寫, 子程序讀:
use Swoole\Process;// 子进程写, 父进程读$process = new Process(function (Process $worker) { $worker->write("worker"); }); $process->start(); $msg = $process->read();echo "from process: $msg", "\n";// 父进程写, 子进程读$process = new Process(function (Process $worker) { $msg = $worker->read(); echo "from master: $msg", "\n"; }); $process->start(); $process->write('master');使用管道多次讀寫#注意區分 和
$process->write(), 之前一直錯誤的以為這2 個是相同的, 其實就是把
$process 誤以為是子行程, 從而相當於
$process- >write() 是子進程寫入管道-- 其實這裡是主進程內執行的邏輯, 是主進程寫資料到管道, 供子進程讀取
swoole中其他管道相關操作:
非同步IO
use Swoole\Process;use Swoole\Event;// 异步IO$process = new Process(function (Process $worker) { $GLOBALS['worker'] = $worker; Event::add($worker->pipe, function (int $pipe) { // 使用 swoole_event_add 添加管道到异步IO /** @var Process $worker */ $worker = $GLOBALS['worker']; $msg = $worker->read(); echo "from master: $msg \n"; $worker->write("hello master"); sleep(2); $worker->exit(0); }); }); $process->start(); $process->write("master msg 1"); $msg = $process->read();echo "from process: $msg \n";
use Swoole\Process;// 设置管道超时$process = new Process(function (Process $worker) { sleep(5); }); $process->start(); $process->setTimeout(0.5); $ret = $process->read(); var_dump($ret); var_dump(swoole_errno());
插播一个趣事, @thinkpc 看完 2017北京PHP开发者年会, 就知道为啥会点赞了
// 关闭管道: 默认值0->关闭读写 1->关闭写 2->关闭读$process->close();
进程间通信(IPC) - 消息队列(message queue)
消息队列:
pop()
空消息队列/push()
满消息队列会阻塞, 非阻塞模式可以直接返回swoole 中使用消息队列:
wait()
, 否则子进程中调用 pop()/push()
会报错use Swoole\Process; $process = new Process(function (Process $worker) { // $worker->push('worker'); echo "from master: ". $worker->pop(). "\n"; sleep(2); // $worker->exit();}, false, false); // 关闭管道// 参数一为 msgKey, 这里是默认值// 参数二为 通信模式, 默认值 2 表示争抢模式, 这里还加上了 非阻塞$process->useQueue(ftok(__FILE__, 1), 2| Process::IPC_NOWAIT); $process->push('hello1'); // 使用 useQueue 后, 主进程就可以读写消息队列了$process->push('hello2');echo "from woker: ". $process->pop(). "\n";// echo "from woker: ". $process->pop(). "\n";$process->start(); // 启动子进程// 消息队列状态var_dump($process->statQueue());// 删除队列, 如果不调用则不会在程序结束时清楚数据, 下次使用相同 msgKey 时还可以访问数据$process->freeQueue(); var_dump(Process::wait()); // 要调用 wait(), 否则子进程中 push()/pop() 会报错
swoole process 模块提供的更多功能
swoole_set_process_name()
: 修改进程名, 不兼容 mac
swoole_process->exec(string $execfile, array $args)
执行外部程序
参数 $execfile
需要使用可执行文件的绝对路径, 参数 args
为参数数组
// 比如 python test.py 123swoole_process->exec('/usr/bin/python', ['test.py', 123]);// 更复杂的例子swoole_process->exec(('/usr/local/bin/php', ['/var/www/project/yii-best-practice/cli/yii', 't/index', '-m=123', 'abc', 'xyz']);// 父进程 exec 进程进行管道通信use Swoole\Process; $process = new Process(function (Process $worker) { $worker->exec('/bin/echo', ['hello']); $worker->write('hello'); }, true); // 需要启用标准输入输出重定向$process->start();echo "from exec: ". $process->read(). "\n";
\Swoole\Process::kill($pid, $signo = SIGTERM)
: 向指定进程发送信号, 默认是终止进程, 传 0 可检测进程是否存在
\Swoole\Process::wait()
: 回收子进程, 如果主进程不调用此方法, 子进程会变成 僵尸进程, 浪费系统资源
\Swoole\Process::signal()
: 异步信号监听
use Swoole\Process;// 异步信号监听 + waitProcess::signal(SIGCHLD, function ($signal) { // 监听子进程退出信号 // 可能同时有多个子进程退出, 所以要while循环 while ($ret = Process::wait(false)) { // false 表示不阻塞 var_dump($ret); } });
\Swoole\Process::daemon()
: 将当前进程变为一个守护进程
use Swoole\Process;// daemonProcess::daemon(); swoole_set_process_name('test daemon process'); sleep(100);
\Swoole\Process::alarm()
: 高精度定时器(微秒级), 对 setitimer
系统调用的封装, 可以配合 \Swoole\Process::signal()
/ pcntl_signal
使用注意不可和 \Swoole\Timer
同时使用
// signal + alarm// 第一个参数表示时间, 单位 us, -1 表示清除定时器// 第二个参数表示类型 0->真实时间->SIGALAM 1->cpu时间->SIGVTALAM 2->用户态+内核态时间->SIGPROFProcess::alarm(100*1000); // 100msProcess::signal(SIGALRM, function ($signal) { static $i = 0; echo "#$i \t alarm \n"; $i++; if ($i>20) { Process::alarm(-1); // -1 表示清除 } });
\Swoole\Process::setaffinity()
: 设置CPU亲和, 即将进程绑定到指定CPU核上传值范围: [0, swoole_cpu_num())
CPU亲和: CPU的速度远远高于IO的速度, 所以CPU有多级缓存来解决IO等待的问题, 绑定指定CPU, 更容易命中CPU缓存
写在最后
资源推荐:
todo:
SOCK_STREAM
时的情况, 是否需要 封包/解包 处理, 即 swoole wiki - process->write() 中提到的 管道通信默认的方式是流式,write写入的数据在read可能会被底层合并
能理解 因为子进程会继承父进程的内存和IO句柄 这个会产生的影响, 但是给的示例并没有说明这个问题
use Swoole\Process;use Swoole\Event;// 多个子进程 + 异步IO$workers = []; $workerNum = 3;for ($i=0; $i<$workerNum; $i++) { $process = new Process(function (Process $worker) { $worker->write($worker->pid); echo "worker: {$worker->pid} \n"; }); $pid = $process->start(); $workers[$pid] = $process; // Event::add($process->pipe, function (int $pipe) use ($process) { // $data = $process->read(); // echo "recv: $data \n"; // });}foreach ($workers as $worker) { Event::add($worker->pipe, function (int $pipe) use ($worker) { $data = $worker->read(); echo "recv: $data \n"; }); }
以上是介紹Swoole 中 Process的詳細內容。更多資訊請關注PHP中文網其他相關文章!