PHP高级编程之守护进程,实现优雅重启
2014-09-01 发表
2015-08-31 更新
2015-10-20 更新,增加优雅重启
例如 apache, nginx, mysql 都是守护进程
很多程序以服务形式存在,它没有终端或UI交互,而是采用其他方式与其他程序交互,如TCP/UDP Socket、UNIX Socket、fifo。程序一旦启动便进入后台,直到满足条件时才开始处理任务。
以我当前的需求为例,我需要运行一个程序,监听某端口,持续接收服务端发来的数据,然后对数据分析处理,再将结果写入到数据库中; 我采用ZeroMQ实现数据收发。
例 1. 多线程守护进程示例
<?php
/**
 * Worker thread used by the pool. run() opens a PDO connection and keeps it in
 * a static property so that tasks executed on this worker can reuse it.
 */
class ExampleWorker extends Worker {

    /** @var PDO Connection created in run() and handed out by getInstance(). */
    protected static $dbh;

    public function __construct() {
    }

    /**
     * Opens the MySQL connection for this worker.
     */
    public function run() {
        $dbhost = '';           // database server
        $dbport = 3306;
        $dbuser = 'www';        // database user
        $dbpass = 'qwer123';    // database password
        $dbname = 'example';    // database name

        self::$dbh = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(
            /* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true,
            // Without exception mode a failed prepare()/execute() only returns
            // false, and the catch (PDOException) in Fee::run() is never reached.
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION
        ));
    }

    /**
     * Returns the connection opened by run().
     *
     * Public because Fee::run() calls it through $this->worker, i.e. from
     * outside this class; a protected method would be a fatal error there.
     *
     * @return PDO
     */
    public function getInstance() {
        return self::$dbh;
    }
}

/**
 * One unit of work submitted to the pool: stores a comma-separated message
 * (ticket, login, volume) as a row in `fee` and then marks that row processed.
 */
class Fee extends Stackable {

    /**
     * @param string $msg Comma-separated record; fields 0..2 are read by run().
     */
    public function __construct($msg) {
        $trades = explode(",", $msg);
        $this->data = $trades;
        print_r($trades);
    }

    /**
     * Inserts the record with status 'N', then flips it to 'Y'.
     * Database failures are appended to mobile_error.log instead of being thrown.
     */
    public function run() {
        try {
            $dbh = $this->worker->getInstance();

            $insert = "INSERT INTO fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";
            $sth = $dbh->prepare($insert);
            $sth->bindValue(':ticket', $this->data[0]);
            $sth->bindValue(':login', $this->data[1]);
            $sth->bindValue(':volume', $this->data[2]);
            $sth->execute();
            $sth = null;

            /* ...... */

            $update = "UPDATE fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";
            $sth = $dbh->prepare($update);
            $sth->bindValue(':ticket', $this->data[0]);
            $sth->execute();
        } catch (PDOException $e) {
            // Record which ticket failed and why. The previous code formatted
            // two variables that were never defined, so the log line was empty.
            $error = sprintf("%s,%s\n", $this->data[0], $e->getMessage());
            file_put_contents("mobile_error.log", $error, FILE_APPEND);
        }
    }
}

/**
 * Command-line daemon: "start" forks into the background, listens on a ZeroMQ
 * REP socket and hands every received message to a thread pool; "stop" kills
 * the process recorded in the pid file.
 */
class Example {
    /* config */
    const LISTEN = "tcp://";
    const MAXCONN = 100;
    const pidfile = __CLASS__;
    // NOTE(review): presumably the unprivileged account the daemon should run as - confirm.
    const uid = 80;
    const gid = 80;

    protected $pool = NULL;
    protected $zmq = NULL;

    /** @var string Path of the pid file. Public to match the former dynamic property. */
    public $pidfile;

    public function __construct() {
        $this->pidfile = '/var/run/'.self::pidfile.'.pid';
    }

    /**
     * Forks into the background. The parent process exits; the child records
     * its pid, drops privileges and returns.
     *
     * @return int Pid of the daemonized child.
     */
    private function daemon() {
        if (file_exists($this->pidfile)) {
            echo "The file $this->pidfile exists.\n";
            exit();
        }

        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            // Parent: report success. exit($pid) would use the pid, truncated
            // to 8 bits, as a (normally non-zero) exit status.
            exit(0);
        } else {
            // Child: become session leader so it is detached from the
            // controlling terminal, as the second example does.
            posix_setsid();
            file_put_contents($this->pidfile, getmypid());
            // Drop the group first; once the uid has been given up the
            // process is no longer allowed to change its gid.
            posix_setgid(self::gid);
            posix_setuid(self::uid);
            return getmypid();
        }
    }

    /**
     * Daemonizes, then serves requests until recv() yields a falsy message.
     */
    private function start() {
        $this->daemon();
        $this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);
        $this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
        $this->zmq->bind(self::LISTEN);

        /* Loop receiving and echoing back */
        while ($message = $this->zmq->recv()) {
            $this->pool->submit(new Fee($message));
            $this->zmq->send('TRUE');
        }
        // The pool lives in $this->pool; a bare $pool is an undefined variable here.
        $this->pool->shutdown();
    }

