search
HomeBackend DevelopmentPHP TutorialHow does PHP use the command line to implement task processing in asynchronous multi-process mode (code)

The content of this article is about how PHP uses the command line to implement task processing (code) in asynchronous multi-process mode. It has certain reference value. Friends in need can refer to it. I hope it will be helpful to you. help.

Using PHP to implement asynchronous tasks has always been a problem. Among the existing solutions: PHP’s well-known asynchronous frameworks include swoole and Workerman, but they cannot be used directly in the web environment, even if forced To build a web environment, asynchronous calls are also implemented using multi-process mode. But sometimes there is really no need to start the service and let the server wait for client messages, not to mention that the server code cannot be changed in the middle. This article will introduce how to implement multi-process and asynchronous calls in the web environment in the CLI environment without using any framework or third-party library.

Asynchronous calls in the web environment

There are two commonly used methods

1. Use socket connection

This method is a typical C/S architecture and requires server support.

// 1. 创建socket套接字
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 2. 进行socket连接
socket_connect($socket, '127.0.0.1', '3939');
//socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑
// 3. 向服务端发送请求
socket_write($socket, $request, strlen($request));
// 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略)
$recv = socket_read($socket, 2048);
// 5. 关闭socket连接
socket_close($socket);

2. Use popen to open the process pipe

This method uses operating system commands and is directly executed by the operating system.

The asynchronous calls discussed in this article use this method.

$sf = '/path/to/cli_async_task.php'; //要执行的脚本文件
$op = 'call'; //脚本文件接收的参数1
$data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); //脚本文件接收的参数2
pclose(popen("php '$sf' --op $op --data $data &", 'r')); //打开之后接着就关闭进程管道,让该进程以守护模式运行
echo PHP_EOL.'异步任务已执行。'.PHP_EOL;

The advantage of this method is that it can be solved in one step and the current process does not require any overhead.
The disadvantage is also obvious: the running status of the task script cannot be tracked.
So the highlight will be the script file that executes the task. The following will introduce the implementation of task processing and multi-processing.

Multi-process task processing in the CLI environment

Note: Multi-process mode only supports Linux, not Windows! !

Here we will introduce each step from 0 (without using any framework or class library), At the end there will be a complete code .

1. Create a script

  • The aspect that cannot be ignored in any script is error handling. So when writing a task processing script, the first step is to write an error handling method.

In PHP, we call the three functions set_exception_handler set_error_handler register_shutdown_function, and then write a custom processing method.

  • The next step is to define the automatic loading function spl_autoload_register to avoid the trouble of require / include every time a new class is used.

  • Define log operation method.

  • Define task processing method.

  • Read the parameters from the command line and start executing the task.

2. Multi-process processing

PHP creates multiple processes by using the pcntl_fork function, which will fork a copy of the current process (shadow clone technique), so there is There are two processes, the current process is the main process (the ontology), and the forked process is the child process (the shadow clone). It should be noted that the code environment of the two processes is the same, and both processes have executed to the pcntl_fork function location. The difference is that the process number obtained by getmypid is different. The most important difference is that when the pcntl_fork function is called, the return value obtained by the child process is 0, while the main process obtains the process number pid of the child process.

Okay, when we know who the child process is, we can let the child process perform tasks.

So how does the main process know the status of the child process?
Use pcntl_wait. This function has two parameters $status and $options. $status is a reference type, used to store the status of the child process. $options has two optional constants WNOHANG|WUNTRACED, which means returning immediately without waiting for the child process to end and waiting for the child process respectively. The process ends. Obviously using WUNTRACED will block the main process. (You can also use the pcntl_waitpid function to obtain the specific pid child process status)

In multiple processes, what the main process has to do is to manage the status of each child process, otherwise the child process is likely to be unable to exit and become a zombie process.

About message communication between multiple processes
This section needs to involve specific business logic, so I can only briefly mention it. Without considering the use of third-party services such as redis, PHP can natively implement pipeline communication and shared memory. The implementation is relatively simple, but the disadvantage is that the usable data capacity is limited, and data can only be exchanged using simple text protocols.

How to manually end all process tasks

如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。

以下是完整的任务执行脚本代码:

可能无法直接使用,需要修改的地方有:

  1. 脚本目录和日志目录常量

  2. 自动加载任务类的方法(默认是加载脚本目录中以Task结尾的文件)

  3. 其他的如:错误和日志处理方式和文本格式就随意吧...

  4. 如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。

  5. 多进程的例子:execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。

