>백엔드 개발 >PHP 튜토리얼 >PHP는 감독자 프로세스 관리를 시뮬레이션합니다.

PHP는 감독자 프로세스 관리를 시뮬레이션합니다.

藏色散人
藏色散人앞으로
2021-01-08 13:53:434116검색
PHP 비디오 튜토리얼

"

머리말감독자 프로세스 관리 시뮬레이션 DEMO(쉬운 구현)

예, 바퀴를 재발명하고 있습니다! 배우는 것이 목적입니다!

스크린샷:


PHP는 감독자 프로세스 관리를 시뮬레이션합니다.

그림에서는 Copy 하위 프로세스 기능을 구현했습니다. AMQP에서 Consumer를 추가하거나 제거할 때 사용하면 매우 유용할 것 같습니다.

Copy子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。

实现

1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程

不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。

知识点

代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()

Implementation1. 메인 프로세스 루프에서 명령을 실행하기 위해 서브 프로세스를 시작합니다

2. 웹에서 127.0.0.1:7865를 입력하여 서브 프로세스 상태를 가져옵니다
3. 메시지를 요청하고 해당 작업을 수행한 후 웹 페이지로 돌아갑니다.

4. 좀비 프로세스라고 불리는 것을 방지하기 위해 하위 프로세스를 재활용합니다.

부적절: 오류 페이지를 지속적으로 모니터링할 수 없습니다. 소켓의 응답은 include 함수를 통해 로드되므로 tail -f 명령은 로드된 페이지에 나타날 수 없습니다. 그렇지 않으면 스트림이 무한 루프에 빠지게 됩니다~ . 해결책이 있어야 할 것 같습니다.(fpm이 요청을 받은 후 처리하기 위해 하위 프로세스를 시작하는 모드를 모방하기 위해 소켓+멀티 프로세스 모드를 작성했지만 실행에 문제가 있습니다. 그래서 코드를 게시했습니다. 모든 사람의 조언을 얻기를 바라며).

확장: 프로세스를 잘 관리할 수 있으므로(예상) 다음과 같은 일부 요구 사항을 사용자 정의할 수 있습니다. (1) AMQP 소비자 프로세스 관리 서비스를 사용자 정의합니다. (2) crontab 타이밍 서비스를 시뮬레이션합니다.

지식 포인트

코드 구현 과정에서 배울 만한 세부 사항이 많이 있습니다.

1. while() 루프에서는 스트림의 비차단 모드가 활성화됩니다. 따라서 루프에서 sleep(1)을 사용할 수 없지만 내부적으로 스트림을 차단하려면 stream_select($read, $write, $read, 1)를 사용하세요.

차단 모드와 비차단 모드는 여기를 참고하세요

2. 외부 프로그램을 실행할 수 있는 함수는 많지만 조금씩 다릅니다. 여기서 사용되는 것은 매우 강력한 기능인 proc_open입니다. 이전에 pcntl_exec를 사용하여 외부 프로그램을 실행한 적이 있지만 먼저 pcntl_fork를 사용해야 합니다. 그러나 execshell_exec와 같은 다른 메서드는 하위 프로세스를 관리할 수 없습니다.

3. 자식 프로세스를 다시 시작하거나 중지하면 먼저 기본 프로세스의 메모리에 있는 자식 프로세스의 상태만 변경되며 실제로 자식 프로세스에서는 작동하지 않습니다. init()는 동일한 위치에서 하위 프로세스를 처리합니다. 이렇게 하면 하위 프로세스가 시작될 때 컨텍스트로 인해 발생하는 이상한 현상을 방지할 수 있습니다.



Code

코드가 너무 많아서 내 솔루션에 대한 더 나은 제안이 있으면 여기 github에서 볼 수 있습니다.

주요 프로세스 코드: Process.php