    /**
     * Kills the daemon recorded in the pid file and removes the file.
     */
    private function stop() {
        if (file_exists($this->pidfile)) {
            $pid = (int) trim(file_get_contents($this->pidfile));
            // An empty or corrupt pid file yields 0, and kill with pid 0
            // targets the caller's whole process group - never send that.
            if ($pid > 0) {
                posix_kill($pid, 9);
            }
            unlink($this->pidfile);
        }
    }

    /**
     * Prints the usage line.
     *
     * @param string $proc Script name shown in the usage line.
     */
    private function help($proc) {
        printf("%s start | stop | help \n", $proc);
    }

    /**
     * Dispatches on the first command-line argument.
     *
     * @param array $argv Command-line arguments; $argv[1] selects the action.
     */
    public function main($argv) {
        if (count($argv) < 2) {
            printf("please input help parameter\n");
            exit();
        }
        if ($argv[1] === 'stop') {
            $this->stop();
        } else if ($argv[1] === 'start') {
            $this->start();
        } else {
            $this->help($argv[0]);
        }
    }
}

$cgse = new Example();
$cgse->main($argv);
例 2. 消息队列与守护进程
<?php
// Ticks allow pcntl signal handlers to run between statements; the SIGHUP
// handler registration in the constructor is currently commented out.
declare(ticks = 1);
require_once( __DIR__.'/autoload.class.php' );
umask(077);

/**
 * Command-line daemon that consumes the "email" queue from an AMQP broker and
 * writes every message body to the log. Supports start | stop | restart | status.
 */
class EDM {
    protected $queue;

    // Declared explicitly (they used to be created as dynamic properties);
    // kept public so their visibility stays what it was.
    public $argc;
    public $argv;
    public $pidfile;
    public $config;
    public $logging;
    public $channel;
    public $exchange;

    public function __construct() {
        global $argc, $argv;
        $this->argc = $argc;
        $this->argv = $argv;
        $this->pidfile = $this->argv[0].".pid";
        $this->config = new Config('mq');
        $this->logging = new Logging(__DIR__.'/log/'.$this->argv[0].'.'.date('Y-m-d').'.log'); //.H:i:s
        //pcntl_signal(SIGHUP, array(&$this,"restart"));
    }

    /**
     * Connects to the broker, declares the durable exchange and queue, binds
     * them, and then consumes messages in blocking mode.
     */
    protected function msgqueue() {
        $exchangeName = 'email'; // exchange name
        $queueName = 'email';    // queue name
        $routeKey = 'email';     // routing key

        // Create the connection and the channel.
        $connection = new AMQPConnection($this->config->getArray('mq'));
        if (!$connection->connect()) {
            die("Cannot connect to the broker!\n");
        }
        $this->channel = new AMQPChannel($connection);
        // Limit unacknowledged deliveries to one. This has to happen before
        // consuming: consume() blocks, so a qos() call placed after it would
        // not take effect while messages are being delivered.
        $this->channel->qos(0, 1);

        $this->exchange = new AMQPExchange($this->channel);
        $this->exchange->setName($exchangeName);
        $this->exchange->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
        $this->exchange->setFlags(AMQP_DURABLE);       // durable
        $this->exchange->declare();

        // Create the queue.
        $this->queue = new AMQPQueue($this->channel);
        $this->queue->setName($queueName);
        $this->queue->setFlags(AMQP_DURABLE);          // durable
        $this->queue->declare();

        // Bind the queue to the exchange with the routing key.
        $this->queue->bind($exchangeName, $routeKey);

        try {
            // Receive messages in blocking mode.
            while (true) {
                //$this->queue->consume('processMessage', AMQP_AUTOACK); // automatic ACK
                $this->queue->consume(function($envelope, $queue) {
                    $msg = $envelope->getBody();
                    $queue->ack($envelope->getDeliveryTag()); // acknowledge manually
                    $this->logging->info('('.'+'.')'.$msg);
                });
            }
        } finally {
            // Release the connection if consuming is aborted by an exception.
            // The old code called disconnect() on an undefined $conn after the
            // endless loop, where it could never run.
            $connection->disconnect();
        }
    }

    /**
     * Forks into the background; the child records its pid and starts consuming.
     */
    protected function start() {
        if (file_exists($this->pidfile)) {
            printf("%s already running\n", $this->argv[0]);
            exit(0);
        }
        $this->logging->warning("start");

        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            // Parent: nothing left to do.
            exit(0);
        } else {
            // Child: detach from the controlling terminal.
            posix_setsid();
            file_put_contents($this->pidfile, posix_getpid());
            $this->msgqueue();
        }
    }

    /**
     * Sends SIGTERM to the recorded process and removes the pid file.
     */
    protected function stop() {
        if (file_exists($this->pidfile)) {
            $pid = (int) trim(file_get_contents($this->pidfile));
            // An empty or corrupt pid file yields 0, and kill with pid 0
            // targets the caller's whole process group - never send that.
            if ($pid > 0) {
                posix_kill($pid, SIGTERM);
            }
            unlink($this->pidfile);
            $this->logging->warning("stop");
        } else {
            printf("%s haven't running\n", $this->argv[0]);
        }
    }

    /**
     * Stops the running daemon, then starts a new one.
     */
    protected function restart() {
        $this->stop();
        $this->start();
    }

    /**
     * Reports whether a pid file exists and which pid it holds.
     */
    protected function status() {
        if (file_exists($this->pidfile)) {
            $pid = file_get_contents($this->pidfile);
            printf("%s already running, pid = %s\n", $this->argv[0], $pid);
        } else {
            printf("%s haven't running\n", $this->argv[0]);
        }
    }

    /**
     * Prints the usage line.
     */
    protected function usage() {
        printf("Usage: %s {start | stop | restart | status}\n", $this->argv[0]);
    }

    /**
     * Dispatches on the single command-line argument.
     */
    public function main() {
        if ($this->argc != 2) {
            $this->usage();
            return;
        }

        switch ($this->argv[1]) {
            case 'start':
                $this->start();
                break;
            case 'stop':
                $this->stop();
                break;
            case 'restart':
                $this->restart();
                break;
            case 'status':
                $this->status();
                break;
            default:
                $this->usage();
        }
    }
}

$edm = new EDM();
$edm->main();
/**
 * Forks into the background. The parent process exits; the child records
 * its pid, drops privileges and returns.
 *
 * @return int Pid of the daemonized child.
 */
private function daemon() {
    if (file_exists($this->pidfile)) {
        echo "The file $this->pidfile exists.\n";
        exit();
    }

    $pid = pcntl_fork();
    if ($pid == -1) {
        die('could not fork');
    } else if ($pid) {
        // Parent: report success. exit($pid) would use the pid, truncated
        // to 8 bits, as a (normally non-zero) exit status.
        exit(0);
    } else {
        // Child: become session leader so it is detached from the
        // controlling terminal.
        posix_setsid();
        file_put_contents($this->pidfile, getmypid());
        // Drop the group first; once the uid has been given up the
        // process is no longer allowed to change its gid.
        posix_setgid(self::gid);
        posix_setuid(self::uid);
        return getmypid();
    }
}
程序停止,只需读取pid文件,然后调用posix_kill($pid, 9); 最后将该文件删除。
/**
 * Kills the daemon recorded in the pid file and removes the file.
 */
