ホームページ  >  記事  >  PHPフレームワーク  >  Swooleでのプロセスの導入

Swooleでのプロセスの導入

coldplay.xixi
coldplay.xixi転載
2021-03-01 10:13:232309ブラウズ

Swooleでのプロセスの導入

推奨 (無料): swoole

当初は Swft フレームワークで Process モジュールを開発する予定でした, なので、swooleのProcessモジュールをより深く理解する必要があります。ただし、swooleの公式wikiの実践プロセスによると、どうしても理解できない部分があります。マルチプロセスプログラミングをたくさんやったことがありますが、フレームワークを本当に開発する必要がある場合、以前に学んだ知識が全体的な設計を導くのに十分なほど包括的ではないことがわかります。幸いなことに、あなたは粘り強く努力し、現在の理解レベルを提示しました。

コンテンツの概要:

  • プロセスに関連する基本操作: fork/exit/kill/wait
  • プロセスに関連する高度な操作: メイン プロセスが終了し、子プロセスも終了します作業終了後に終了します。子プロセスが異常終了すると、メインプロセスは自動的に再起動します。
  • プロセス間通信 (IPC) - パイプ (pipe)
  • プロセス間通信 (IPC) - メッセージqueue (メッセージキュー)
  • swoole プロセスモジュールが提供するその他の機能

プロセスに関する基本操作

とはプロセス: プロセスはランナーのプログラムです。

最初に見てみましょう。最も単純な例を見てみましょう。

<?phpecho posix_getpid(); // 获取当前进程的 pidswoole_set_process_name(&#39;swoole process master&#39;); // 修改所在进程的进程名sleep(100); // 模拟一个持续运行 100s 的程序, 这样就可以在进程中查看到它, 而不是运行完了就结束

ps aux:# を通じてプロセスを確認します。

##プロセス名が設定されていません

##プロセス名が設定されています

swoole でサブプロセスを使用する基本操作を見てみましょう:

use Swoole\Process;

$process = new Process(function (Process $worker) {    if (Process::kill($worker->pid, 0)) { // kill操作常用来杀死进程, 传入 0 可以用来检测进程是否存在
        $worker->exit(); // 退出子进程
    }
});
$process->start(); // 启动子进程Process::wait(); // 回收退出的子进程

  • new Process()

    : コールバック関数を使用して、サブプロセスが実行するロジックを設定します

  • $process->start ()

    : fork() システム コールを呼び出して子プロセスを生成します

  • Process::kill()

    : kill 操作は、プロセスを強制終了するためによく使用されるシグナルをプロセスに送信します。0 を渡すと、プロセスが存在するかどうかを検出できます

  • #Process::wait( )
  • :

    wait() システム コールを呼び出して、子プロセスをリサイクルします。リサイクルしない場合、子プロセスは ゾンビ プロセス にプログラムされ、システム リソースを無駄にします

    #$worker->exit()
  • : 子プロセスがアクティブに終了しました
  • ここに 1 つあります質問:

メイン プロセスのライフ サイクルは何ですか?子プロセスのライフ サイクルは何ですか?

この質問は、私の以前の思考慣性からも来ています。モノのライフサイクルから理解してください

プロセスは実行中のプログラム
と組み合わせて理解します:

new Process(): プロセスではコールバック関数のロジックのみが実行されます

    他のコードはメイン プロセスで実行されます
  • プロセス関連の高度な操作

