Heim  >  Artikel  >  Backend-Entwicklung  >  Wie verwendet PHP die Befehlszeile, um die Aufgabenverarbeitung im asynchronen Multiprozessmodus zu implementieren (Code)

Wie verwendet PHP die Befehlszeile, um die Aufgabenverarbeitung im asynchronen Multiprozessmodus zu implementieren (Code)

不言
不言nach vorne
2019-01-23 10:20:282808Durchsuche

Der Inhalt dieses Artikels befasst sich mit der Verwendung der Befehlszeile zur Implementierung der Aufgabenverarbeitung (Code) im asynchronen Multiprozessmodus. Ich hoffe, dass er hilfreich ist Dir zu helfen.

Die Verwendung von PHP zur Implementierung asynchroner Aufgaben war schon immer ein Problem. Zu den bekannten asynchronen Frameworks von PHP gehören Swoole und Workerman, die jedoch nicht direkt in der Webumgebung verwendet werden können erzwungen Um eine Webumgebung aufzubauen, werden auch asynchrone Aufrufe im Multiprozessmodus implementiert. Aber manchmal ist es wirklich nicht nötig, den Dienst zu starten und den Server auf Client-Nachrichten warten zu lassen, ganz zu schweigen davon, dass der Servercode zwischendurch nicht geändert werden kann. In diesem Artikel wird erläutert, wie Sie Multiprozess- und asynchrone Aufrufe in der Webumgebung in der CLI-Umgebung implementieren, ohne ein Framework oder eine Bibliothek eines Drittanbieters zu verwenden.

Asynchrone Aufrufe in der Webumgebung

Es gibt zwei häufig verwendete Methoden

Verwenden Sie eine Socket-Verbindung

Diese Methode ist eine typische C/S-Architektur und erfordert Serverunterstützung.

// 1. 创建socket套接字
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 2. 进行socket连接
socket_connect($socket, '127.0.0.1', '3939');
//socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑
// 3. 向服务端发送请求
socket_write($socket, $request, strlen($request));
// 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略)
$recv = socket_read($socket, 2048);
// 5. 关闭socket连接
socket_close($socket);

2. Verwenden Sie popen, um die Prozesspipeline zu öffnen

Diese Methode verwendet Betriebssystembefehle und wird direkt vom Betriebssystem ausgeführt.

Die in diesem Artikel besprochenen asynchronen Aufrufe verwenden diese Methode.

$sf = '/path/to/cli_async_task.php'; //要执行的脚本文件
$op = 'call'; //脚本文件接收的参数1
$data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); //脚本文件接收的参数2
pclose(popen("php '$sf' --op $op --data $data &", 'r')); //打开之后接着就关闭进程管道,让该进程以守护模式运行
echo PHP_EOL.'异步任务已执行。'.PHP_EOL;

Der Vorteil dieser Methode besteht darin, dass sie in einem Schritt gelöst werden kann und der aktuelle Prozess keinen Overhead erfordert.
Der Nachteil liegt auch auf der Hand: Der Laufstatus des Aufgabenskripts kann nicht nachverfolgt werden.
Das Highlight wird also die Skriptdatei sein, die die Aufgabe ausführt. Im Folgenden wird die Implementierung der Aufgabenverarbeitung und der Mehrfachverarbeitung vorgestellt.

Multiprozess-Aufgabenverarbeitung in CLI Umgebung

Hinweis: Der Multiprozessmodus unterstützt nur Linux, nicht Windows! !

Jeder Schritt wird hier beginnend bei 0 vorgestellt (ohne Verwendung eines Frameworks oder einer Klassenbibliothek), Ein vollständiger Code wird am Ende angehängt.

1. Erstellen Sie ein Skript

  • Ein Aspekt, der in keinem Skript ignoriert werden kann, ist die Fehlerbehandlung. Beim Schreiben eines Aufgabenverarbeitungsskripts besteht der erste Schritt darin, eine Fehlerbehandlungsmethode zu schreiben.

In PHP rufen Sie einfach die drei Funktionen set_Exception_handler set_error_handler register_shutdown_function auf und schreiben dann eine benutzerdefinierte Verarbeitungsmethode.

  • Der nächste Schritt besteht darin, die Autoloading-Funktion spl_autoload_register zu definieren, um die Mühe zu vermeiden, jedes Mal eine neue Klasse zu erfordern/inzuschließen.

  • Definieren Sie die Protokolloperationsmethode.

  • Definieren Sie die Aufgabenverarbeitungsmethode.

  • Lesen Sie die Parameter aus der Befehlszeile und beginnen Sie mit der Ausführung der Aufgabe.

2. Multiprozessverarbeitung

PHP erstellt mehrere Prozesse mithilfe der Funktion pcntl_fork, die eine Kopie des aktuellen Prozesses aufspaltet (Schattenklontechnik). Es gibt zwei Prozesse: Der aktuelle Prozess ist der Hauptprozess (die Ontologie) und der gegabelte Prozess ist der untergeordnete Prozess (der Schattenklon). Es ist zu beachten, dass die Codeumgebung der beiden Prozesse dieselbe ist und beide Prozesse am Speicherort der Funktion pcntl_fork ausgeführt wurden. Der Unterschied besteht darin, dass die von getmypid erhaltene Prozessnummer unterschiedlich ist. Der wichtigste Unterschied besteht darin, dass beim Aufruf der Funktion pcntl_fork der vom untergeordneten Prozess erhaltene Rückgabewert 0 ist, während der Hauptprozess die Prozessnummer pid des untergeordneten Prozesses erhält .