private function stop() {
    if (file_exists($this->pidfile)) {
        $pid = (int) trim(file_get_contents($this->pidfile));
        // An empty or corrupt pid file yields 0, and kill with pid 0
        // targets the caller's whole process group - never send that.
        if ($pid > 0) {
            posix_kill($pid, 9);
        }
        unlink($this->pidfile);
    }
}
/**
 * Returns the connection stored in the static $dbh property.
 *
 * Public because tasks call it as $this->worker->getInstance(), i.e. from
 * outside this class; a protected method would be a fatal error there.
 */
public function getInstance() {
    return self::$dbh;
}
stop/start 或者 restart 都会退出进程再重新启动,导致进程ID改变,同时瞬间退出会导致业务闪断。所以很多守护进程都会提供一个reload功能,这就是所谓的优雅重启。
reload 实现原理是给进程发送SIGHUP信号,可以通过kill命令发送 kill -s SIGHUP 64881,也可以通过库函数实现 posix_kill(posix_getpid(), SIGHUP);
<?php
/**
 * Shared state between the signal handlers and the main loop.
 * $state: 0 = keep running, 1 = reload requested, -1 = terminate requested.
 * $ini:   the parsed contents of test.ini.
 */
class Status
{
    public static $state = 0;
    public static $ini = null;
}

// SIGTERM: ask the main loop to finish.
pcntl_signal(SIGTERM, function ($signo) {
    echo "\n This signal is called. [$signo] \n";
    Status::$state = -1;
});

// SIGHUP: re-read the configuration and ask the main loop to start over.
pcntl_signal(SIGHUP, function ($signo) {
    echo "\n This signal is called. [$signo] \n";
    Status::$state = 1;
    Status::$ini = parse_ini_file('test.ini');
});

$pid = pcntl_fork();
if ($pid == -1) {
    die('could not fork');
}

if (!$pid) {
    // Child process: load the configuration, then idle until signalled.
    Status::$ini = parse_ini_file('test.ini');

    do {
        print_r(Status::$ini);

        $terminate = false;
        for (;;) {
            // Run any handlers for signals that arrived since the last pass.
            pcntl_signal_dispatch();

            if (Status::$state == -1) {
                // Do something and end loop.
                $terminate = true;
                break;
            }
            if (Status::$state == 1) {
                printf("This program is reload.\r\n");
                Status::$state = 0;
                break;
            }

            echo '.';
            sleep(1);
        }
        echo "\n";
    } while (!$terminate);

    echo "Finish \n";
    exit();
}
// Parent process: nothing to do, the script simply ends here.
[root@netkiller pcntl]# cat test.ini
[db]
host=
port=3306
# php signal.reload.php Array( [host] => [port] => 3306)
[root@netkiller pcntl]# cat test.ini
[db]
host=
port=3306
user=test
[root@netkiller pcntl]# ps ax | grep reload
64881 pts/0 S 0:00 php -c /srv/php/etc/php-cli.ini signal.reload.php
65073 pts/1 S+ 0:00 grep --color=auto reload
[root@netkiller pcntl]# kill -s SIGHUP 64881
[root@netkiller pcntl]# ps ax | grep reload
64881 pts/0 S 0:00 php -c /srv/php/etc/php-cli.ini signal.reload.php
65093 pts/1 S+ 0:00 grep --color=auto reload
This signal is called. [1] This program is reload.Array( [host] => [port] => 3306 [user] => test)
#!/bin/sh
# Watchdog: every 5 seconds look for a process whose command line matches the
# pattern; when none is found, run the recovery command and note it in the log.
# The whole loop is started in the background.
log_file=/var/log/$(basename $0 .sh).log
match="my.php"
restart_cmd="/path/to/my.php start"

while :
do
    stamp=$(date -d "today" +"%Y-%m-%d_%H:%M:%S")
    # -o: oldest matching process, -f: match against the full command line
    found=$(pgrep -o -f ${match})
    if test -z "${found}"; then
        # Left unquoted on purpose: the string must split into command + argument.
        ${restart_cmd} >> $log_file
        echo "[${stamp}] ${match} ${restart_cmd}" >> $log_file
    fi
    sleep 5
done &