


The content of this article is about how PHP uses the command line to implement task processing (code) in asynchronous multi-process mode. It has certain reference value. Friends in need can refer to it. I hope it will be helpful to you. help.
Using PHP to implement asynchronous tasks has always been a problem. Among the existing solutions: PHP’s well-known asynchronous frameworks include swoole and Workerman, but they cannot be used directly in the web environment, even if forced To build a web environment, asynchronous calls are also implemented using multi-process mode. But sometimes there is really no need to start the service and let the server wait for client messages, not to mention that the server code cannot be changed in the middle. This article will introduce how to implement multi-process and asynchronous calls in the web environment in the CLI environment without using any framework or third-party library.
Asynchronous calls in the web environment
There are two commonly used methods
1. Use socket connection
This method is a typical C/S architecture and requires server support.
// 1. 创建socket套接字 $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); // 2. 进行socket连接 socket_connect($socket, '127.0.0.1', '3939'); //socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑 // 3. 向服务端发送请求 socket_write($socket, $request, strlen($request)); // 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略) $recv = socket_read($socket, 2048); // 5. 关闭socket连接 socket_close($socket);
2. Use popen to open the process pipe
This method uses operating system commands and is directly executed by the operating system.
The asynchronous calls discussed in this article use this method.
$sf = '/path/to/cli_async_task.php'; //要执行的脚本文件 $op = 'call'; //脚本文件接收的参数1 $data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); //脚本文件接收的参数2 pclose(popen("php '$sf' --op $op --data $data &", 'r')); //打开之后接着就关闭进程管道,让该进程以守护模式运行 echo PHP_EOL.'异步任务已执行。'.PHP_EOL;
The advantage of this method is that it can be solved in one step and the current process does not require any overhead.
The disadvantage is also obvious: the running status of the task script cannot be tracked.
So the highlight will be the script file that executes the task. The following will introduce the implementation of task processing and multi-processing.
Multi-process task processing in the CLI
environment
Note: Multi-process mode only supports Linux, not Windows! !
Here we will introduce each step from 0 (without using any framework or class library), At the end there will be a complete code .
1. Create a script
The aspect that cannot be ignored in any script is error handling. So when writing a task processing script, the first step is to write an error handling method.
In PHP, we call the three functions set_exception_handler set_error_handler register_shutdown_function, and then write a custom processing method.
The next step is to define the automatic loading function spl_autoload_register to avoid the trouble of require / include every time a new class is used.
Define log operation method.
Define task processing method.
Read the parameters from the command line and start executing the task.
2. Multi-process processing
PHP creates multiple processes by using the pcntl_fork function, which will fork a copy of the current process (shadow clone technique), so there is There are two processes, the current process is the main process (the ontology), and the forked process is the child process (the shadow clone). It should be noted that the code environment of the two processes is the same, and both processes have executed to the pcntl_fork function location. The difference is that the process number obtained by getmypid is different. The most important difference is that when the pcntl_fork function is called, the return value obtained by the child process is 0, while the main process obtains the process number pid of the child process.
Okay, when we know who the child process is, we can let the child process perform tasks.
So how does the main process know the status of the child process?
Use pcntl_wait. This function has two parameters $status and $options. $status is a reference type, used to store the status of the child process. $options has two optional constants WNOHANG|WUNTRACED, which means returning immediately without waiting for the child process to end and waiting for the child process respectively. The process ends. Obviously using WUNTRACED will block the main process. (You can also use the pcntl_waitpid function to obtain the specific pid child process status)
In multiple processes, what the main process has to do is to manage the status of each child process, otherwise the child process is likely to be unable to exit and become a zombie process.
About message communication between multiple processes
This section needs to involve specific business logic, so I can only briefly mention it. Without considering the use of third-party services such as redis
, PHP can natively implement pipeline communication and shared memory. The implementation is relatively simple, but the disadvantage is that the usable data capacity is limited, and data can only be exchanged using simple text protocols.
How to manually end all process tasks
如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。
以下是完整的任务执行脚本代码:
可能无法直接使用,需要修改的地方有:
脚本目录和日志目录常量
自动加载任务类的方法(默认是加载脚本目录中以
Task
结尾的文件)其他的如:错误和日志处理方式和文本格式就随意吧...
如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。
多进程的例子:execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。
<?php error_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING); @ini_set('display_errors', 0); @ini_set('date.timezone', 'PRC'); chdir(__DIR__); /* 任务脚本目录 */ defined('TASK_PATH') or define('TASK_PATH', realpath(__DIR__ .'/tasks')); /* 任务日志目录 */ defined('TASK_LOGS_PATH') or define('TASK_LOGS_PATH', __DIR__ .'/tasks/logs'); if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true); set_exception_handler(function($e) { $time = date('H:i:s', time()); $msg = sprintf(''. '<h3>[%s] %s (%s)'. "\n". '<pre class="brush:php;toolbar:false">%s', $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString() ); file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() & $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date('Y-m-d H:i:s', time()); $msg = $header) { if (!is_numeric($_k)) $header = sprintf('%s: %s', $_k, $header); $_headers .= $header . "\r\n"; } } $headers = "Connection: close\r\n" . $_headers; $opts = array( 'http' => array( 'method' => strtoupper(@$job['method'] ?: 'get'), 'content' => @$job['data'] ?: null, 'header' => $headers, 'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)', 'proxy' => @$job['args']['proxy'] ?: null, 'timeout' => intval(@$job['args']['timeout'] ?: 120), 'protocol_version' => @$job['args']['protocol_version'] ?: '1.1', 'max_redirects' => 3, 'ignore_errors' => true ) ); $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url.' -->'.strlen($ret)); if ($ret and isset($job['callback'])) { $postdata = http_build_query(array( 'msg_id' => @$job['msg_id'] ?: 0, 'url' => @$job['url'], 'result' => $ret )); $opts = array( 'http' => array( 'method' => 'POST', 'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n", 'content' => $postdata, 'timeout' => 30 ) ); file_get_contents($job['callback'], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job['callback'].' -->'.$ret2); } return $ret; } function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) { if (fileatime($_file) open($file, \ZipArchive::CREATE)) { return false; } _backup_dir($zip, $dest); $zip->close(); return $file; } function _backup_dir($zip, $dest, $sub='') { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip->addFile($dest . $file, $sub . $file); } else { if ($file != '.' and $file != '..' and is_dir($dest . $file)) { //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); } } } closedir($dir); return true; } function execute_task($op, $data) { debug_log('Start...'); $t1 = microtime(true); switch($op) { case 'call': //执行任务脚本类 $cmd = $data; if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; } $ret = call($cmd); break; case 'grab': //抓取网页 if (is_string($data)) $data = ['url' => $data]; if (is_array($data)) $ret = grab($data); else throw new \Exception('无效的命令参数!'); break; case 'clean': //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天) if (isset($data['dirs'])) { $ret = clean($data['dirs'], @$data['expires']); } else { $ret = clean($data); } break; case 'backup': //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹 if (isset($data['zip']) and is_dir($data['dest'])) $ret = backup($data['zip'], $data['dest']); else throw new \Exception('没有指定需要备份的文件!'); break; case 'require': //加载脚本文件 if (is_file($data)) $ret = require($data); else throw new \Exception('不是可请求的文件!'); break; case 'test': sleep(rand(1, 5)); $ret = ucfirst(strval($data)). '.PID:'. getmypid(); break; case 'multi': //多进程处理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道 throw new Exception('make pipe failed!'); } } //$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); //共享内存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op => $_datas) { $_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据 foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子进程中执行任务 $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, 'w'); //写 //stream_set_blocking($pipe, false); $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]); if (strlen($_ret) > 4096) //写入管道的数据最大4K $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']); //debug_log('write pipe: '.$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子进程 } elseif ($pid > 0) { //主进程中记录任务 $childs[] = $pid; $results[$pid] = 0; debug_log('fork by child: '.$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception('could not fork at '. getmygid()); } } } $pipe = fopen($fifo, 'r+'); //读 stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。 $n = 0; while(count($childs) > 0) { foreach($childs as $i => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res > 0) { $_ret = @unserialize(fgets($pipe)); //读取管道数据 $results[$pid] = $_ret; unset($childs[$i]); debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256)); } if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程 } usleep(200000); $n++; } debug_log('child process completed.'); @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new \Exception('没有可执行的任务!'); break; } $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op), @json_encode($data, 64|256), @strlen($ret)
The above is the detailed content of How does PHP use the command line to implement task processing in asynchronous multi-process mode (code). For more information, please follow other related articles on the PHP Chinese website!

php把负数转为正整数的方法:1、使用abs()函数将负数转为正数,使用intval()函数对正数取整,转为正整数,语法“intval(abs($number))”;2、利用“~”位运算符将负数取反加一,语法“~$number + 1”。

实现方法:1、使用“sleep(延迟秒数)”语句,可延迟执行函数若干秒;2、使用“time_nanosleep(延迟秒数,延迟纳秒数)”语句,可延迟执行函数若干秒和纳秒;3、使用“time_sleep_until(time()+7)”语句。

php除以100保留两位小数的方法:1、利用“/”运算符进行除法运算,语法“数值 / 100”;2、使用“number_format(除法结果, 2)”或“sprintf("%.2f",除法结果)”语句进行四舍五入的处理值,并保留两位小数。

判断方法:1、使用“strtotime("年-月-日")”语句将给定的年月日转换为时间戳格式;2、用“date("z",时间戳)+1”语句计算指定时间戳是一年的第几天。date()返回的天数是从0开始计算的,因此真实天数需要在此基础上加1。

方法:1、用“str_replace(" ","其他字符",$str)”语句,可将nbsp符替换为其他字符;2、用“preg_replace("/(\s|\ \;||\xc2\xa0)/","其他字符",$str)”语句。

php判断有没有小数点的方法:1、使用“strpos(数字字符串,'.')”语法,如果返回小数点在字符串中第一次出现的位置,则有小数点;2、使用“strrpos(数字字符串,'.')”语句,如果返回小数点在字符串中最后一次出现的位置,则有。

在PHP中,可以利用implode()函数的第一个参数来设置没有分隔符,该函数的第一个参数用于规定数组元素之间放置的内容,默认是空字符串,也可将第一个参数设置为空,语法为“implode(数组)”或者“implode("",数组)”。

php字符串有下标。在PHP中,下标不仅可以应用于数组和对象,还可应用于字符串,利用字符串的下标和中括号“[]”可以访问指定索引位置的字符,并对该字符进行读写,语法“字符串名[下标值]”;字符串的下标值(索引值)只能是整数类型,起始值为0。


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

SublimeText3 Linux new version
SublimeText3 Linux latest version

Notepad++7.3.1
Easy-to-use and free code editor

Atom editor mac version download
The most popular open source editor

WebStorm Mac version
Useful JavaScript development tools

ZendStudio 13.5.1 Mac
Powerful PHP integrated development environment