Okay, nachdem wir wissen, wer der untergeordnete Prozess ist, können wir den untergeordneten Prozess Aufgaben ausführen lassen.

Woher kennt der Hauptprozess den Status des untergeordneten Prozesses?
Verwenden Sie pcntl_wait. Diese Funktion verfügt über zwei Parameter: $status und $options. $status ist ein Referenztyp, der zum Speichern des Status des untergeordneten Prozesses verwendet wird. $options verfügt über zwei optionale Konstanten, WNOHANG|WUNTRACED, was bedeutet, dass sofort zurückgegeben wird, ohne auf das Ende des untergeordneten Prozesses zu warten bzw. Warten auf den untergeordneten Prozess. Offensichtlich blockiert die Verwendung von WUNTRACED den Hauptprozess. (Sie können auch die Funktion pcntl_waitpid verwenden, um den spezifischen PID-Status des untergeordneten Prozesses abzurufen.)

In mehreren Prozessen muss der Hauptprozess den Status jedes untergeordneten Prozesses verwalten, andernfalls ist der untergeordnete Prozess wahrscheinlich nicht in der Lage zu sein, den Prozess zu beenden und zu einem Zombie zu werden.

Über die Nachrichtenkommunikation zwischen mehreren Prozessen
Dieser Bereich muss eine bestimmte Geschäftslogik beinhalten, daher kann ich ihn nur kurz erwähnen. Ohne die Nutzung von Drittanbieterdiensten wie redis in Betracht zu ziehen, kann PHP Methoden wie Pipeline-Kommunikation und Shared Memory nativ implementieren. Die Implementierung ist relativ einfach, der Nachteil besteht jedoch darin, dass die nutzbare Datenkapazität begrenzt ist und der Datenaustausch nur über einfache Textprotokolle möglich ist.

So beenden Sie alle Prozessaufgaben manuell

如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。

以下是完整的任务执行脚本代码:

可能无法直接使用,需要修改的地方有:

  1. 脚本目录和日志目录常量

  2. 自动加载任务类的方法(默认是加载脚本目录中以Task结尾的文件)

  3. 其他的如:错误和日志处理方式和文本格式就随意吧...

  4. 如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。

  5. 多进程的例子:execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。

465c7f3b575b14ef14154648576e2d16[%s] %s (%s)39528cedfa926ea0c01e69ef5b2ea9b0'. "\n". 'e03b848252eb9375d56be284e690e873%sbc5574f69a0cba105bc93bd3dc13c4ec',
        $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString()
    );
    file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX);
});
set_error_handler(function($errno, $errmsg, $filename, $line) {
    if (!(error_reporting() & $errno)) return;
    ob_start();
    debug_print_backtrace();
    $backtrace = ob_get_contents(); ob_end_clean();
    $datetime = date('Y-m-d H:i:s', time());
    $msg = 15b4e9f286bd197fe4b387be60410133 $header) {
            if (!is_numeric($_k)) 
                $header = sprintf('%s: %s', $_k, $header);
            $_headers .= $header . "\r\n";
        }
    }
    $headers = "Connection: close\r\n" . $_headers;
    $opts = array(
        'http' => array(
            'method' => strtoupper(@$job['method'] ?: 'get'),
            'content' => @$job['data'] ?: null,
            'header' => $headers,
            'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)',
            'proxy' => @$job['args']['proxy'] ?: null,
            'timeout' => intval(@$job['args']['timeout'] ?: 120),
            'protocol_version' => @$job['args']['protocol_version'] ?: '1.1',
            'max_redirects' => 3,
            'ignore_errors' => true
        )
    );
    $ret = @file_get_contents($url, false, stream_context_create($opts));
    //debug_log($url.' -->'.strlen($ret));
    if ($ret and isset($job['callback'])) {
        $postdata = http_build_query(array(
                'msg_id' => @$job['msg_id'] ?: 0,
                'url' => @$job['url'],
                'result' => $ret
            ));
        $opts = array(
            'http' => array(
                'method' => 'POST',
                'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n",
                'content' => $postdata,
                'timeout' => 30
            )
        );
        file_get_contents($job['callback'], false, stream_context_create($opts));
        //debug_log(json_encode(@$http_response_header));
        //debug_log($job['callback'].' -->'.$ret2);
    }
    
    return $ret;
}

