ホームページ  >  記事  >  バックエンド開発  >  PHPマルチスレッドの詳しい解説

PHPマルチスレッドの詳しい解説

WBOY
WBOYオリジナル
2016-06-23 13:24:001477ブラウズ

PHP 高度なプログラミング マルチスレッド

http://netkiller.github.io/journal/php.thread.html

Neo Chen (陈京峰) 氏、netkiller、

中国広東省深セン市龍華新区民志街西山美地
518131
+86 13113668890
+86 755 29812080
ba46dd73f0e5a249d2fb2422e27d6a91submit() はメモリが許す限りワイヤレスでタスクを送信できますが (php.ini 設定を参照)、同時に実行されるスレッドの数はサイズによって制御されます。

6.2. スレッドプールの原理

プールの動作原理を説明するために独自にクラスを実装します

<?phpclass Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {	$this->row = $row;        $this->sql = null;    }    public function run() {	if(strlen($this->row['bankno']) > 100 ){		$bankno = safenet_decrypt($this->row['bankno']);	}else{		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);		file_put_contents("bankno_error.log", $error, FILE_APPEND);	}	if( strlen($bankno) > 7 ){		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);		$this->sql = $sql;	}	printf("%s\n",$this->sql);    }}class Pool {	public $pool = array();	public function __construct($count) {		$this->count = $count;	}	public function push($row){		if(count($this->pool) < $this->count){			$this->pool[] = new Update($row);			return true;		}else{			return false;		}	}	public function start(){		foreach ( $this->pool as $id => $worker){			$this->pool[$id]->start();		}	}	public function join(){		foreach ( $this->pool as $id => $worker){               $this->pool[$id]->join();		}	}	public function clean(){		foreach ( $this->pool as $id => $worker){			if(! $worker->isRunning()){            	unset($this->pool[$id]);            }		}	}}try {	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',		PDO::MYSQL_ATTR_COMPRESS => true		)	);	$sql  = "select id,bankno from members order by id desc";	$row = $dbh->query($sql);	$pool = new Pool(5);	while($member = $row->fetch(PDO::FETCH_ASSOC))	{		while(true){			if($pool->push($member)){ //压入任务到池中				break;			}else{ //如果池已经满,就开始启动线程				$pool->start();				$pool->join();				$pool->clean();			}		}	}	$pool->start();    $pool->join();	$dbh = null;} catch (Exception $e) {    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";}?>

6.3. 動的キュースレッドプール

上記の例は、スレッドプールの起動時に start を実行するものですがいっぱいで、以下はこの例です。スレッド プールに空きができるとすぐに新しいスレッドを作成します。
<?phpclass Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {	$this->row = $row;        $this->sql = null;	//print_r($this->row);    }    public function run() {	if(strlen($this->row['bankno']) > 100 ){		$bankno = safenet_decrypt($this->row['bankno']);	}else{		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);		file_put_contents("bankno_error.log", $error, FILE_APPEND);	}	if( strlen($bankno) > 7 ){		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);		$this->sql = $sql;	}	printf("%s\n",$this->sql);    }}try {	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',		PDO::MYSQL_ATTR_COMPRESS => true		)	);	$sql     = "select id,bankno from members order by id desc limit 50";	$row = $dbh->query($sql);	$pool = array();	while($member = $row->fetch(PDO::FETCH_ASSOC))	{		$id 	= $member['id'];		while (true){			if(count($pool) < 5){				$pool[$id] = new Update($member);				$pool[$id]->start();				break;			}else{				foreach ( $pool as $name => $worker){					if(! $worker->isRunning()){						unset($pool[$name]);					}				}			}		}	}	$dbh = null;} catch (Exception $e) {    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";}?>

6.4. スレッド プール内のスレッドの実行が完了するのを待機しています

$pool->submit は、スレッド プールに送信されると、次のコードが実行されるのを待ちたい場合があります。スレッドが実行を完了し、スレッドの作業状況を収集します。
$mutex = Mutex::create();		$pool = new Pool ( self::MAXCONN , \ImportWorker::class, array($this->config, $mutex) );				$pool->collect(function($work){				return $work->isComplete();			});				foreach($tasks as $task){			$this->logger ( __CLASS__, sprintf("Task %s %s", $task->file, 'Processing') );			pcntl_signal_dispatch();						if(Signal::get() == SIGHUP){				Signal::reset();				break;			}						if(file_exists ($task->file)){				$handle = fopen($task->file, 'r');				$i = 0;				while (($row = fgetcsv($handle, 100000, ',')) !== false) {					$work[$i] =  new Import ( $task, $row );					$pool->submit ( $work[$i] );					$i++;					//$pool->submit ( new Import ( $task, $row ));				}						fclose($handle);								$waiting = true;				while($waiting){										for($i=0;$i<count($work);$i++){												if($work[$i]->isComplete()){							Counter::$completed++;						}						//printf("work %s:%s \n", count($work), Counter::$completed);						if(Counter::$completed == count($work)){							$waiting = false;							break;						}					}					sleep(1);				}				$this->completedTask($task);			}else{				$this->failedTask($task);			}			//printf("Ignore: %s\n", Counter::$ignore ) ;		}				$pool->shutdown ();		//Mutex::unlock($mutex);		Mutex::destroy($mutex);

while($waiting) ペアは実行を継続し、すべてのスレッドが完了するまで終了しません。

7. マルチスレッドのファイルの安全な読み取りと書き込み (ファイル ロック)

在多线程中读写文件但进程是有区别的,读取内容比较容易时间,但写入数据就需要保证同一时刻只能有一个进程操作,虽然通过互斥锁可以解决,但从安全的角度文件必须上锁。

文件锁种类。

LOCK_SH 取得共享锁定(读取的程序)。LOCK_EX 取得独占锁定(写入的程序。LOCK_UN 释放锁定(无论共享或独占)。LOCK_NB 如果不希望 flock() 在锁定时堵塞

共享锁例子

<?php$fp = fopen("/tmp/lock.txt", "r+");if (flock($fp, LOCK_EX)) {  // 进行排它型锁定    ftruncate($fp, 0);      // truncate file    fwrite($fp, "Write something here\n");    fflush($fp);            // flush output before releasing the lock    flock($fp, LOCK_UN);    // 释放锁定} else {    echo "Couldn't get the lock!";}fclose($fp);?>

共享锁例子2

<?php$fp = fopen('/tmp/lock.txt', 'r+');/* Activate the LOCK_NB option on an LOCK_EX operation */if(!flock($fp, LOCK_EX | LOCK_NB)) {    echo 'Unable to obtain lock';    exit(-1);}/* ... */fclose($fp);?>

8. 多线程与数据连接

多线程中操作数据库总结与注意事项 pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

8.1. Worker 与 PDO

<?phpclass Work extends Stackable {        public function __construct() {        }        public function run() {                $dbh  = $this->worker->getConnection();                $sql     = "select id,name from members order by id desc limit 50";                $row = $dbh->query($sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        }}class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }        /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }}$worker = new ExampleWorker("My Worker Thread");$work=new Work();$worker->stack($work);$worker->start();$worker->shutdown();?>

8.2. Pool 与 PDO

在线程池中链接数据库

# cat pool.php<?phpclass ExampleWorker extends Worker {	public function __construct(Logging $logger) {		$this->logger = $logger;	}	protected $logger;}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {	public function __construct($number) {		$this->number = $number;	}	public function run() {                $dbhost = 'db.example.com';               // 数据库服务器                $dbuser = 'example.com';                 // 数据库用户名                $dbpw = 'password';                               // 数据库密码                $dbname = 'example_real';		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true                        )                );		$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";		#echo $sql;		$row = $dbh->query($sql);		$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);		if($mt4_trades){			$row = null;			$sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";			$dbh->query($sql);			#printf("%s\n",$sql);		}		$dbh = null;		printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);	}}class Logging extends Stackable {	protected  static $dbh;	public function __construct() {		$dbhost = 'db.example.com';			// 数据库服务器	        $dbuser = 'example.com';                 // 数据库用户名        	$dbpw = 'password';                               // 数据库密码		$dbname = 'example_real';			// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',			PDO::MYSQL_ATTR_COMPRESS => true			)		);	}	protected function log($message, $args = []) {		$args = func_get_args();		if (($message = array_shift($args))) {			echo vsprintf("{$message}\n", $args);		}	}	protected function getConnection(){                return self::$dbh;        }}$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);$dbhost = 'db.example.com';                      // 数据库服务器$dbuser = 'example.com';                 // 数据库用户名$dbpw = 'password';                               // 数据库密码$dbname = 'db_example';$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );$sql = "select `order`,name from accounts where deposit_time is null order by id desc";$row = $dbh->query($sql);while($account = $row->fetch(PDO::FETCH_ASSOC)){        $pool->submit(new Work($account));}$pool->shutdown();?>

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

<?phpclass ExampleWorker extends Worker {	#public function __construct(Logging $logger) {	#	$this->logger = $logger;	#}	#protected $logger;	protected  static $dbh;	public function __construct() {	}	public function run(){		$dbhost = 'db.example.com';			// 数据库服务器	    $dbuser = 'example.com';        	// 数据库用户名        $dbpw = 'password';             	// 数据库密码		$dbname = 'example';				// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',			PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true			)		);	}	protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {	public function __construct($data) {		$this->data = $data;		#print_r($data);	}	public function run() {		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );		try {			$dbh  = $this->worker->getInstance();			#print_r($dbh);               		$id = $this->data['id'];			$mobile = safenet_decrypt($this->data['mobile']);			#printf("%d, %s \n", $id, $mobile);			if(strlen($mobile) > 11){				$mobile = substr($mobile, -11);			}			if($mobile == 'null'){			#	$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";			#	printf("%s\n",$sql);			#	$dbh->query($sql);				$mobile = '';				$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";			}else{				$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";			}			$sth = $dbh->prepare($sql);			$sth->bindValue(':mobile', $mobile);			$sth->bindValue(':id', $id);			$sth->execute();			#echo $sth->debugDumpParams();		}		catch(PDOException $e) {			$error = sprintf("%s,%s\n", $mobile, $id );			file_put_contents("mobile_error.log", $error, FILE_APPEND);		}		#$dbh = null;		printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);	}}$pool = new Pool(100, \ExampleWorker::class, []);#foreach (range(0, 100) as $number) {#	$pool->submit(new Work($number));#}$dbhost = 'db.example.com';                     // 数据库服务器$dbuser = 'example.com';                 		// 数据库用户名$dbpw = 'password';                             // 数据库密码$dbname = 'example';$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );#print_r($dbh);#$sql = "select id, mobile from members where id < :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':id',300);#$sth->execute();#$result = $sth->fetchAll();#print_r($result);##$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':mobile', 'aa');#$sth->bindValue(':id','272');#echo $sth->execute();#echo $sth->queryString;#echo $sth->debugDumpParams();$sql = "select id, mobile from members order by id asc"; // limit 1000";$row = $dbh->query($sql);while($members = $row->fetch(PDO::FETCH_ASSOC)){        #$order =  $account['order'];        #printf("%s\n",$order);        //print_r($members);        $pool->submit(new Work($members));		#unset($account['order']);}$pool->shutdown();?>

8.3. 数据库持久连接

总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => true));?>

但有些场景数据库持久链接适得其反,所以根据你的场景选择链接方式

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => false));?>

由于现成持续链接数据,有时可能因为数据库或者网络原因导致数据无法连接,程序抛出异常或终止,所以使用单例并不保险。

protected function getInstance(){	return self::$dbh;}

为单例增加重新连接功能

class SenderWorker extends Worker {	protected $config;	protected static $dbh;	protected static $amqp;		public function __construct($config) {		$this->config = $config;		$this->logger = new Logger();	}	public function run() {	}	private function connect(){		try {			$dbhost = $this->config['database']['host'];			$dbport = $this->config['database']['port'];			$dbuser = $this->config['database']['user'];			$dbpass = $this->config['database']['password'];			$dbname = $this->config['database']['dbname'];			self::$dbh = new PDO ( "mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array (					PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',					PDO::MYSQL_ATTR_COMPRESS => true					/*PDO::ATTR_PERSISTENT => true*/			) );			self::$dbh->setAttribute ( PDO::ATTR_ERRMODE, PDO::ERRMODE_WARNING );		} catch ( PDOException $e ) {			$this->logger ( 'Exception worker', $e->getMessage( ) );		} catch ( Exception $e ) {			$this->logger ( 'Exception worker', $e->getMessage( ) );		}	}	protected function getInstance() {		if(!self::$dbh) {			$this->connect();			$this->logger ( 'Database', sprintf("Connect database %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );		}else{			$this->logger ( 'Database', sprintf("Get instance database %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );		}				if(self::$dbh){			return self::$dbh;		}else{			$this->logger ( 'Database', sprintf("Connect database is error %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );			$this->logger ( 'Error', sprintf("Worker is shutdown %s", $this->getThreadId ()) );			$this->shutdown();		}	}		public function logger($type, $message) {		$this->logger->logger($type, $message);	}		public function getAmqpInstance(){		if(!self::$amqp){			self::$amqp = new AMQPConnection(array(				'host' 	=> $this->config['amqp']['host'], 				'port' 	=> $this->config['amqp']['port'], 				'vhost'	=> $this->config['amqp']['vhost'], 				'login' => $this->config['amqp']['login'], 				'password' => $this->config['amqp']['password']			));			$this->logger ( 'AMQP', sprintf("Connect amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );		}else{			$this->logger ( 'AMQP', sprintf("Get instance amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );		}			return self::$amqp;	}}

每次调用 getInstance() 会判断当前数据库是否已经链接,如果链接丢失,将重新链接数据库。

8.4. 涉及数据库更新

多线程编程中对数据库更新操作需要注意的是,有些场景,你需要控制同一时刻只能有一个线程对数据库做Update, Delete, Insert,否则数据容易出错。

例如下面的操作,你会发现程序运行完成后数据字段没有任何变化。这是因为线程间相互覆盖对方之前更新的数据。

$sql = "update import set succeed = succeed+1 where status = :status and id = :id";

解决方法有两种,一种是外部实现排他锁,一种是在数据库内部实现,通过事物处理,解决线程资源争夺,相互覆盖的问题。

private function updateSucceed($task){		$dbh = $this->worker->getInstance();		$dbh->beginTransaction();		$sql = "update import set succeed = succeed+1 where status = :status and id = :id";		$sth = $dbh->prepare ( $sql );		$sth->bindValue ( ':id', $task->id );		$sth->bindValue ( ':status', 'Processing' );		$status = $sth->execute ();		$dbh->commit();		return $status;	}

9. Thread And ZeroMQ

应用场景,我使用触发器监控数据库某个表,一旦发现有改变就通知程序处理数据

9.1. 数据库端

首先安装ZeroMQ 与 ZeroMQ for MySQL UDF 然后创建触发器。 https://github.com/netkiller/mysql-zmq-plugin

CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)	LANGUAGE SQL	NOT DETERMINISTIC	READS SQL DATA	SQL SECURITY DEFINER	COMMENT '交易监控'BEGIN	DECLARE Example CHAR(1) DEFAULT 'N';	IF CMD IN ('0','1') THEN		IF VOLUME >=10 AND VOLUME <=90 THEN			select coding into Example from example.members where username = LOGIN and coding = 'Y';			IF Example = 'Y' THEN				select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));			END IF;		END IF;	END IF;ENDCREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW BEGIN	call Table_Example(NEW.TICKET,NEW.LOGIN,NEW.CMD,NEW.VOLUME);END

9.2. 数据处理端

<?phpclass ExampleWorker extends Worker {	#public function __construct(Logging $logger) {	#	$this->logger = $logger;	#}	#protected $logger;	protected  static $dbh;	public function __construct() {	}	public function run(){		$dbhost = '192.168.2.1';			// 数据库服务器		$dbport = 3306;	    $dbuser = 'www';        			// 数据库用户名        $dbpass = 'password';             	// 数据库密码		$dbname = 'example';				// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(			/* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */			PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true			)		);	}	protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Fee extends Stackable {	public function __construct($msg) {		$trades = explode(",", $msg);		$this->data = $trades;		print_r($trades);	}	public function run() {		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );		try {			$dbh  = $this->worker->getInstance();			$insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";			$sth = $dbh->prepare($insert);			$sth->bindValue(':ticket', $this->data[0]);			$sth->bindValue(':login', $this->data[1]);			$sth->bindValue(':volume', $this->data[2]);			$sth->execute();			//$sth = null;			//$dbh = null;			/* 业务实现在此处 */			$update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";			$sth = $dbh->prepare($update);			$sth->bindValue(':ticket', $this->data[0]);			$sth->execute();			//echo $sth->queryString;		}		catch(PDOException $e) {			$error = sprintf("%s,%s\n", $mobile, $id );			file_put_contents("mobile_error.log", $error, FILE_APPEND);		}		#$dbh = null;		//printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);	}}class Example {	/* config */	const LISTEN = "tcp://192.168.2.15:5555";	const MAXCONN = 100;	const pidfile = __CLASS__;	const uid	= 80;	const gid	= 80;	protected $pool = NULL;	protected $zmq = NULL;	public function __construct() {		$this->pidfile = '/var/run/'.self::pidfile.'.pid';	}	private function daemon(){		if (file_exists($this->pidfile)) {			echo "The file $this->pidfile exists.\n";			exit();		}		$pid = pcntl_fork();		if ($pid == -1) {			 die('could not fork');		} else if ($pid) {			 // we are the parent			 //pcntl_wait($status); //Protect against Zombie children			exit($pid);		} else {			// we are the child			file_put_contents($this->pidfile, getmypid());			posix_setuid(self::uid);			posix_setgid(self::gid);			return(getmypid());		}	}	private function start(){		$pid = $this->daemon();		$this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);		$this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);		$this->zmq->bind(self::LISTEN);		/* Loop receiving and echoing back */		while ($message = $this->zmq->recv()) {			if($message){					$this->pool->submit(new Fee($message));					$this->zmq->send('TRUE');			}else{					$this->zmq->send('FALSE');			}		}		$pool->shutdown();	}	private function stop(){		if (file_exists($this->pidfile)) {			$pid = file_get_contents($this->pidfile);			posix_kill($pid, 9);			unlink($this->pidfile);		}	}	private function help($proc){		printf("%s start | stop | help \n", $proc);	}	public function main($argv){		if(count($argv) < 2){			printf("please input help parameter\n");			exit();		}		if($argv[1] === 'stop'){			$this->stop();		}else if($argv[1] === 'start'){			$this->start();		}else{			$this->help($argv[0]);		}	}}$example = new Example();$example->main($argv);

使用方法

# php example.php start# php example.php stop# php example.php help

此程序涉及守候进程实现$this->daemon()运行后转到后台运行,进程ID保存,进程的互斥(不允许同时启动两个进程),线程池连接数以及线程任务等等

10. 延伸阅读

PHP高级编程之消息队列

PHP高级编程之守护进程

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。