メインプロセスが終了し、子プロセスも作業終了後に終了します子プロセスが異常終了し、メインプロセスが自動的に再起動します

    <?phpuse Swoole\Process;class MyProcess1{    public $mpid = 0; // master pid, 即当前程序的进程ID
        public $works = []; // 记录子进程的 pid
        public $maxProcessNum = 1;    public $newIndex = 0;    public function __construct()
        {        try {
                swoole_set_process_name(__CLASS__. &#39; : master&#39;);            $this->mpid = posix_getpid();            $this->run();            $this->processWait();
            } catch (\Exception $e) {            die(&#39;Error: &#39;. $e->getMessage());
            }
        }    public function run()
        {        for ($i=0; $i<$this->maxProcessNum; $i++) {            $this->createProcess();
            }
        }    public function createProcess($index = null)
        {        if (is_null($index)) {
                $index = $this->newIndex;            $this->newIndex++;
            }
            $process = new Process(function (Process $worker) use($index) { // 子进程创建后需要执行的函数
                swoole_set_process_name(__CLASS__. ": worker $index");            for ($j=0; $j<3; $j++) { // 模拟子进程执行耗时任务
                    $this->checkMpid($worker);                echo "msg: {$j}\n";
                    sleep(1);
                }
            }, false, false); // 不重定向输入输出; 不使用管道
            $pid = $process->start();        $this->works[$index] = $pid;        return $pid;
        }    // 主进程异常退出, 子进程工作完后退出
        public function checkMpid(Process $worker) // demo中使用的引用, 引用表示传的参数可以被改变, 由于传入 $worker 是 \Swoole\Process 对象, 所以不用使用 &    {        if (!Process::kill($this->mpid, 0)) { // 0 可以用来检测进程是否存在
                $worker->exit();
                $msg = "master process exited, worker {$worker->pid} also quit\n"; // 需要写入到日志中
                file_put_contents(&#39;process.log&#39;, $msg, FILE_APPEND); // todo: 这句话没有执行
            }
        }    // 重启子进程
        public function rebootProcess($pid)
        {
            $index = array_search($pid, $this->works);        if ($index !== false) {
                $newPid = $this->createProcess($index);            echo "rebootProcess: {$index}={$pid}->{$newPid} Done\n";            return;
            }        throw new \Exception("rebootProcess error: no pid {$pid}");
        }    // 自动重启子进程
        public function processWait()
        {        while (1) {            if (count($this->works)) {
                    $ret = Process::wait(); // 子进程退出
                    if ($ret) {                    $this->rebootProcess($ret[&#39;pid&#39;]);
                    }
                } else {                break;
                }
            }
        }
    }new MyProcess1();
  • 次の点について説明します:
  • 子プロセスは実行後に終了します。
Process::wait()

を通じて、子プロセスの終了シグナルが検出され、自動再起動が実行される、子プロセスは実行を継続します

    関数パラメータの受け渡しについて
  • reference/pointer、理解する良い方法は次のとおりです: パラメータは変更可能です
  • ## 異常終了するメイン プロセスを実行およびシミュレートする:異常終了するメイン プロセスをシミュレートする
  • 出力

プロセス間通信 (IPC) - Pipe(パイプ)

パイプラインのいくつかのキーワード:

半二重: 一方向のデータ フロー、一方の端は読み取り専用、もう一方の端は書き込み可能のみ。

同期と非同期: デフォルトは同期ブロッキング モードです。

swoole_event_add()

を使用して swoole イベント ループにパイプを追加し、非同期 IO
  • Pipe を実装できます。タイプ (データ形式):
  • SOCK_STREAM
  • 、ストリーミング、ユーザーはデータのパケット化/アンパックを自分で処理する必要があります; SOCK_DGRAM、データグラム、それぞれの送受信は完全なデータ パケットです (
  • DGRAM/STREAM
  • ) 注意、swoole wiki - process->write() には、SOCK_DGRAM が順序を間違えてパケットを失わないことが記載されています
  • 最初に簡単な例を見てみましょう。シェルからの php パイプデータの読み取り元:
// get pip data$fp = fopen(&#39;php://stdin&#39;, &#39;r&#39;);if ($fp) {    while ($line = fgets($fp, 4096)) {        echo "php get pip data: ". $line;
    }
    fclose($fp);
}

シェル パイプからのデータの読み取りswoole プロセスのパイプは非常に強力で、

sub をサポートしています。 - プロセスの書き込み、メイン プロセスの読み取り

および

メイン プロセスの書き込み、子プロセスの読み取り

:

use Swoole\Process;// 子进程写, 父进程读$process = new Process(function (Process $worker) {
    $worker->write("worker");
});
$process->start();
$msg = $process->read();echo "from process: $msg", "\n";// 父进程写, 子进程读$process = new Process(function (Process $worker) {
    $msg = $worker->read();    echo "from master: $msg", "\n";
});
$process->start();
$process->write(&#39;master&#39;);

パイプを使用して複数回読み取りおよび書き込みを行います$worker->write()

$process->write()

の違いについて、私はこれら 2 つが同じものであると誤って考えていました。実際、# を間違えていました。子プロセスの ##$process

は、

$process- >write() と同等で、サブプロセスがパイプを書き込むことを意味します。実際、これはメインで実行されるロジックです。メインプロセスは、サブプロセスが読み取るためにデータをパイプに書き込みます。その他のパイプ関連操作 (swoole :非同期 IO

use Swoole\Process;use Swoole\Event;// 异步IO$process = new Process(function (Process $worker) {
    $GLOBALS[&#39;worker&#39;] = $worker;
    Event::add($worker->pipe, function (int $pipe) { // 使用 swoole_event_add 添加管道到异步IO
        /** @var Process $worker */
        $worker = $GLOBALS[&#39;worker&#39;];
        $msg = $worker->read();        echo "from master: $msg \n";
        $worker->write("hello master");
        sleep(2);
        $worker->exit(0);
    });
});
$process->start();
$process->write("master msg 1");
$msg = $process->read();echo "from process: $msg \n";
#) ##非同期 IO

タイムアウトの設定
  • use Swoole\Process;// 设置管道超时$process = new Process(function (Process $worker) {
        sleep(5);
    });
    $process->start();
    $process->setTimeout(0.5);
    $ret = $process->read();
    var_dump($ret);
    var_dump(swoole_errno());
  • パイプライン タイムアウト