function clean($tmpdirs, $expires=3600*24*7) {
    $ret = [];
    foreach ((array)$tmpdirs as $tmpdir) {
        $ret[$tmpdir] = 0;
        foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) {
            if (fileatime($_file) 46453edae09705589e2133e41bd51079open($file, \ZipArchive::CREATE)) {
        return false;
    }
    _backup_dir($zip, $dest);
    
    $zip->close();
    return $file;
}
function _backup_dir($zip, $dest, $sub='') {
    $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
    $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
    $dir = opendir($dest);
    if (!$dir) return false;
    while (false !== ($file = readdir($dir))) {
        if (is_file($dest . $file)) {
            $zip->addFile($dest . $file, $sub . $file);
        } else {
            if ($file != '.' and $file != '..' and is_dir($dest . $file)) {
                //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR);
                _backup_dir($zip, $dest . $file, $file);
            }
        }
    }
    closedir($dir);
    return true;
}


function execute_task($op, $data) {
    debug_log('Start...');
    $t1 = microtime(true);
    switch($op) {
    case 'call': //执行任务脚本类
        $cmd = $data;
        if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd;
        elseif (is_array($cmd)) {
            if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0];
        }
        $ret = call($cmd);
        break;
    case 'grab': //抓取网页
        if (is_string($data)) $data = ['url' => $data];
        if (is_array($data)) $ret = grab($data);
        else throw new \Exception('无效的命令参数!');
        break;
    case 'clean': //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天)
        if (isset($data['dirs'])) {
            $ret = clean($data['dirs'], @$data['expires']);
        } else {
            $ret = clean($data);
        }
        break;
    case 'backup': //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹
        if (isset($data['zip']) and is_dir($data['dest']))
            $ret = backup($data['zip'], $data['dest']);
        else
            throw new \Exception('没有指定需要备份的文件!');
        break;
    case 'require': //加载脚本文件
        if (is_file($data)) $ret = require($data);
        else throw new \Exception('不是可请求的文件!');
        break;
    case 'test':
        sleep(rand(1, 5));
        $ret = ucfirst(strval($data)). '.PID:'. getmypid();
        break;
    case 'multi': //多进程处理模式
        $results = $childs = [];
        $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid();
        if (!file_exists($fifo)) {
            if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道
                throw new Exception('make pipe failed!');
            }
        }
        //$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); //共享内存
        //shmop_write($shmid, serialize([]), 0);
        //$data = unserialize(shmop_read($shmid, 0, 4096));
        //shmop_delete($shmid);
        //shmop_close($shmid);
        foreach($data as $_op => $_datas) {
            $_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据
            foreach($_datas as $_data) {
                $pid = pcntl_fork();
                if ($pid == 0) { //子进程中执行任务
                    $_ret = execute_task($_op, $_data);
                    $_pid = getmypid();
                    $pipe = fopen($fifo, 'w'); //写
                    //stream_set_blocking($pipe, false);
                    $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]);
                    if (strlen($_ret) > 4096) //写入管道的数据最大4K
                        $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']);
                    //debug_log('write pipe: '.$_ret);
                    fwrite($pipe, $_ret.PHP_EOL);
                    fflush($pipe);
                    fclose($pipe);
                    exit(0); //退出子进程
                } elseif ($pid > 0) { //主进程中记录任务
                    $childs[] = $pid;
                    $results[$pid] = 0;
                    debug_log('fork by child: '.$pid);
                    //pcntl_wait($status, WNOHANG);
                } elseif ($pid == -1) {
                    throw new Exception('could not fork at '. getmygid());
                }
            }
        }
        $pipe = fopen($fifo, 'r+'); //读
        stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。
        $n = 0;
        while(count($childs) > 0) {
            foreach($childs as $i => $pid) {
                $res = pcntl_waitpid($pid, $status, WNOHANG);
                if (-1 == $res || $res > 0) {
                    $_ret = @unserialize(fgets($pipe)); //读取管道数据
                    $results[$pid] = $_ret;
                    unset($childs[$i]);
                    debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256));
                }
                if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程
            }
            usleep(200000); $n++;
        }
        debug_log('child process completed.');
        @fclose($pipe);
        @unlink($fifo);
        $ret = json_encode($results, 64|256);
        break;
    default:
        throw new \Exception('没有可执行的任务!');
        break;
    }
    $t2 = microtime(true);
    $times = round(($t2 - $t1) * 1000, 2);
    $log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op), 
        @json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times);
    debug_log($log);
    return $ret;
}


// 读取 CLI 命令行参数
$params = getopt('', array('op:', 'data:'));
$op = $params['op'];
$data = unserialize(base64_decode($params['data']));
// 开始执行任务
execute_task($op, $data);



function __autoload($classname) {
    $parts = explode('\\', ltrim($classname, '\\'));
    if (false !== strpos(end($parts), '_')) {
        array_splice($parts, -1, 1, explode('_', current($parts)));
    }
    $filename = implode(DIRECTORY_SEPARATOR, $parts) . '.php';
    if ($filename = stream_resolve_include_path($filename)) {
        include $filename;
    } else if (preg_match('/.*Task$/', $classname)) { //查找以Task结尾的任务脚本类
        include TASK_PATH . DIRECTORY_SEPARATOR . $classname . '.php';
    } else {
        return false;
    }
}


Das obige ist der detaillierte Inhalt vonWie verwendet PHP die Befehlszeile, um die Aufgabenverarbeitung im asynchronen Multiprozessmodus zu implementieren (Code). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:segmentfault.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen