搜尋
首頁後端開發php教程PHP 多线程详解

PHP 高级编程之多线程

http://netkiller.github.io/journal/php.thread.html

Mr. Neo Chen (陈景峰), netkiller, BG7NYT


中国广东省深圳市龙华新区民治街道溪山美地
518131
+86 13113668890
+86 755 29812080

版权声明

转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明。

文档出处:
http://netkiller.github.io
http://netkiller.sourceforge.net

微信扫描二维码进入 Netkiller 微信订阅号

QQ群:128659835 请注明“读者”

2015-10-26

摘要

2014-03-12 第一版

2014-05-15 第二版

2014-06-13 第三版

2014-07-24 第四版

2015-10-26 第五版

我的系列文档

Netkiller Architect 手札 Netkiller Developer 手札 Netkiller PHP 手札 Netkiller Python 手札 Netkiller Testing 手札
Netkiller Cryptography 手札 Netkiller Linux 手札 Netkiller Debian 手札 Netkiller CentOS 手札 Netkiller FreeBSD 手札
Netkiller Shell 手札 Netkiller Security 手札 Netkiller Web 手札 Netkiller Monitoring 手札 Netkiller Storage 手札
Netkiller Mail 手札 Netkiller Docbook 手札 Netkiller Project 手札 Netkiller Database 手札 Netkiller PostgreSQL 手札
Netkiller MySQL 手札 Netkiller NoSQL 手札 Netkiller LDAP 手札 Netkiller Network 手札 Netkiller Cisco IOS 手札
Netkiller H3C 手札 Netkiller Multimedia 手札 Netkiller Perl 手札 Netkiller Amateur Radio 手札 Netkiller DevOps 手札

您可以使用iBook阅读当前文档

目录

  • 1. 多线程环境安装
    • 1.1. PHP 5.5.9
    • 1.2. 安装 pthreads 扩展
  • 2. Thread
  • 3. Worker 与 Stackable
  • 4. 互斥锁
    • 4.1. 多线程与共享内存
  • 5. 线程同步
  • 6. 线程池
    • 6.1. pthreads Pool类
    • 6.2. 线程池的原理
    • 6.3. 动态队列线程池
    • 6.4. 等待线程池中的线程运行完毕
  • 7. 多线程文件安全读写(文件锁)
  • 8. 多线程与数据连接
    • 8.1. Worker 与 PDO
    • 8.2. Pool 与 PDO
    • 8.3. 数据库持久连接
    • 8.4. 涉及数据库更新
  • 9. Thread And ZeroMQ
    • 9.1. 数据库端
    • 9.2. 数据处理端
  • 10. 延伸阅读

1. 多线程环境安装

1.1. PHP 5.5.9

安装PHP 5.5.9

https://github.com/oscm/shell/blob/master/php/5.5.9.sh

./configure --prefix=/srv/php-5.5.9 \--with-config-file-path=/srv/php-5.5.9/etc \--with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \--enable-fpm \--with-fpm-user=www \--with-fpm-group=www \--with-pear \--with-curl \--with-gd \--with-jpeg-dir \--with-png-dir \--with-freetype-dir \--with-zlib-dir \--with-iconv \--with-mcrypt \--with-mhash \--with-pdo-mysql \--with-mysql-sock=/var/lib/mysql/mysql.sock \--with-openssl \--with-xsl \--with-recode \--enable-sockets \--enable-soap \--enable-mbstring \--enable-gd-native-ttf \--enable-zip \--enable-xml \--enable-bcmath \--enable-calendar \--enable-shmop \--enable-dba \--enable-wddx \--enable-sysvsem \--enable-sysvshm \--enable-sysvmsg \--enable-opcache \--enable-pcntl \--enable-maintainer-zts \--disable-debug

编译必须启用zts支持否则无法安装 pthreads(--enable-maintainer-zts)

1.2. 安装 pthreads 扩展

安装https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh

# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash

查看pthreads是否已经安装

# php -m | grep pthreads

2. Thread

<?phpclass HelloWorld extends Thread {    public function __construct($world) {       $this->world = $world;    }    public function run() {        print_r(sprintf("Hello %s\n", $this->world));    }}$thread = new HelloWorld("World");if ($thread->start()) {    printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());}?>

3. Worker 与 Stackable

<?phpclass SQLQuery extends Stackable {        public function __construct($sql) {                $this->sql = $sql;        }        public function run() {                $dbh  = $this->worker->getConnection();                $row = $dbh->query($this->sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        }}class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }        /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }}$worker = new ExampleWorker("My Worker Thread");$work=new SQLQuery('select * from members order by id desc limit 5');$worker->stack($work);$table1 = new SQLQuery('select * from demousers limit 2');$worker->stack($table1);$worker->start();$worker->shutdown();?>

4. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

<?php$counter = 0;//$handle=fopen("php://memory", "rw");//$handle=fopen("php://temp", "rw");$handle=fopen("/tmp/counter.txt", "w");fwrite($handle, $counter );fclose($handle);class CounterThread extends Thread {	public function __construct($mutex = null){		$this->mutex = $mutex;        $this->handle = fopen("/tmp/counter.txt", "w+");    }	public function __destruct(){		fclose($this->handle);	}    public function run() {		if($this->mutex)			$locked=Mutex::lock($this->mutex);		$counter = intval(fgets($this->handle));		$counter++;		rewind($this->handle);		fputs($this->handle, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);		if($this->mutex)			Mutex::unlock($this->mutex);    }}//没有互斥锁for ($i=0;$i<50;$i++){	$threads[$i] = new CounterThread();	$threads[$i]->start();}//加入互斥锁$mutex = Mutex::create(true);for ($i=0;$i<50;$i++){	$threads[$i] = new CounterThread($mutex);	$threads[$i]->start();}Mutex::unlock($mutex);for ($i=0;$i<50;$i++){	$threads[$i]->join();}Mutex::destroy($mutex);?>

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

4.1. 多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能。

