How does PHP use the command line to implement task processing in asynchronous multi-process mode (code)

不言
2019-01-23 10:20:28

This article shows how PHP can use the command line to process tasks asynchronously in multi-process mode, with code. It may be a useful reference for anyone facing the same problem.

Implementing asynchronous tasks in PHP has long been a problem. Among the existing solutions, the well-known PHP asynchronous frameworks are swoole and Workerman, but they cannot be used directly in a web environment; even if you force them into one, the asynchronous calls are still implemented with multiple processes. Sometimes there is simply no need to start a service and keep a server waiting for client messages, not to mention that the server code cannot be changed while it is running. This article shows how to make asynchronous calls from the web environment and how to process tasks with multiple processes in the CLI environment, without any framework or third-party library.

Asynchronous calls in the web environment

There are two commonly used methods:

1. Use socket connection

This method is a typical C/S architecture and requires server support.

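// 1. Create the socket
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 2. Connect to the server
socket_connect($socket, '127.0.0.1', 3939);
//socket_set_nonblock($socket); // Run in non-blocking mode; not practical on the client side, so it is not considered here
// 3. Send the request to the server
socket_write($socket, $request, strlen($request));
// 4. Receive the server's reply (ignoring the non-blocking case; if the server does not provide an asynchronous service, this step can be omitted)
$recv = socket_read($socket, 2048);
// 5. Close the socket connection
socket_close($socket);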

2. Use popen to open the process pipe

This method uses operating system commands and is directly executed by the operating system.

The asynchronous calls discussed in this article use this method.

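$sf = '/path/to/cli_async_task.php'; // script file to execute
$op = 'call'; // argument 1 accepted by the script
$data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); // argument 2 accepted by the script
pclose(popen("php '$sf' --op $op --data $data &", 'r')); // open the process pipe and close it right away, so that the process keeps running in the background like a daemon
echo PHP_EOL.'The asynchronous task has been launched.'.PHP_EOL;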

The advantage of this method is that it takes a single step and adds almost no overhead to the current process.
The disadvantage is just as obvious: the running status of the task script cannot be tracked.
So the key part is the script file that executes the task. The following sections describe how to implement task processing and multiple processes.
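
The popen call above can be wrapped in a small helper. The sketch below is one possible shape for the execAsyncTask function that the multi-process example later in the article calls; it is not the author's version. The buildAsyncCommand helper, the default script path, the use of escapeshellarg, and the redirection to /dev/null (so that the background process never writes into the closed pipe) are all assumptions added for illustration.

```php
<?php
// Sketch only: build the shell command that starts a task in the background.
// The --op and --data options match the ones used in the article.
function buildAsyncCommand(string $script, string $op, $data): string
{
    $payload = base64_encode(serialize($data));
    return sprintf(
        'php %s --op %s --data %s > /dev/null 2>&1 &',
        escapeshellarg($script),
        escapeshellarg($op),
        escapeshellarg($payload)
    );
}

// Launch the task and return immediately; the task status is not tracked.
// The default script path is a placeholder.
function execAsyncTask(string $op, $data, string $script = '/path/to/cli_async_task.php'): void
{
    $handle = popen(buildAsyncCommand($script, $op, $data), 'r');
    if ($handle === false) {
        throw new RuntimeException('Could not start the task process');
    }
    pclose($handle);
}
```

Keeping the command construction in its own function makes it possible to inspect or log the exact command line without starting a process.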

Multi-process task processing in the CLI environment

Note: Multi-process mode is supported only on Linux, not on Windows!

Each step is introduced here from scratch (without using any framework or class library). The complete code is given at the end.

1. Create a script

  • Error handling cannot be ignored in any script, so the first step in writing a task processing script is to write the error handling.

In PHP, we call the three functions set_exception_handler, set_error_handler, and register_shutdown_function, and pass each of them a custom handler.

  • The next step is to register an autoload function with spl_autoload_register, which avoids having to require / include a file every time a new class is used.

  • Define the logging function.

  • Define the task processing function.

  • Read the parameters from the command line and start executing the task.
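
The steps above can be sketched as a minimal script skeleton. This is an illustration under stated assumptions rather than the article's own code: DEMO_LOGS_PATH, demo_log, register_task_handlers, and read_task_args are hypothetical names, the log directory is the system temp directory, and the autoloader simply looks for a class file next to the script.

```php
<?php
// Hypothetical log directory; a real script would define its own constant.
define('DEMO_LOGS_PATH', sys_get_temp_dir());

// Logging: append one timestamped line to a daily log file, return the file name.
function demo_log(string $msg, string $prefix = 'debug'): string
{
    $file = DEMO_LOGS_PATH . DIRECTORY_SEPARATOR . $prefix . '-' . date('Ymd') . '.log';
    $line = sprintf('[%s] %s', date('Y-m-d H:i:s'), $msg) . PHP_EOL;
    file_put_contents($file, $line, FILE_APPEND | LOCK_EX);
    return $file;
}

// Error handling and autoloading; call this once at the top of the task script.
function register_task_handlers(): void
{
    set_exception_handler(function (Throwable $e) {
        demo_log($e->getMessage() . ' (' . $e->getCode() . ')', 'exception');
    });
    set_error_handler(function (int $errno, string $errmsg, string $file, int $line) {
        if (!(error_reporting() & $errno)) {
            return false; // error level is excluded by error_reporting
        }
        demo_log("$errmsg in $file:$line", 'error');
        return true;
    });
    register_shutdown_function(function () {
        $err = error_get_last();
        $fatal = E_ERROR | E_PARSE | E_CORE_ERROR | E_COMPILE_ERROR;
        if ($err !== null && ($err['type'] & $fatal)) {
            demo_log($err['message'], 'fatal');
        }
    });
    // Assumption: class files live next to this script as <ClassName>.php.
    spl_autoload_register(function (string $class) {
        $path = str_replace('\\', DIRECTORY_SEPARATOR, $class) . '.php';
        $file = __DIR__ . DIRECTORY_SEPARATOR . $path;
        if (is_file($file)) {
            require $file;
        }
    });
}

// Command-line parameters: validate --op and --data and decode the payload.
function read_task_args(array $params): array
{
    if (!isset($params['op'], $params['data'])) {
        throw new InvalidArgumentException('Usage: --op <op> --data <base64 payload>');
    }
    return [$params['op'], unserialize(base64_decode($params['data']))];
}

// In the task script itself:
//   register_task_handlers();
//   [$op, $data] = read_task_args(getopt('', ['op:', 'data:']));
```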

2. Multi-process processing

PHP creates additional processes with the pcntl_fork function, which forks a copy of the current process (think of the shadow clone technique). After the call there are two processes: the current process is the main process (the original), and the forked process is the child process (the shadow clone). Note that both processes have the same code and state, and both continue from the point where pcntl_fork returns. They differ in the process ID returned by getmypid and, most importantly, in the return value of pcntl_fork: the child process gets 0, while the main process gets the process ID (pid) of the child. If the fork fails, the main process gets -1.

Once we know which process is the child, we can let it perform the task.
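
The return values of pcntl_fork can be seen in a small sketch. It assumes the pcntl extension on a Linux CLI; run_in_child is a hypothetical helper name, and the task's return value is used as the child's exit code purely for illustration.

```php
<?php
// Runs $task in a forked child and returns the child's exit code
// to the caller in the main process.
function run_in_child(callable $task): int
{
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('could not fork');
    }
    if ($pid === 0) {
        // Child process: pcntl_fork() returned 0.
        exit((int) $task());
    }
    // Main process: $pid is the process ID of the child.
    pcntl_waitpid($pid, $status); // blocks until this child exits
    return pcntl_wexitstatus($status);
}
```

Only the child reaches the exit call, so the code after the if block runs in the main process alone.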

So how does the main process know the status of a child process?
Use pcntl_wait. This function takes two parameters, $status and $options. $status is passed by reference and receives the status of the child process. $options accepts the constants WNOHANG and WUNTRACED, which can be combined with |: WNOHANG makes the call return immediately when no child has exited yet, and WUNTRACED makes it also report children that have been stopped. Without WNOHANG, the call blocks the main process until a child exits. (You can also use pcntl_waitpid to get the status of the child with a specific pid.)

With multiple processes, the main process has to track the status of every child process; otherwise children that have exited are never reaped and remain as zombie processes.
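
A non-blocking way for the main process to reap several children might look like the sketch below. It assumes the pcntl extension; run_all and the 50 ms polling interval are illustrative choices, and each task's return value is again used as the exit code.

```php
<?php
// Forks one child per task, then polls with WNOHANG until all have exited.
// Returns an array of task name => exit code.
function run_all(array $tasks): array
{
    $running = [];
    foreach ($tasks as $name => $task) {
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('could not fork');
        }
        if ($pid === 0) {
            exit((int) $task()); // child: run the task and leave
        }
        $running[$pid] = $name;  // main process: remember the child
    }

    $exitCodes = [];
    while ($running) {
        foreach ($running as $pid => $name) {
            $res = pcntl_waitpid($pid, $status, WNOHANG);
            if ($res === $pid) {        // the child has exited and is now reaped
                $exitCodes[$name] = pcntl_wexitstatus($status);
                unset($running[$pid]);
            } elseif ($res === -1) {    // no such child any more
                unset($running[$pid]);
            }
            // $res === 0 means the child is still running
        }
        usleep(50000); // avoid a busy loop
    }
    return $exitCodes;
}
```

Because every child is waited for exactly once, none of them is left behind as a zombie when the function returns.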

Message communication between processes
This part depends on the specific business logic, so it is only mentioned briefly. Without resorting to third-party services such as redis, PHP natively supports pipe communication and shared memory. Both are relatively simple to use, but the amount of data that can be exchanged is limited, and data can only be exchanged with simple text protocols.
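
As an illustration of pipe communication, the sketch below sends one result from a child to the main process through a named pipe. It assumes the pcntl and posix extensions; child_result_via_fifo and the FIFO path are hypothetical, and the one-line base64 protocol is just one simple text protocol among many. If the task throws inside the child, the main process would block on the read, so a real script needs a timeout or error path.

```php
<?php
// Runs $task in a child and returns ['pid' => ..., 'result' => ...]
// as read by the main process from a named pipe (FIFO).
function child_result_via_fifo(callable $task): array
{
    $fifo = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'demo_pipe.' . getmypid();
    if (!file_exists($fifo) && !posix_mkfifo($fifo, 0600)) {
        throw new RuntimeException('could not create the named pipe');
    }

    $pid = pcntl_fork();
    if ($pid === -1) {
        unlink($fifo);
        throw new RuntimeException('could not fork');
    }
    if ($pid === 0) {
        // Child: opening for writing blocks until the reader opens the pipe.
        $pipe = fopen($fifo, 'w');
        $message = ['pid' => getmypid(), 'result' => $task()];
        // base64 keeps the message on a single line, whatever it contains.
        fwrite($pipe, base64_encode(serialize($message)) . PHP_EOL);
        fclose($pipe);
        exit(0);
    }

    // Main process: read one line, reap the child, remove the pipe file.
    $pipe = fopen($fifo, 'r');
    $line = fgets($pipe);
    fclose($pipe);
    pcntl_waitpid($pid, $status);
    unlink($fifo);

    return unserialize(base64_decode(trim($line)));
}
```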

How to manually end all process tasks

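If multiple processes are handled badly, the task processes may hang or even consume too many system resources, and the only remedy is to end them manually.
Besides killing the processes one by one by process ID, a quicker way is to give the task script a custom process name by calling the cli_set_process_title function, and then enter the following on the command line: ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9 (where cli_async_worker is the custom process name). This ends all the processes of a multi-process task at once.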
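
Setting the process name takes one call in the task script. The sketch below assumes the PHP CLI on Linux; name_worker is a hypothetical wrapper that returns false instead of failing when the function is unavailable.

```php
<?php
// Give the current process a recognizable name so that it can be found with ps.
function name_worker(string $title = 'cli_async_worker'): bool
{
    // cli_set_process_title() exists only in the CLI SAPI.
    if (PHP_SAPI !== 'cli' || !function_exists('cli_set_process_title')) {
        return false;
    }
    return @cli_set_process_title($title);
}
```

Calling name_worker() before forking means the children inherit the same title, so a single ps filter matches all of them.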

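The complete task execution script follows.

It may not work as-is. The places that need to be modified are:

  1. The script directory and log directory constants.

  2. The method that autoloads task classes (by default it loads files ending in Task from the script directory).

  3. Everything else, such as how errors and logs are handled and the text format, is up to you.

  4. If the named pipe file is set up incorrectly, the process may hang; you may need to remove the pipe communication code manually.

  5. A multi-process example: execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);. The results can be viewed in the log file. For the execAsyncTask function, see "Use popen to open the process pipe".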

// (The beginning of the script, including the TASK_PATH and TASK_LOGS_PATH definitions, is missing from the source.)
set_exception_handler(function($e) {
    $time = date('Y-m-d H:i:s', time());
    $msg = sprintf('[%s] %s (%s)'. "\n". '%s',
        $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString()
    );
    file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX);
});
set_error_handler(function($errno, $errmsg, $filename, $line) {
    if (!(error_reporting() & $errno)) return;
    ob_start();
    debug_print_backtrace();
    $backtrace = ob_get_contents(); ob_end_clean();
    $datetime = date('Y-m-d H:i:s', time());
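    $msg = ...; // (truncated in the source: the rest of this handler and the code up to the header loop of grab($job), presumably including debug_log() and call(), is missing)
        foreach (... as $_k => $header) {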
            if (!is_numeric($_k)) 
                $header = sprintf('%s: %s', $_k, $header);
            $_headers .= $header . "\r\n";
        }
    }
    $headers = "Connection: close\r\n" . $_headers;
    $opts = array(
        'http' => array(
            'method' => strtoupper(@$job['method'] ?: 'get'),
            'content' => @$job['data'] ?: null,
            'header' => $headers,
            'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)',
            'proxy' => @$job['args']['proxy'] ?: null,
            'timeout' => intval(@$job['args']['timeout'] ?: 120),
            'protocol_version' => @$job['args']['protocol_version'] ?: '1.1',
            'max_redirects' => 3,
            'ignore_errors' => true
        )
    );
    $ret = @file_get_contents($url, false, stream_context_create($opts));
    //debug_log($url.' -->'.strlen($ret));
    if ($ret and isset($job['callback'])) {
        $postdata = http_build_query(array(
                'msg_id' => @$job['msg_id'] ?: 0,
                'url' => @$job['url'],
                'result' => $ret
            ));
        $opts = array(
            'http' => array(
                'method' => 'POST',
                'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n",
                'content' => $postdata,
                'timeout' => 30
            )
        );
        file_get_contents($job['callback'], false, stream_context_create($opts));
        //debug_log(json_encode(@$http_response_header));
        //debug_log($job['callback'].' -->'.$ret2);
    }
    
    return $ret;
}

function clean($tmpdirs, $expires=3600*24*7) {
    $ret = [];
    foreach ((array)$tmpdirs as $tmpdir) {
        $ret[$tmpdir] = 0;
        foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) {
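            if (fileatime($_file) < time() - $expires) { // not accessed within the expiry period
                @unlink($_file);
                $ret[$tmpdir]++;
            }
        }
    }
    return $ret;
}

function backup($file, $dest) {
    $zip = new \ZipArchive();
    if (true !== $zip->open($file, \ZipArchive::CREATE)) {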
        return false;
    }
    _backup_dir($zip, $dest);
    
    $zip->close();
    return $file;
}
function _backup_dir($zip, $dest, $sub='') {
    $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
    $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
    $dir = opendir($dest);
    if (!$dir) return false;
    while (false !== ($file = readdir($dir))) {
        if (is_file($dest . $file)) {
            $zip->addFile($dest . $file, $sub . $file);
        } else {
            if ($file != '.' and $file != '..' and is_dir($dest . $file)) {
                //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR);
                _backup_dir($zip, $dest . $file, $file);
            }
        }
    }
    closedir($dir);
    return true;
}


function execute_task($op, $data) {
    debug_log('Start...');
    $t1 = microtime(true);
    switch($op) {
    case 'call': // run a task script class
        $cmd = $data;
        if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd;
        elseif (is_array($cmd)) {
            if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0];
        }
        $ret = call($cmd);
        break;
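    case 'grab': // fetch a web page
        if (is_string($data)) $data = ['url' => $data];
        if (is_array($data)) $ret = grab($data);
        else throw new \Exception('Invalid command arguments!');
        break;
    case 'clean': // clean cache directories: dirs is the list of directories to clean, expires is the expiry time (seconds, 7 days by default)
        if (isset($data['dirs'])) {
            $ret = clean($data['dirs'], @$data['expires'] ?: 3600*24*7); // fall back to 7 days instead of passing null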
        } else {
            $ret = clean($data);
        }
        break;
    case 'backup': // back up files: zip is the target zip file, dest is the directory to back up
        if (isset($data['zip']) and is_dir($data['dest']))
            $ret = backup($data['zip'], $data['dest']);
        else
            throw new \Exception('No files specified for backup!');
        break;
    case 'require': // load a script file
        if (is_file($data)) $ret = require($data);
        else throw new \Exception('Not a file that can be required!');
        break;
    case 'test':
        sleep(rand(1, 5));
        $ret = ucfirst(strval($data)). '.PID:'. getmypid();
        break;
    case 'multi': // multi-process mode
        $results = $childs = [];
        $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid();
        if (!file_exists($fifo)) {
            if (!posix_mkfifo($fifo, 0666)) { // create the named pipe used to pass data between processes
                throw new Exception('make pipe failed!');
            }
        }
        //$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); // shared memory
        //shmop_write($shmid, serialize([]), 0);
        //$data = unserialize(shmop_read($shmid, 0, 4096));
        //shmop_delete($shmid);
        //shmop_close($shmid);
        foreach($data as $_op => $_datas) {
            $_datas = (array)$_datas; // an array value means that one op has several data items to run
            foreach($_datas as $_data) {
                $pid = pcntl_fork();
                if ($pid == 0) { // child process: run the task
                    $_ret = execute_task($_op, $_data);
                    $_pid = getmypid();
                    $pipe = fopen($fifo, 'w'); // write end
                    //stream_set_blocking($pipe, false);
                    $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]);
                    if (strlen($_ret) > 4096) // at most 4 KB is written to the pipe
                        $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']);
                    //debug_log('write pipe: '.$_ret);
                    fwrite($pipe, $_ret.PHP_EOL);
                    fflush($pipe);
                    fclose($pipe);
                    exit(0); // exit the child process
                } elseif ($pid > 0) { // main process: record the child
                    $childs[] = $pid;
                    $results[$pid] = 0;
                    debug_log('forked child: '.$pid);
                    //pcntl_wait($status, WNOHANG);
                } elseif ($pid == -1) {
                    throw new Exception('could not fork at '. getmypid());
                }
            }
        }
        $pipe = fopen($fifo, 'r+'); //读
        stream_set_blocking($pipe, true); // blocking mode; the PID and the data read from the pipe may not match
        $n = 0;
        while(count($childs) > 0) {
            foreach($childs as $i => $pid) {
                $res = pcntl_waitpid($pid, $status, WNOHANG);
                if (-1 == $res || $res > 0) {
                    $_ret = @unserialize(fgets($pipe)); // read data from the pipe
                    $results[$pid] = $_ret;
                    unset($childs[$i]);
                    debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256));
                }
                if ($n > 1000) posix_kill($pid, SIGTERM); // timeout: terminate the child after 1000 polls of 0.2 s (about 200 seconds)
            }
            usleep(200000); $n++;
        }
        debug_log('child process completed.');
        @fclose($pipe);
        @unlink($fifo);
        $ret = json_encode($results, 64|256);
        break;
    default:
        throw new \Exception('No executable task!');
        break;
    }
    $t2 = microtime(true);
    $times = round(($t2 - $t1) * 1000, 2);
    $log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op), 
        @json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times);
    debug_log($log);
    return $ret;
}


// Register the class autoloader (__autoload() is no longer supported in current PHP versions)
spl_autoload_register(function($classname) {
    $parts = explode('\\', ltrim($classname, '\\'));
    if (false !== strpos(end($parts), '_')) {
        array_splice($parts, -1, 1, explode('_', current($parts)));
    }
    $filename = implode(DIRECTORY_SEPARATOR, $parts) . '.php';
    if ($filename = stream_resolve_include_path($filename)) {
        include $filename;
    } else if (preg_match('/.*Task$/', $classname)) { // look for task script classes whose names end in Task
        include TASK_PATH . DIRECTORY_SEPARATOR . $classname . '.php';
    } else {
        return false;
    }
});


// Read the CLI arguments
$params = getopt('', array('op:', 'data:'));
$op = $params['op'];
$data = unserialize(base64_decode($params['data']));
// Start executing the task
execute_task($op, $data);
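
The call() helper used by the 'call' operation does not appear in the listing. A minimal stand-in, assuming that a task class is invokable and that the remaining array elements are its arguments, might look like this. TestTask and its behavior are hypothetical, and the author's actual helper may work differently.

```php
<?php
// Hypothetical task class; the listing would load it from the task directory.
class TestTask
{
    public function __invoke(...$args)
    {
        return 'TestTask ran with: ' . implode(', ', $args);
    }
}

// Stand-in for the missing call() helper: accepts either a plain callable
// or an array of [callable, arg1, arg2, ...].
function call($cmd)
{
    if (is_callable($cmd)) {
        return call_user_func($cmd);
    }
    if (is_array($cmd) && isset($cmd[0]) && is_callable($cmd[0])) {
        return call_user_func_array($cmd[0], array_slice($cmd, 1));
    }
    throw new InvalidArgumentException('Not a callable task');
}
```

With this shape, the payload ['TestTask', 'arg1', 'arg2'] from the popen example becomes [new TestTask, 'arg1', 'arg2'] inside execute_task and is then run with the two arguments.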



Statement:
This article is reproduced from segmentfault.com. In case of infringement, please contact admin@php.cn to have it removed.