<?php error_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);
@ini_set(&#39;display_errors&#39;, 0);
@ini_set(&#39;date.timezone&#39;, &#39;PRC&#39;);

chdir(__DIR__);

/* 任务脚本目录 */
defined(&#39;TASK_PATH&#39;) or define(&#39;TASK_PATH&#39;, realpath(__DIR__ .&#39;/tasks&#39;));
/* 任务日志目录 */
defined(&#39;TASK_LOGS_PATH&#39;) or define(&#39;TASK_LOGS_PATH&#39;, __DIR__ .&#39;/tasks/logs&#39;);

if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);

set_exception_handler(function($e) {
    $time = date(&#39;H:i:s&#39;, time());
    $msg = sprintf(&#39;&#39;. &#39;<h3>[%s] %s (%s)'. "\n". '<pre class="brush:php;toolbar:false">%s
',         $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString()     );     file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); set_error_handler(function($errno, $errmsg, $filename, $line) {     if (!(error_reporting() & $errno)) return;     ob_start();     debug_print_backtrace();     $backtrace = ob_get_contents(); ob_end_clean();     $datetime = date('Y-m-d H:i:s', time());     $msg =  $header) {             if (!is_numeric($_k))                  $header = sprintf('%s: %s', $_k, $header);             $_headers .= $header . "\r\n";         }     }     $headers = "Connection: close\r\n" . $_headers;     $opts = array(         'http' => array(             'method' => strtoupper(@$job['method'] ?: 'get'),             'content' => @$job['data'] ?: null,             'header' => $headers,             'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)',             'proxy' => @$job['args']['proxy'] ?: null,             'timeout' => intval(@$job['args']['timeout'] ?: 120),             'protocol_version' => @$job['args']['protocol_version'] ?: '1.1',             'max_redirects' => 3,             'ignore_errors' => true         )     );     $ret = @file_get_contents($url, false, stream_context_create($opts));     //debug_log($url.' -->'.strlen($ret));     if ($ret and isset($job['callback'])) {         $postdata = http_build_query(array(                 'msg_id' => @$job['msg_id'] ?: 0,                 'url' => @$job['url'],                 'result' => $ret             ));         $opts = array(             'http' => array(                 'method' => 'POST',                 'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n",                 'content' => $postdata,                 'timeout' => 30             )         );         file_get_contents($job['callback'], false, stream_context_create($opts));         //debug_log(json_encode(@$http_response_header));         //debug_log($job['callback'].' -->'.$ret2);     }          return $ret; } function clean($tmpdirs, $expires=3600*24*7) {     $ret = [];     foreach ((array)$tmpdirs as $tmpdir) {         $ret[$tmpdir] = 0;         foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) {             if (fileatime($_file) open($file, \ZipArchive::CREATE)) {         return false;     }     _backup_dir($zip, $dest);          $zip->close();     return $file; } function _backup_dir($zip, $dest, $sub='') {     $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;     $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;     $dir = opendir($dest);     if (!$dir) return false;     while (false !== ($file = readdir($dir))) {         if (is_file($dest . $file)) {             $zip->addFile($dest . $file, $sub . $file);         } else {             if ($file != '.' and $file != '..' and is_dir($dest . $file)) {                 //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR);                 _backup_dir($zip, $dest . $file, $file);             }         }     }     closedir($dir);     return true; } function execute_task($op, $data) {     debug_log('Start...');     $t1 = microtime(true);     switch($op) {     case 'call': //执行任务脚本类         $cmd = $data;         if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd;         elseif (is_array($cmd)) {             if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0];         }         $ret = call($cmd);         break;     case 'grab': //抓取网页         if (is_string($data)) $data = ['url' => $data];         if (is_array($data)) $ret = grab($data);         else throw new \Exception('无效的命令参数!');         break;     case 'clean': //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天)         if (isset($data['dirs'])) {             $ret = clean($data['dirs'], @$data['expires']);         } else {             $ret = clean($data);         }         break;     case 'backup': //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹         if (isset($data['zip']) and is_dir($data['dest']))             $ret = backup($data['zip'], $data['dest']);         else             throw new \Exception('没有指定需要备份的文件!');         break;     case 'require': //加载脚本文件         if (is_file($data)) $ret = require($data);         else throw new \Exception('不是可请求的文件!');         break;     case 'test':         sleep(rand(1, 5));         $ret = ucfirst(strval($data)). '.PID:'. getmypid();         break;     case 'multi': //多进程处理模式         $results = $childs = [];         $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid();         if (!file_exists($fifo)) {             if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道                 throw new Exception('make pipe failed!');             }         }         //$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); //共享内存         //shmop_write($shmid, serialize([]), 0);         //$data = unserialize(shmop_read($shmid, 0, 4096));         //shmop_delete($shmid);         //shmop_close($shmid);         foreach($data as $_op => $_datas) {             $_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据             foreach($_datas as $_data) {                 $pid = pcntl_fork();                 if ($pid == 0) { //子进程中执行任务                     $_ret = execute_task($_op, $_data);                     $_pid = getmypid();                     $pipe = fopen($fifo, 'w'); //写                     //stream_set_blocking($pipe, false);                     $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]);                     if (strlen($_ret) > 4096) //写入管道的数据最大4K                         $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']);                     //debug_log('write pipe: '.$_ret);                     fwrite($pipe, $_ret.PHP_EOL);                     fflush($pipe);                     fclose($pipe);                     exit(0); //退出子进程                 } elseif ($pid > 0) { //主进程中记录任务                     $childs[] = $pid;                     $results[$pid] = 0;                     debug_log('fork by child: '.$pid);                     //pcntl_wait($status, WNOHANG);                 } elseif ($pid == -1) {                     throw new Exception('could not fork at '. getmygid());                 }             }         }         $pipe = fopen($fifo, 'r+'); //读         stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。         $n = 0;         while(count($childs) > 0) {             foreach($childs as $i => $pid) {                 $res = pcntl_waitpid($pid, $status, WNOHANG);                 if (-1 == $res || $res > 0) {                     $_ret = @unserialize(fgets($pipe)); //读取管道数据                     $results[$pid] = $_ret;                     unset($childs[$i]);                     debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256));                 }                 if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程             }             usleep(200000); $n++;         }         debug_log('child process completed.');         @fclose($pipe);         @unlink($fifo);         $ret = json_encode($results, 64|256);         break;     default:         throw new \Exception('没有可执行的任务!');         break;     }     $t2 = microtime(true);     $times = round(($t2 - $t1) * 1000, 2);     $log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op),          @json_encode($data, 64|256), @strlen($ret)


The above is the detailed content of How does PHP use the command line to implement task processing in asynchronous multi-process mode (code). For more information, please follow other related articles on the PHP Chinese website!

Statement
This article is reproduced at:segmentfault. If there is any infringement, please contact admin@php.cn delete
php怎么把负数转为正整数php怎么把负数转为正整数Apr 19, 2022 pm 08:59 PM

php把负数转为正整数的方法:1、使用abs()函数将负数转为正数,使用intval()函数对正数取整,转为正整数,语法“intval(abs($number))”;2、利用“~”位运算符将负数取反加一,语法“~$number + 1”。

php怎么实现几秒后执行一个函数php怎么实现几秒后执行一个函数Apr 24, 2022 pm 01:12 PM

实现方法:1、使用“sleep(延迟秒数)”语句,可延迟执行函数若干秒;2、使用“time_nanosleep(延迟秒数,延迟纳秒数)”语句,可延迟执行函数若干秒和纳秒;3、使用“time_sleep_until(time()+7)”语句。

php怎么除以100保留两位小数php怎么除以100保留两位小数Apr 22, 2022 pm 06:23 PM

php除以100保留两位小数的方法:1、利用“/”运算符进行除法运算,语法“数值 / 100”;2、使用“number_format(除法结果, 2)”或“sprintf("%.2f",除法结果)”语句进行四舍五入的处理值,并保留两位小数。

php怎么根据年月日判断是一年的第几天php怎么根据年月日判断是一年的第几天Apr 22, 2022 pm 05:02 PM

判断方法:1、使用“strtotime("年-月-日")”语句将给定的年月日转换为时间戳格式;2、用“date("z",时间戳)+1”语句计算指定时间戳是一年的第几天。date()返回的天数是从0开始计算的,因此真实天数需要在此基础上加1。

php怎么替换nbsp空格符php怎么替换nbsp空格符Apr 24, 2022 pm 02:55 PM

方法:1、用“str_replace("&nbsp;","其他字符",$str)”语句,可将nbsp符替换为其他字符;2、用“preg_replace("/(\s|\&nbsp\;||\xc2\xa0)/","其他字符",$str)”语句。

php怎么判断有没有小数点php怎么判断有没有小数点Apr 20, 2022 pm 08:12 PM

php判断有没有小数点的方法:1、使用“strpos(数字字符串,'.')”语法,如果返回小数点在字符串中第一次出现的位置,则有小数点;2、使用“strrpos(数字字符串,'.')”语句,如果返回小数点在字符串中最后一次出现的位置,则有。

php怎么设置implode没有分隔符php怎么设置implode没有分隔符Apr 18, 2022 pm 05:39 PM

在PHP中,可以利用implode()函数的第一个参数来设置没有分隔符,该函数的第一个参数用于规定数组元素之间放置的内容,默认是空字符串,也可将第一个参数设置为空,语法为“implode(数组)”或者“implode("",数组)”。

php字符串有没有下标php字符串有没有下标Apr 24, 2022 am 11:49 AM

php字符串有下标。在PHP中,下标不仅可以应用于数组和对象,还可应用于字符串,利用字符串的下标和中括号“[]”可以访问指定索引位置的字符,并对该字符进行读写,语法“字符串名[下标值]”;字符串的下标值(索引值)只能是整数类型,起始值为0。

See all articles

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
2 weeks agoBy尊渡假赌尊渡假赌尊渡假赌
Repo: How To Revive Teammates
4 weeks agoBy尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island Adventure: How To Get Giant Seeds
3 weeks agoBy尊渡假赌尊渡假赌尊渡假赌

Hot Tools

SublimeText3 Linux new version

SublimeText3 Linux new version

SublimeText3 Linux latest version

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

Atom editor mac version download

Atom editor mac version download

The most popular open source editor

WebStorm Mac version

WebStorm Mac version

Useful JavaScript development tools

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

Powerful PHP integrated development environment