<?php$tmp = tempnam(__FILE__, 'PHP');$key = ftok($tmp, 'a');$shmid = shm_attach($key);$counter = 0;shm_put_var( $shmid, 1, $counter );class CounterThread extends Thread {	public function __construct($shmid){        $this->shmid = $shmid;    }    public function run() {		$counter = shm_get_var( $this->shmid, 1 );		$counter++;		shm_put_var( $this->shmid, 1, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);    }}for ($i=0;$i<100;$i++){	$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){	$threads[$i]->start();}for ($i=0;$i<100;$i++){	$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

5. 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。

$thread->wait();测作用是 thread->start()后线程并不会立即运行,只有收到 $thread->notify(); 发出的信号后才运行

<?php$tmp = tempnam(__FILE__, 'PHP');$key = ftok($tmp, 'a');$shmid = shm_attach($key);$counter = 0;shm_put_var( $shmid, 1, $counter );class CounterThread extends Thread {	public function __construct($shmid){        $this->shmid = $shmid;    }    public function run() {        $this->synchronized(function($thread){            $thread->wait();        }, $this);		$counter = shm_get_var( $this->shmid, 1 );		$counter++;		shm_put_var( $this->shmid, 1, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);    }}for ($i=0;$i<100;$i++){	$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){	$threads[$i]->start();}for ($i=0;$i<100;$i++){	$threads[$i]->synchronized(function($thread){		$thread->notify();	}, $threads[$i]);}for ($i=0;$i<100;$i++){	$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

6. 线程池

6.1. pthreads Pool类

pthreads 提供的 Pool class 例子

<?phpclass WebWorker extends Worker {	public function __construct(SafeLog $logger) {		$this->logger = $logger;	}	protected $loger;}class WebWork extends Stackable {	public function isComplete() {		return $this->complete;	}	public function run() {		$this->worker			->logger			->log("%s executing in Thread #%lu",				  __CLASS__, $this->worker->getThreadId());		$this->complete = true;	}	protected $complete;}class SafeLog extends Stackable {	protected function log($message, $args = []) {		$args = func_get_args();		if (($message = array_shift($args))) {			echo vsprintf(				"{$message}\n", $args);		}	}}$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);$pool->submit($w=new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->shutdown();$pool->collect(function($work){	return $work->isComplete();});var_dump($pool);

现在我来详细讲解线程池,官方文档比较少,很多经验是笔者工作中摸索出来的。

Pool 构造方法第一次参数 size, 手册解释是 Pool 对象可容纳的 Worker 对象的最大数量,但我实际使用发现 size 并不是控制pool压入任务的数量,而是同时并发的线程数。

$pool->submit()是可以无线提交任务的,只要内存允许(参考php.ini配置),但同时执行的线程数由size控制。

6.2. 线程池的原理

我们自行实现一个类来解释Pool工作原理

<?phpclass Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {	$this->row = $row;        $this->sql = null;    }    public function run() {	if(strlen($this->row['bankno']) > 100 ){		$bankno = safenet_decrypt($this->row['bankno']);	}else{		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);		file_put_contents("bankno_error.log", $error, FILE_APPEND);	}	if( strlen($bankno) > 7 ){		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);		$this->sql = $sql;	}	printf("%s\n",$this->sql);    }}class Pool {	public $pool = array();	public function __construct($count) {		$this->count = $count;	}	public function push($row){		if(count($this->pool) < $this->count){			$this->pool[] = new Update($row);			return true;		}else{			return false;		}	}	public function start(){		foreach ( $this->pool as $id => $worker){			$this->pool[$id]->start();		}	}	public function join(){		foreach ( $this->pool as $id => $worker){               $this->pool[$id]->join();		}	}	public function clean(){		foreach ( $this->pool as $id => $worker){			if(! $worker->isRunning()){            	unset($this->pool[$id]);            }		}	}}try {	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',		PDO::MYSQL_ATTR_COMPRESS => true		)	);	$sql  = "select id,bankno from members order by id desc";	$row = $dbh->query($sql);	$pool = new Pool(5);	while($member = $row->fetch(PDO::FETCH_ASSOC))	{		while(true){			if($pool->push($member)){ //压入任务到池中				break;			}else{ //如果池已经满,就开始启动线程				$pool->start();				$pool->join();				$pool->clean();			}		}	}	$pool->start();    $pool->join();	$dbh = null;} catch (Exception $e) {    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";}?>

6.3. 动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

<?phpclass Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {	$this->row = $row;        $this->sql = null;	//print_r($this->row);    }    public function run() {	if(strlen($this->row['bankno']) > 100 ){		$bankno = safenet_decrypt($this->row['bankno']);	}else{		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);		file_put_contents("bankno_error.log", $error, FILE_APPEND);	}	if( strlen($bankno) > 7 ){		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);		$this->sql = $sql;	}	printf("%s\n",$this->sql);    }}try {	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',		PDO::MYSQL_ATTR_COMPRESS => true		)	);	$sql     = "select id,bankno from members order by id desc limit 50";	$row = $dbh->query($sql);	$pool = array();	while($member = $row->fetch(PDO::FETCH_ASSOC))	{		$id 	= $member['id'];		while (true){			if(count($pool) < 5){				$pool[$id] = new Update($member);				$pool[$id]->start();				break;			}else{				foreach ( $pool as $name => $worker){					if(! $worker->isRunning()){						unset($pool[$name]);					}				}			}		}	}	$dbh = null;} catch (Exception $e) {    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";}?>

6.4. 等待线程池中的线程运行完毕

$pool->submit 是非阻塞,提交到线程池中,就会运行下面代码,有时我们希望等待线程执行完毕,收集线程的工作状况。

$mutex = Mutex::create();		$pool = new Pool ( self::MAXCONN , \ImportWorker::class, array($this->config, $mutex) );				$pool->collect(function($work){				return $work->isComplete();			});				foreach($tasks as $task){			$this->logger ( __CLASS__, sprintf("Task %s %s", $task->file, 'Processing') );			pcntl_signal_dispatch();						if(Signal::get() == SIGHUP){				Signal::reset();				break;			}						if(file_exists ($task->file)){				$handle = fopen($task->file, 'r');				$i = 0;				while (($row = fgetcsv($handle, 100000, ',')) !== false) {					$work[$i] =  new Import ( $task, $row );					$pool->submit ( $work[$i] );					$i++;					//$pool->submit ( new Import ( $task, $row ));				}						fclose($handle);								$waiting = true;				while($waiting){										for($i=0;$i<count($work);$i++){												if($work[$i]->isComplete()){							Counter::$completed++;						}						//printf("work %s:%s \n", count($work), Counter::$completed);						if(Counter::$completed == count($work)){							$waiting = false;							break;						}					}					sleep(1);				}				$this->completedTask($task);			}else{				$this->failedTask($task);			}			//printf("Ignore: %s\n", Counter::$ignore ) ;		}				$pool->shutdown ();		//Mutex::unlock($mutex);		Mutex::destroy($mutex);

while($waiting) 对持续运行,直到所有线程都完成后才会退出。

7. 多线程文件安全读写(文件锁)

在多线程中读写文件但进程是有区别的,读取内容比较容易时间,但写入数据就需要保证同一时刻只能有一个进程操作,虽然通过互斥锁可以解决,但从安全的角度文件必须上锁。

文件锁种类。

LOCK_SH 取得共享锁定(读取的程序)。LOCK_EX 取得独占锁定(写入的程序。LOCK_UN 释放锁定(无论共享或独占)。LOCK_NB 如果不希望 flock() 在锁定时堵塞

共享锁例子

<?php$fp = fopen("/tmp/lock.txt", "r+");if (flock($fp, LOCK_EX)) {  // 进行排它型锁定    ftruncate($fp, 0);      // truncate file    fwrite($fp, "Write something here\n");    fflush($fp);            // flush output before releasing the lock    flock($fp, LOCK_UN);    // 释放锁定} else {    echo "Couldn't get the lock!";}fclose($fp);?>

共享锁例子2

<?php$fp = fopen('/tmp/lock.txt', 'r+');/* Activate the LOCK_NB option on an LOCK_EX operation */if(!flock($fp, LOCK_EX | LOCK_NB)) {    echo 'Unable to obtain lock';    exit(-1);}/* ... */fclose($fp);?>

8. 多线程与数据连接

多线程中操作数据库总结与注意事项 pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

8.1. Worker 与 PDO

<?phpclass Work extends Stackable {        public function __construct() {        }        public function run() {                $dbh  = $this->worker->getConnection();                $sql     = "select id,name from members order by id desc limit 50";                $row = $dbh->query($sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        }}class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }        /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }}$worker = new ExampleWorker("My Worker Thread");$work=new Work();$worker->stack($work);$worker->start();$worker->shutdown();?>

8.2. Pool 与 PDO

在线程池中链接数据库

# cat pool.php<?phpclass ExampleWorker extends Worker {	public function __construct(Logging $logger) {		$this->logger = $logger;	}	protected $logger;}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {	public function __construct($number) {		$this->number = $number;	}	public function run() {                $dbhost = 'db.example.com';               // 数据库服务器                $dbuser = 'example.com';                 // 数据库用户名                $dbpw = 'password';                               // 数据库密码                $dbname = 'example_real';		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true                        )                );		$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";		#echo $sql;		$row = $dbh->query($sql);		$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);		if($mt4_trades){			$row = null;			$sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";			$dbh->query($sql);			#printf("%s\n",$sql);		}		$dbh = null;		printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);	}}class Logging extends Stackable {	protected  static $dbh;	public function __construct() {		$dbhost = 'db.example.com';			// 数据库服务器	        $dbuser = 'example.com';                 // 数据库用户名        	$dbpw = 'password';                               // 数据库密码		$dbname = 'example_real';			// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',			PDO::MYSQL_ATTR_COMPRESS => true			)		);	}	protected function log($message, $args = []) {		$args = func_get_args();		if (($message = array_shift($args))) {			echo vsprintf("{$message}\n", $args);		}	}	protected function getConnection(){                return self::$dbh;        }}$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);$dbhost = 'db.example.com';                      // 数据库服务器$dbuser = 'example.com';                 // 数据库用户名$dbpw = 'password';                               // 数据库密码$dbname = 'db_example';$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );$sql = "select `order`,name from accounts where deposit_time is null order by id desc";$row = $dbh->query($sql);while($account = $row->fetch(PDO::FETCH_ASSOC)){        $pool->submit(new Work($account));}$pool->shutdown();?>

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

<?phpclass ExampleWorker extends Worker {	#public function __construct(Logging $logger) {	#	$this->logger = $logger;	#}	#protected $logger;	protected  static $dbh;	public function __construct() {	}	public function run(){		$dbhost = 'db.example.com';			// 数据库服务器	    $dbuser = 'example.com';        	// 数据库用户名        $dbpw = 'password';             	// 数据库密码		$dbname = 'example';				// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',			PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true			)		);	}	protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {	public function __construct($data) {		$this->data = $data;		#print_r($data);	}	public function run() {		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );		try {			$dbh  = $this->worker->getInstance();			#print_r($dbh);               		$id = $this->data['id'];			$mobile = safenet_decrypt($this->data['mobile']);			#printf("%d, %s \n", $id, $mobile);			if(strlen($mobile) > 11){				$mobile = substr($mobile, -11);			}			if($mobile == 'null'){			#	$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";			#	printf("%s\n",$sql);			#	$dbh->query($sql);				$mobile = '';				$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";			}else{				$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";			}			$sth = $dbh->prepare($sql);			$sth->bindValue(':mobile', $mobile);			$sth->bindValue(':id', $id);			$sth->execute();			#echo $sth->debugDumpParams();		}		catch(PDOException $e) {			$error = sprintf("%s,%s\n", $mobile, $id );			file_put_contents("mobile_error.log", $error, FILE_APPEND);		}		#$dbh = null;		printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);	}}$pool = new Pool(100, \ExampleWorker::class, []);#foreach (range(0, 100) as $number) {#	$pool->submit(new Work($number));#}$dbhost = 'db.example.com';                     // 数据库服务器$dbuser = 'example.com';                 		// 数据库用户名$dbpw = 'password';                             // 数据库密码$dbname = 'example';$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );#print_r($dbh);#$sql = "select id, mobile from members where id < :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':id',300);#$sth->execute();#$result = $sth->fetchAll();#print_r($result);##$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':mobile', 'aa');#$sth->bindValue(':id','272');#echo $sth->execute();#echo $sth->queryString;#echo $sth->debugDumpParams();$sql = "select id, mobile from members order by id asc"; // limit 1000";$row = $dbh->query($sql);while($members = $row->fetch(PDO::FETCH_ASSOC)){        #$order =  $account['order'];        #printf("%s\n",$order);        //print_r($members);        $pool->submit(new Work($members));		#unset($account['order']);}$pool->shutdown();?>

8.3. 数据库持久连接

总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => true));?>

但有些场景数据库持久链接适得其反,所以根据你的场景选择链接方式

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => false));?>

由于现成持续链接数据,有时可能因为数据库或者网络原因导致数据无法连接,程序抛出异常或终止,所以使用单例并不保险。

protected function getInstance(){	return self::$dbh;}

为单例增加重新连接功能

class SenderWorker extends Worker {	protected $config;	protected static $dbh;	protected static $amqp;		public function __construct($config) {		$this->config = $config;		$this->logger = new Logger();	}	public function run() {	}	private function connect(){		try {			$dbhost = $this->config['database']['host'];			$dbport = $this->config['database']['port'];			$dbuser = $this->config['database']['user'];			$dbpass = $this->config['database']['password'];			$dbname = $this->config['database']['dbname'];			self::$dbh = new PDO ( "mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array (					PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',					PDO::MYSQL_ATTR_COMPRESS => true					/*PDO::ATTR_PERSISTENT => true*/			) );			self::$dbh->setAttribute ( PDO::ATTR_ERRMODE, PDO::ERRMODE_WARNING );		} catch ( PDOException $e ) {			$this->logger ( 'Exception worker', $e->getMessage( ) );		} catch ( Exception $e ) {			$this->logger ( 'Exception worker', $e->getMessage( ) );		}	}	protected function getInstance() {		if(!self::$dbh) {			$this->connect();			$this->logger ( 'Database', sprintf("Connect database %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );		}else{			$this->logger ( 'Database', sprintf("Get instance database %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );		}				if(self::$dbh){			return self::$dbh;		}else{			$this->logger ( 'Database', sprintf("Connect database is error %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );			$this->logger ( 'Error', sprintf("Worker is shutdown %s", $this->getThreadId ()) );			$this->shutdown();		}	}		public function logger($type, $message) {		$this->logger->logger($type, $message);	}		public function getAmqpInstance(){		if(!self::$amqp){			self::$amqp = new AMQPConnection(array(				'host' 	=> $this->config['amqp']['host'], 				'port' 	=> $this->config['amqp']['port'], 				'vhost'	=> $this->config['amqp']['vhost'], 				'login' => $this->config['amqp']['login'], 				'password' => $this->config['amqp']['password']			));			$this->logger ( 'AMQP', sprintf("Connect amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );		}else{			$this->logger ( 'AMQP', sprintf("Get instance amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );		}			return self::$amqp;	}}

每次调用 getInstance() 会判断当前数据库是否已经链接,如果链接丢失,将重新链接数据库。

8.4. 涉及数据库更新

多线程编程中对数据库更新操作需要注意的是,有些场景,你需要控制同一时刻只能有一个线程对数据库做Update, Delete, Insert,否则数据容易出错。

例如下面的操作,你会发现程序运行完成后数据字段没有任何变化。这是因为线程间相互覆盖对方之前更新的数据。

$sql = "update import set succeed = succeed+1 where status = :status and id = :id";

解决方法有两种,一种是外部实现排他锁,一种是在数据库内部实现,通过事物处理,解决线程资源争夺,相互覆盖的问题。

private function updateSucceed($task){		$dbh = $this->worker->getInstance();		$dbh->beginTransaction();		$sql = "update import set succeed = succeed+1 where status = :status and id = :id";		$sth = $dbh->prepare ( $sql );		$sth->bindValue ( ':id', $task->id );		$sth->bindValue ( ':status', 'Processing' );		$status = $sth->execute ();		$dbh->commit();		return $status;	}

9. Thread And ZeroMQ

应用场景,我使用触发器监控数据库某个表,一旦发现有改变就通知程序处理数据

9.1. 数据库端

首先安装ZeroMQ 与 ZeroMQ for MySQL UDF 然后创建触发器。 https://github.com/netkiller/mysql-zmq-plugin

CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)	LANGUAGE SQL	NOT DETERMINISTIC	READS SQL DATA	SQL SECURITY DEFINER	COMMENT '交易监控'BEGIN	DECLARE Example CHAR(1) DEFAULT 'N';	IF CMD IN ('0','1') THEN		IF VOLUME >=10 AND VOLUME <=90 THEN			select coding into Example from example.members where username = LOGIN and coding = 'Y';			IF Example = 'Y' THEN				select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));			END IF;		END IF;	END IF;ENDCREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW BEGIN	call Table_Example(NEW.TICKET,NEW.LOGIN,NEW.CMD,NEW.VOLUME);END

9.2. 数据处理端

<?phpclass ExampleWorker extends Worker {	#public function __construct(Logging $logger) {	#	$this->logger = $logger;	#}	#protected $logger;	protected  static $dbh;	public function __construct() {	}	public function run(){		$dbhost = '192.168.2.1';			// 数据库服务器		$dbport = 3306;	    $dbuser = 'www';        			// 数据库用户名        $dbpass = 'password';             	// 数据库密码		$dbname = 'example';				// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(			/* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */			PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true			)		);	}	protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Fee extends Stackable {	public function __construct($msg) {		$trades = explode(",", $msg);		$this->data = $trades;		print_r($trades);	}	public function run() {		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );		try {			$dbh  = $this->worker->getInstance();			$insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";			$sth = $dbh->prepare($insert);			$sth->bindValue(':ticket', $this->data[0]);			$sth->bindValue(':login', $this->data[1]);			$sth->bindValue(':volume', $this->data[2]);			$sth->execute();			//$sth = null;			//$dbh = null;			/* 业务实现在此处 */			$update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";			$sth = $dbh->prepare($update);			$sth->bindValue(':ticket', $this->data[0]);			$sth->execute();			//echo $sth->queryString;		}		catch(PDOException $e) {			$error = sprintf("%s,%s\n", $mobile, $id );			file_put_contents("mobile_error.log", $error, FILE_APPEND);		}		#$dbh = null;		//printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);	}}class Example {	/* config */	const LISTEN = "tcp://192.168.2.15:5555";	const MAXCONN = 100;	const pidfile = __CLASS__;	const uid	= 80;	const gid	= 80;	protected $pool = NULL;	protected $zmq = NULL;	public function __construct() {		$this->pidfile = '/var/run/'.self::pidfile.'.pid';	}	private function daemon(){		if (file_exists($this->pidfile)) {			echo "The file $this->pidfile exists.\n";			exit();		}		$pid = pcntl_fork();		if ($pid == -1) {			 die('could not fork');		} else if ($pid) {			 // we are the parent			 //pcntl_wait($status); //Protect against Zombie children			exit($pid);		} else {			// we are the child			file_put_contents($this->pidfile, getmypid());			posix_setuid(self::uid);			posix_setgid(self::gid);			return(getmypid());		}	}	private function start(){		$pid = $this->daemon();		$this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);		$this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);		$this->zmq->bind(self::LISTEN);		/* Loop receiving and echoing back */		while ($message = $this->zmq->recv()) {			if($message){					$this->pool->submit(new Fee($message));					$this->zmq->send('TRUE');			}else{					$this->zmq->send('FALSE');			}		}		$pool->shutdown();	}	private function stop(){		if (file_exists($this->pidfile)) {			$pid = file_get_contents($this->pidfile);			posix_kill($pid, 9);			unlink($this->pidfile);		}	}	private function help($proc){		printf("%s start | stop | help \n", $proc);	}	public function main($argv){		if(count($argv) < 2){			printf("please input help parameter\n");			exit();		}		if($argv[1] === 'stop'){			$this->stop();		}else if($argv[1] === 'start'){			$this->start();		}else{			$this->help($argv[0]);		}	}}$example = new Example();$example->main($argv);

使用方法

# php example.php start# php example.php stop# php example.php help

此程序涉及守候进程实现$this->daemon()运行后转到后台运行,进程ID保存,进程的互斥(不允许同时启动两个进程),线程池连接数以及线程任务等等

10. 延伸阅读

PHP高级编程之消息队列

PHP高级编程之守护进程

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
如何檢查PHP會話是否已經開始?如何檢查PHP會話是否已經開始?Apr 30, 2025 am 12:20 AM

在PHP中,可以使用session_status()或session_id()來檢查會話是否已啟動。 1)使用session_status()函數,如果返回PHP_SESSION_ACTIVE,則會話已啟動。 2)使用session_id()函數,如果返回非空字符串,則會話已啟動。這兩種方法都能有效地檢查會話狀態,選擇使用哪種方法取決於PHP版本和個人偏好。