<?php require_once __DIR__ . &#39;/Consumer.php&#39;;require_once __DIR__ . &#39;/StreamConnection.php&#39;;require_once __DIR__ . &#39;/Http.php&#39;;class Process{
    /** 
     * 待启动的消费者数组
     */
    protected $consumers = array();
    protected $childPids = array();

    const PPID_FILE = __DIR__ . &#39;/process&#39;;
    protected $serializerConsumer;

    public function __construct()
    {
        $this->consumers = $this->getConsumers();
    }

    // 这里是个DEMO,实际可以用读取配置文件的方式。
    public function getConsumers()
    {
        $consumer = new Consumer([
            'program' => 'test',
            'command' => '/usr/bin/php test.php',
            'directory' => __DIR__,
            'logfile' => __DIR__ . '/test.log',
            'uniqid' => uniqid(),
            'auto_restart' => false,
        ]);
        return [
            $consumer->uniqid => $consumer,
        ];
    }

    public function run()
    {
        if (empty($this->consumers)) {
            // consumer empty
            return;
        }
        if ($this->_notifyMaster()) {
            // master alive
            return;
        }

        $pid = pcntl_fork();
        if ($pid  0) {
            exit;
        }
        if (!posix_setsid()) {
            exit;
        }

        $stream = new StreamConnection('tcp://0.0.0.0:7865');
        @cli_set_process_title('AMQP Master Process');
        // 将主进程ID写入文件
        file_put_contents(self::PPID_FILE, getmypid());
        // master进程继续
        while (true) {
            $this->init();
            pcntl_signal_dispatch();
            $this->waitpid();
            // 如果子进程被全部回收,则主进程退出
            // if (empty($this->childPids)) {
            //     $stream->close($stream->getSocket());
            //     break;
            // }
            $stream->accept(function ($uniqid, $action) {
                $this->handle($uniqid, $action);
                return $this->display();
            });
        }
    }

    protected function init()
    {
        foreach ($this->consumers as &$c) {
            switch ($c->state) {
                case Consumer::RUNNING:
                case Consumer::STOP:
                    break;
                case Consumer::NOMINAL:
                case Consumer::STARTING:
                    $this->fork($c);
                    break;
                case Consumer::STOPING:
                    if ($c->pid && posix_kill($c->pid, SIGTERM)) {
                        $this->reset($c, Consumer::STOP);
                    }
                    break;
                case Consumer::RESTART:
                    if (empty($c->pid)) {
                        $this->fork($c);
                        break;
                    }
                    if (posix_kill($c->pid, SIGTERM)) {
                        $this->reset($c, Consumer::STOP);
                        $this->fork($c);
                    }
                    break;
                default:
                    break;
            }
        }
    }

    protected function reset(Consumer $c, $state)
    {
        $c->pid = '';
        $c->uptime = '';
        $c->state = $state;
        $c->process = null;
    }

    protected function waitpid()
    {
        foreach ($this->childPids as $uniqid => $pid) {
            $result = pcntl_waitpid($pid, $status, WNOHANG);
            if ($result == $pid || $result == -1) {
                unset($this->childPids[$uniqid]);
                $c = &$this->consumers[$uniqid];
                $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;
                $this->reset($c, $state);
            }
        }
    }


    /**
     * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程
     */
    private function _notifyMaster()
    {
        $ppid = file_get_contents(self::PPID_FILE );
        $isAlive = $this->checkProcessAlive($ppid);
        if (!$isAlive) return false;
        return true;
    }

    public function checkProcessAlive($pid)
    {
        if (empty($pid)) return false;
        $pidinfo = `ps co pid {$pid} | xargs`;
        $pidinfo = trim($pidinfo);
        $pattern = "/.*?PID.*?(\d+).*?/";
        preg_match($pattern, $pidinfo, $matches);
        return empty($matches) ? false : ($matches[1] == $pid ? true : false);
    }

    /**
     * fork一个新的子进程
     */
    protected function fork(Consumer $c)
    {
        $descriptorspec = [2 => ['file', $c->logfile, 'a'],];
        $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);
        if ($process) {
            $ret = proc_get_status($process);
            if ($ret['running']) {
                $c->state = Consumer::RUNNING;
                $c->pid = $ret['pid'];
                $c->process = $process;
                $c->uptime = date('m-d H:i');
                $this->childPids[$c->uniqid] = $ret['pid'];
            } else {
                $c->state = Consumer::EXITED;
                proc_close($process);
            }
        } else {
            $c->state = Consumer::ERROR;
        }
        return $c;
    }

    public function display()
    {
        $location = 'http://127.0.0.1:7865';
        $basePath = Http::$basePath;
        $scriptName = isset($_SERVER['SCRIPT_NAME']) &&
            !empty($_SERVER['SCRIPT_NAME']) &&
            $_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php';
        if ($scriptName == '/index.html') {
            return Http::status_301($location);
        }

        $sourcePath = $basePath . $scriptName;
        if (!is_file($sourcePath)) {
            return Http::status_404();
        }

        ob_start();
        include $sourcePath;
        $response = ob_get_contents();
        ob_clean();

        return Http::status_200($response);
    }

    public function handle($uniqid, $action)
    {
        if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {
            return;
        }
        switch ($action) {
            case 'refresh':
                break;
            case 'restartall':
                $this->killall(true);
                break;
            case 'stopall':
                $this->killall();
                break;
            case 'stop':
                $c = &$this->consumers[$uniqid];
                if ($c->state != Consumer::RUNNING) break;
                $c->state = Consumer::STOPING;
                break;
            case 'start':
                $c = &$this->consumers[$uniqid];
                if ($c->state == Consumer::RUNNING) break;
                $c->state = Consumer::STARTING;
                break;
            case 'restart':
                $c = &$this->consumers[$uniqid];
                $c->state = Consumer::RESTART;
                break;
            case 'copy':
                $c = $this->consumers[$uniqid];
                $newC = clone $c;
                $newC->uniqid = uniqid('C');
                $newC->state = Consumer::NOMINAL;
                $newC->pid = '';
                $this->consumers[$newC->uniqid] = $newC;
                break;
            default:
                break;
        }
    }

    protected function killall($restart = false)
    {
        foreach ($this->consumers as &$c) {
            $c->state = $restart ? Consumer::RESTART : Consumer::STOPING;
        }
    }}$cli = new Process();$cli->run();