插播一个趣事, @thinkpc 看完 2017北京PHP开发者年会, 就知道为啥会点赞了

  • 关闭管道
// 关闭管道: 默认值0->关闭读写 1->关闭写 2->关闭读$process->close();

进程间通信(IPC) - 消息队列(message queue)

消息队列:

  • 一系列保存在内核中的消息链表
  • 有一个 msgKey, 可以通过此访问不同的消息队列
  • 有数据大小限制, 默认 8192, 可以通过内核修改
  • 阻塞 vs 非阻塞: 阻塞模式下 pop()空消息队列/push()满消息队列会阻塞, 非阻塞模式可以直接返回

swoole 中使用消息队列:

  • 通信模式: 默认为争抢模式, 无法将消息投递给指定子进程
  • 新建消息队列后, 主进程就可以使用
  • 消息队列不可和管道一起使用, 也无法使用 swoole event loop
  • 主进程中要调用 wait(), 否则子进程中调用 pop()/push() 会报错
use Swoole\Process;
$process = new Process(function (Process $worker) {    // $worker->push(&#39;worker&#39;);
    echo "from master: ". $worker->pop(). "\n";
    sleep(2);    // $worker->exit();}, false, false); // 关闭管道// 参数一为 msgKey, 这里是默认值// 参数二为 通信模式, 默认值 2 表示争抢模式, 这里还加上了 非阻塞$process->useQueue(ftok(__FILE__, 1), 2| Process::IPC_NOWAIT);
$process->push(&#39;hello1&#39;); // 使用 useQueue 后, 主进程就可以读写消息队列了$process->push(&#39;hello2&#39;);echo "from woker: ". $process->pop(). "\n";// echo "from woker: ". $process->pop(). "\n";$process->start(); // 启动子进程// 消息队列状态var_dump($process->statQueue());// 删除队列, 如果不调用则不会在程序结束时清楚数据, 下次使用相同 msgKey 时还可以访问数据$process->freeQueue();
var_dump(Process::wait()); // 要调用 wait(), 否则子进程中 push()/pop() 会报错

消息队列

swoole process 模块提供的更多功能

  • swoole_set_process_name(): 修改进程名, 不兼容 mac

  • swoole_process->exec(string $execfile, array $args) 执行外部程序

参数 $execfile 需要使用可执行文件的绝对路径, 参数 args 为参数数组

// 比如 python test.py 123swoole_process->exec(&#39;/usr/bin/python&#39;, [&#39;test.py&#39;, 123]);// 更复杂的例子swoole_process->exec((&#39;/usr/local/bin/php&#39;, [&#39;/var/www/project/yii-best-practice/cli/yii&#39;, &#39;t/index&#39;, &#39;-m=123&#39;, &#39;abc&#39;, &#39;xyz&#39;]);// 父进程 exec 进程进行管道通信use Swoole\Process;
$process = new Process(function (Process $worker) {
    $worker->exec(&#39;/bin/echo&#39;, [&#39;hello&#39;]);
    $worker->write(&#39;hello&#39;);
}, true); // 需要启用标准输入输出重定向$process->start();echo "from exec: ". $process->read(). "\n";

父进程与exec进程通过管道通信

  • \Swoole\Process::kill($pid, $signo = SIGTERM): 向指定进程发送信号, 默认是终止进程, 传 0 可检测进程是否存在

  • \Swoole\Process::wait(): 回收子进程, 如果主进程不调用此方法, 子进程会变成 僵尸进程, 浪费系统资源

  • \Swoole\Process::signal(): 异步信号监听

use Swoole\Process;// 异步信号监听 + waitProcess::signal(SIGCHLD, function ($signal) { // 监听子进程退出信号
    // 可能同时有多个子进程退出, 所以要while循环
    while ($ret = Process::wait(false)) { // false 表示不阻塞
        var_dump($ret);
    }
});

\Swoole\Process::daemon(): 将当前进程变为一个守护进程

use Swoole\Process;// daemonProcess::daemon();
swoole_set_process_name(&#39;test daemon process&#39;);
sleep(100);

daemon-守护进程

  • \Swoole\Process::alarm(): 高精度定时器(微秒级), 对 setitimer 系统调用的封装, 可以配合 \Swoole\Process::signal() / pcntl_signal 使用

注意不可和 \Swoole\Timer 同时使用

// signal + alarm// 第一个参数表示时间, 单位 us, -1 表示清除定时器// 第二个参数表示类型 0->真实时间->SIGALAM 1->cpu时间->SIGVTALAM 2->用户态+内核态时间->SIGPROFProcess::alarm(100*1000); // 100msProcess::signal(SIGALRM, function ($signal) {    static $i = 0;    echo "#$i \t alarm \n";
    $i++;    if ($i>20) {
        Process::alarm(-1); // -1 表示清除
    }
});

alarm

  • \Swoole\Process::setaffinity(): 设置CPU亲和, 即将进程绑定到指定CPU核上

传值范围: [0, swoole_cpu_num())
CPU亲和: CPU的速度远远高于IO的速度, 所以CPU有多级缓存来解决IO等待的问题, 绑定指定CPU, 更容易命中CPU缓存

写在最后

资源推荐:

  • 图灵社区 - 理解UNIX进程 + 「理解Unix进程」读书笔记
  • blog - 「进程」编程

todo:

  • 使用输入输出重定向
  • 管道类型为 SOCK_STREAM 时的情况, 是否需要 封包/解包 处理, 即 swoole wiki - process->write() 中提到的 管道通信默认的方式是流式,write写入的数据在read可能会被底层合并
  • 多进程 + 异步IO 的注意事项

能理解 因为子进程会继承父进程的内存和IO句柄 这个会产生的影响, 但是给的示例并没有说明这个问题

use Swoole\Process;use Swoole\Event;// 多个子进程 + 异步IO$workers = [];
$workerNum = 3;for ($i=0; $i<$workerNum; $i++) {
    $process = new Process(function (Process $worker) {
        $worker->write($worker->pid);        echo "worker: {$worker->pid} \n";
    });
    $pid = $process->start();
    $workers[$pid] = $process;    // Event::add($process->pipe, function (int $pipe) use ($process) {
    //  $data = $process->read();
    //  echo "recv: $data \n";
    // });}foreach ($workers as $worker) {
    Event::add($worker->pipe, function (int $pipe) use ($worker) {
        $data = $worker->read();        echo "recv: $data \n";
    });
}

多进程异步IO

以上がSwooleでのプロセスの導入の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はcsdn.netで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。