描述一個場景,其中使用會話在Web應用程序中至關重要。描述一個場景,其中使用會話在Web應用程序中至關重要。Apr 30, 2025 am 12:16 AM

sessionsarevitalinwebapplications,尤其是在commercePlatform之前。

如何管理PHP中的並發會話訪問?如何管理PHP中的並發會話訪問?Apr 30, 2025 am 12:11 AM

在PHP中管理並發會話訪問可以通過以下方法:1.使用數據庫存儲會話數據,2.採用Redis或Memcached,3.實施會話鎖定策略。這些方法有助於確保數據一致性和提高並發性能。

使用PHP會話的局限性是什麼?使用PHP會話的局限性是什麼?Apr 30, 2025 am 12:04 AM

PHPsessionshaveseverallimitations:1)Storageconstraintscanleadtoperformanceissues;2)Securityvulnerabilitieslikesessionfixationattacksexist;3)Scalabilityischallengingduetoserver-specificstorage;4)Sessionexpirationmanagementcanbeproblematic;5)Datapersis

解釋負載平衡如何影響會話管理以及如何解決。解釋負載平衡如何影響會話管理以及如何解決。Apr 29, 2025 am 12:42 AM

負載均衡會影響會話管理,但可以通過會話複製、會話粘性和集中式會話存儲解決。 1.會話複製在服務器間複製會話數據。 2.會話粘性將用戶請求定向到同一服務器。 3.集中式會話存儲使用獨立服務器如Redis存儲會話數據,確保數據共享。