🎜Consumer Consumer 객체🎜
<?php require_once __DIR__ . &#39;/BaseObject.php&#39;;class Consumer extends BaseObject{
    /** 开启多少个消费者 */
    public $numprocs = 1;
    /** 当前配置的唯一标志 */
    public $program;
    /** 执行的命令 */
    public $command;
    /** 当前工作的目录 */
    public $directory;

    /** 通过 $qos $queueName $duplicate 生成的 $queue */
    public $queue;
    /** 程序执行日志记录 */
    public $logfile = &#39;&#39;;
    /** 消费进程的唯一ID */
    public $uniqid;
    /** 进程IDpid */
    public $pid;
    /** 进程状态 */
    public $state = self::NOMINAL;
    /** 自启动 */
    public $auto_restart = false;

    public $process;
    /** 启动时间 */
    public $uptime;

    const RUNNING = &#39;running&#39;;
    const STOP = &#39;stoped&#39;;
    const NOMINAL = &#39;nominal&#39;;
    const RESTART = &#39;restart&#39;;
    const STOPING = &#39;stoping&#39;;
    const STARTING = &#39;stating&#39;;
    const ERROR = &#39;error&#39;;
    const BLOCKED = &#39;blocked&#39;;
    const EXITED = &#39;exited&#39;;
    const FATEL = &#39;fatel&#39;;}
🎜스트림 관련 코드: StreamConnection.php🎜
<?php class StreamConnection{
    protected $socket;
    protected $timeout = 2; //s
    protected $client;

    public function __construct($host)
    {
        $this->socket = $this->connect($host);
    }

    public function connect($host)
    {
        $socket = stream_socket_server($host, $errno, $errstr);
        if (!$socket) {
            exit('stream error');
        }
        stream_set_timeout($socket, $this->timeout);
        stream_set_chunk_size($socket, 1024);
        stream_set_blocking($socket, false);
        $this->client = [$socket];
        return $socket;
    }

    public function accept(Closure $callback)
    {
        $read = $this->client;
        if (stream_select($read, $write, $except, 1) socket, $read)) {
            $cs = stream_socket_accept($this->socket);
            $this->client[] = $cs;
        }
        foreach ($read as $s) {
            if ($s == $this->socket) continue;
            $header = fread($s, 1024);
            if (empty($header)) {
                $index = array_search($s, $this->client);
                if ($index)
                    unset($this->client[$index]);
                $this->close($s);
                continue;
            }
            Http::parse_http($header);
            $uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : '';
            $action = isset($_GET['action']) ? $_GET['action'] : '';
            $response = $callback($uniqid, $action);
            $this->write($s, $response);
            $index = array_search($s, $this->client);
            if ($index)
                unset($this->client[$index]);
            $this->close($s);
        }
    }

    public function write($socket, $response)
    {
        $ret = fwrite($socket, $response, strlen($response));
    }

    public function close($socket)
    {
        $flag = fclose($socket);
    }

    public function getSocket()
    {
        return $this->socket;
    }}