說明會話鎖定的概念。說明會話鎖定的概念。Apr 29, 2025 am 12:39 AM

Sessionlockingisatechniqueusedtoensureauser'ssessionremainsexclusivetooneuseratatime.Itiscrucialforpreventingdatacorruptionandsecuritybreachesinmulti-userapplications.Sessionlockingisimplementedusingserver-sidelockingmechanisms,suchasReentrantLockinJ

有其他PHP會議的選擇嗎?有其他PHP會議的選擇嗎?Apr 29, 2025 am 12:36 AM

PHP會話的替代方案包括Cookies、Token-basedAuthentication、Database-basedSessions和Redis/Memcached。 1.Cookies通過在客戶端存儲數據來管理會話,簡單但安全性低。 2.Token-basedAuthentication使用令牌驗證用戶,安全性高但需額外邏輯。 3.Database-basedSessions將數據存儲在數據庫中,擴展性好但可能影響性能。 4.Redis/Memcached使用分佈式緩存提高性能和擴展性,但需額外配

在PHP的上下文中定義'會話劫持”一詞。在PHP的上下文中定義'會話劫持”一詞。Apr 29, 2025 am 12:33 AM

Sessionhijacking是指攻擊者通過獲取用戶的sessionID來冒充用戶。防範方法包括:1)使用HTTPS加密通信;2)驗證sessionID的來源;3)使用安全的sessionID生成算法;4)定期更新sessionID。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一個PHP/MySQL的Web應用程序,非常容易受到攻擊。它的主要目標是成為安全專業人員在合法環境中測試自己的技能和工具的輔助工具,幫助Web開發人員更好地理解保護網路應用程式的過程,並幫助教師/學生在課堂環境中教授/學習Web應用程式安全性。 DVWA的目標是透過簡單直接的介面練習一些最常見的Web漏洞,難度各不相同。請注意,該軟體中

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

SecLists

SecLists

SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

mPDF

mPDF

mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),