🎜Http 응답 코드: Http.php🎜
<?php class Http{

    public static $basePath = __DIR__ . &#39;/views&#39;;
    public static $max_age = 120; //秒

    /*
    *  函数:     parse_http
    *  描述:     解析http协议
    */
    public static function parse_http($http)
    {
        // 初始化
        $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =  array();
        $GLOBALS[&#39;HTTP_RAW_POST_DATA&#39;] = &#39;&#39;;
        // 需要设置的变量名
        $_SERVER = array(
            &#39;QUERY_STRING&#39; => '',
            'REQUEST_METHOD' => '',
            'REQUEST_URI' => '',
            'SERVER_PROTOCOL' => '',
            'SERVER_SOFTWARE' => '',
            'SERVER_NAME' => '',
            'HTTP_HOST' => '',
            'HTTP_USER_AGENT' => '',
            'HTTP_ACCEPT' => '',
            'HTTP_ACCEPT_LANGUAGE' => '',
            'HTTP_ACCEPT_ENCODING' => '',
            'HTTP_COOKIE' => '',
            'HTTP_CONNECTION' => '',
            'REMOTE_ADDR' => '',
            'REMOTE_PORT' => '0',
            'SCRIPT_NAME' => '',
            'HTTP_REFERER' => '',
            'CONTENT_TYPE' => '',
            'HTTP_IF_NONE_MATCH' => '',
        );

        // 将header分割成数组
        list($http_header, $http_body) = explode("\r\n\r\n", $http, 2);
        $header_data = explode("\r\n", $http_header);

        list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);

        unset($header_data[0]);
        foreach ($header_data as $content) {
            // \r\n\r\n
            if (empty($content)) {
                continue;
            }
            list($key, $value) = explode(':', $content, 2);
            $key = strtolower($key);
            $value = trim($value);
            switch ($key) {
                case 'host':
                    $_SERVER['HTTP_HOST'] = $value;
                    $tmp = explode(':', $value);
                    $_SERVER['SERVER_NAME'] = $tmp[0];
                    if (isset($tmp[1])) {
                        $_SERVER['SERVER_PORT'] = $tmp[1];
                    }
                    break;
                case 'cookie':
                    $_SERVER['HTTP_COOKIE'] = $value;
                    parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
                    break;
                case 'user-agent':
                    $_SERVER['HTTP_USER_AGENT'] = $value;
                    break;
                case 'accept':
                    $_SERVER['HTTP_ACCEPT'] = $value;
                    break;
                case 'accept-language':
                    $_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value;
                    break;
                case 'accept-encoding':
                    $_SERVER['HTTP_ACCEPT_ENCODING'] = $value;
                    break;
                case 'connection':
                    $_SERVER['HTTP_CONNECTION'] = $value;
                    break;
                case 'referer':
                    $_SERVER['HTTP_REFERER'] = $value;
                    break;
                case 'if-modified-since':
                    $_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value;
                    break;
                case 'if-none-match':
                    $_SERVER['HTTP_IF_NONE_MATCH'] = $value;
                    break;
                case 'content-type':
                    if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) {
                        $_SERVER['CONTENT_TYPE'] = $value;
                    } else {
                        $_SERVER['CONTENT_TYPE'] = 'multipart/form-data';
                        $http_post_boundary = '--' . $match[1];
                    }
                    break;
            }
        }

        // script_name
        $_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);

        // QUERY_STRING
        $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
        if ($_SERVER['QUERY_STRING']) {
            // $GET
            parse_str($_SERVER['QUERY_STRING'], $_GET);
        } else {
            $_SERVER['QUERY_STRING'] = '';
        }

        // REQUEST
        $_REQUEST = array_merge($_GET, $_POST);

        return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES);
    }

    public static function status_404()
    {
        return 🎜실행할 스크립트: test.php🎜<pre class="brush:php;toolbar:false"><?php while(true) {
    file_put_contents(__DIR__  .  &#39;/test.log&#39;, date(&#39;Y-m-d H:i:s&#39;));
    sleep(1);}
🎜 현재 디렉토리의 페이지 보기: 🎜|- Process.php🎜|- Http.php🎜|- StreamConnection.php🎜|- Consumer.php🎜|- BaseObject.php🎜|- views/🎜🎜더 많은 프로그래밍 관련 지식을 얻으려면, 방문해주세요: 🎜프로그래밍 교육🎜! ! 🎜

위 내용은 PHP는 감독자 프로세스 관리를 시뮬레이션합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 learnku.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제