찾다

PHP 高级编程之多线程

http://netkiller.github.io/journal/php.thread.html

Mr. Neo Chen (陈景峰), netkiller, BG7NYT


中国广东省深圳市龙华新区民治街道溪山美地
518131
+86 13113668890
+86 755 29812080

版权声明

转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明。

文档出处:
http://netkiller.github.io
http://netkiller.sourceforge.net

微信扫描二维码进入 Netkiller 微信订阅号

QQ群:128659835 请注明“读者”

2015-10-26

摘要

2014-03-12 第一版

2014-05-15 第二版

2014-06-13 第三版

2014-07-24 第四版

2015-10-26 第五版

我的系列文档

Netkiller Architect 手札 Netkiller Developer 手札 Netkiller PHP 手札 Netkiller Python 手札 Netkiller Testing 手札
Netkiller Cryptography 手札 Netkiller Linux 手札 Netkiller Debian 手札 Netkiller CentOS 手札 Netkiller FreeBSD 手札
Netkiller Shell 手札 Netkiller Security 手札 Netkiller Web 手札 Netkiller Monitoring 手札 Netkiller Storage 手札
Netkiller Mail 手札 Netkiller Docbook 手札 Netkiller Project 手札 Netkiller Database 手札 Netkiller PostgreSQL 手札
Netkiller MySQL 手札 Netkiller NoSQL 手札 Netkiller LDAP 手札 Netkiller Network 手札 Netkiller Cisco IOS 手札
Netkiller H3C 手札 Netkiller Multimedia 手札 Netkiller Perl 手札 Netkiller Amateur Radio 手札 Netkiller DevOps 手札

您可以使用iBook阅读当前文档

目录

  • 1. 多线程环境安装
    • 1.1. PHP 5.5.9
    • 1.2. 安装 pthreads 扩展
  • 2. Thread
  • 3. Worker 与 Stackable
  • 4. 互斥锁
    • 4.1. 多线程与共享内存
  • 5. 线程同步
  • 6. 线程池
    • 6.1. pthreads Pool类
    • 6.2. 线程池的原理
    • 6.3. 动态队列线程池
    • 6.4. 等待线程池中的线程运行完毕
  • 7. 多线程文件安全读写(文件锁)
  • 8. 多线程与数据连接
    • 8.1. Worker 与 PDO
    • 8.2. Pool 与 PDO
    • 8.3. 数据库持久连接
    • 8.4. 涉及数据库更新
  • 9. Thread And ZeroMQ
    • 9.1. 数据库端
    • 9.2. 数据处理端
  • 10. 延伸阅读

1. 多线程环境安装

1.1. PHP 5.5.9

安装PHP 5.5.9

https://github.com/oscm/shell/blob/master/php/5.5.9.sh

./configure --prefix=/srv/php-5.5.9 \--with-config-file-path=/srv/php-5.5.9/etc \--with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \--enable-fpm \--with-fpm-user=www \--with-fpm-group=www \--with-pear \--with-curl \--with-gd \--with-jpeg-dir \--with-png-dir \--with-freetype-dir \--with-zlib-dir \--with-iconv \--with-mcrypt \--with-mhash \--with-pdo-mysql \--with-mysql-sock=/var/lib/mysql/mysql.sock \--with-openssl \--with-xsl \--with-recode \--enable-sockets \--enable-soap \--enable-mbstring \--enable-gd-native-ttf \--enable-zip \--enable-xml \--enable-bcmath \--enable-calendar \--enable-shmop \--enable-dba \--enable-wddx \--enable-sysvsem \--enable-sysvshm \--enable-sysvmsg \--enable-opcache \--enable-pcntl \--enable-maintainer-zts \--disable-debug

编译必须启用zts支持否则无法安装 pthreads(--enable-maintainer-zts)

1.2. 安装 pthreads 扩展

安装https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh

# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash

查看pthreads是否已经安装

# php -m | grep pthreads

2. Thread

<?phpclass HelloWorld extends Thread {    public function __construct($world) {       $this->world = $world;    }    public function run() {        print_r(sprintf("Hello %s\n", $this->world));    }}$thread = new HelloWorld("World");if ($thread->start()) {    printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());}?>

3. Worker 与 Stackable

<?phpclass SQLQuery extends Stackable {        public function __construct($sql) {                $this->sql = $sql;        }        public function run() {                $dbh  = $this->worker->getConnection();                $row = $dbh->query($this->sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        }}class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }        /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }}$worker = new ExampleWorker("My Worker Thread");$work=new SQLQuery('select * from members order by id desc limit 5');$worker->stack($work);$table1 = new SQLQuery('select * from demousers limit 2');$worker->stack($table1);$worker->start();$worker->shutdown();?>

4. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

<?php$counter = 0;//$handle=fopen("php://memory", "rw");//$handle=fopen("php://temp", "rw");$handle=fopen("/tmp/counter.txt", "w");fwrite($handle, $counter );fclose($handle);class CounterThread extends Thread {	public function __construct($mutex = null){		$this->mutex = $mutex;        $this->handle = fopen("/tmp/counter.txt", "w+");    }	public function __destruct(){		fclose($this->handle);	}    public function run() {		if($this->mutex)			$locked=Mutex::lock($this->mutex);		$counter = intval(fgets($this->handle));		$counter++;		rewind($this->handle);		fputs($this->handle, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);		if($this->mutex)			Mutex::unlock($this->mutex);    }}//没有互斥锁for ($i=0;$i<50;$i++){	$threads[$i] = new CounterThread();	$threads[$i]->start();}//加入互斥锁$mutex = Mutex::create(true);for ($i=0;$i<50;$i++){	$threads[$i] = new CounterThread($mutex);	$threads[$i]->start();}Mutex::unlock($mutex);for ($i=0;$i<50;$i++){	$threads[$i]->join();}Mutex::destroy($mutex);?>

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

4.1. 多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能。

<?php$tmp = tempnam(__FILE__, 'PHP');$key = ftok($tmp, 'a');$shmid = shm_attach($key);$counter = 0;shm_put_var( $shmid, 1, $counter );class CounterThread extends Thread {	public function __construct($shmid){        $this->shmid = $shmid;    }    public function run() {		$counter = shm_get_var( $this->shmid, 1 );		$counter++;		shm_put_var( $this->shmid, 1, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);    }}for ($i=0;$i<100;$i++){	$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){	$threads[$i]->start();}for ($i=0;$i<100;$i++){	$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

5. 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。

$thread->wait();测作用是 thread->start()后线程并不会立即运行,只有收到 $thread->notify(); 发出的信号后才运行

<?php$tmp = tempnam(__FILE__, 'PHP');$key = ftok($tmp, 'a');$shmid = shm_attach($key);$counter = 0;shm_put_var( $shmid, 1, $counter );class CounterThread extends Thread {	public function __construct($shmid){        $this->shmid = $shmid;    }    public function run() {        $this->synchronized(function($thread){            $thread->wait();        }, $this);		$counter = shm_get_var( $this->shmid, 1 );		$counter++;		shm_put_var( $this->shmid, 1, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);    }}for ($i=0;$i<100;$i++){	$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){	$threads[$i]->start();}for ($i=0;$i<100;$i++){	$threads[$i]->synchronized(function($thread){		$thread->notify();	}, $threads[$i]);}for ($i=0;$i<100;$i++){	$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

6. 线程池

6.1. pthreads Pool类

pthreads 提供的 Pool class 例子

<?phpclass WebWorker extends Worker {	public function __construct(SafeLog $logger) {		$this->logger = $logger;	}	protected $loger;}class WebWork extends Stackable {	public function isComplete() {		return $this->complete;	}	public function run() {		$this->worker			->logger			->log("%s executing in Thread #%lu",				  __CLASS__, $this->worker->getThreadId());		$this->complete = true;	}	protected $complete;}class SafeLog extends Stackable {	protected function log($message, $args = []) {		$args = func_get_args();		if (($message = array_shift($args))) {			echo vsprintf(				"{$message}\n", $args);		}	}}$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);$pool->submit($w=new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->shutdown();$pool->collect(function($work){	return $work->isComplete();});var_dump($pool);

现在我来详细讲解线程池,官方文档比较少,很多经验是笔者工作中摸索出来的。

Pool 构造方法第一次参数 size, 手册解释是 Pool 对象可容纳的 Worker 对象的最大数量,但我实际使用发现 size 并不是控制pool压入任务的数量,而是同时并发的线程数。

$pool->submit()是可以无线提交任务的,只要内存允许(参考php.ini配置),但同时执行的线程数由size控制。

6.2. 线程池的原理

我们自行实现一个类来解释Pool工作原理

<?phpclass Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {	$this->row = $row;        $this->sql = null;    }    public function run() {	if(strlen($this->row['bankno']) > 100 ){		$bankno = safenet_decrypt($this->row['bankno']);	}else{		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);		file_put_contents("bankno_error.log", $error, FILE_APPEND);	}	if( strlen($bankno) > 7 ){		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);		$this->sql = $sql;	}	printf("%s\n",$this->sql);    }}class Pool {	public $pool = array();	public function __construct($count) {		$this->count = $count;	}	public function push($row){		if(count($this->pool) < $this->count){			$this->pool[] = new Update($row);			return true;		}else{			return false;		}	}	public function start(){		foreach ( $this->pool as $id => $worker){			$this->pool[$id]->start();		}	}	public function join(){		foreach ( $this->pool as $id => $worker){               $this->pool[$id]->join();		}	}	public function clean(){		foreach ( $this->pool as $id => $worker){			if(! $worker->isRunning()){            	unset($this->pool[$id]);            }		}	}}try {	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',		PDO::MYSQL_ATTR_COMPRESS => true		)	);	$sql  = "select id,bankno from members order by id desc";	$row = $dbh->query($sql);	$pool = new Pool(5);	while($member = $row->fetch(PDO::FETCH_ASSOC))	{		while(true){			if($pool->push($member)){ //压入任务到池中				break;			}else{ //如果池已经满,就开始启动线程				$pool->start();				$pool->join();				$pool->clean();			}		}	}	$pool->start();    $pool->join();	$dbh = null;} catch (Exception $e) {    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";}?>

6.3. 动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

<?phpclass Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {	$this->row = $row;        $this->sql = null;	//print_r($this->row);    }    public function run() {	if(strlen($this->row['bankno']) > 100 ){		$bankno = safenet_decrypt($this->row['bankno']);	}else{		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);		file_put_contents("bankno_error.log", $error, FILE_APPEND);	}	if( strlen($bankno) > 7 ){		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);		$this->sql = $sql;	}	printf("%s\n",$this->sql);    }}try {	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',		PDO::MYSQL_ATTR_COMPRESS => true		)	);	$sql     = "select id,bankno from members order by id desc limit 50";	$row = $dbh->query($sql);	$pool = array();	while($member = $row->fetch(PDO::FETCH_ASSOC))	{		$id 	= $member['id'];		while (true){			if(count($pool) < 5){				$pool[$id] = new Update($member);				$pool[$id]->start();				break;			}else{				foreach ( $pool as $name => $worker){					if(! $worker->isRunning()){						unset($pool[$name]);					}				}			}		}	}	$dbh = null;} catch (Exception $e) {    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";}?>

6.4. 等待线程池中的线程运行完毕

$pool->submit 是非阻塞,提交到线程池中,就会运行下面代码,有时我们希望等待线程执行完毕,收集线程的工作状况。

$mutex = Mutex::create();		$pool = new Pool ( self::MAXCONN , \ImportWorker::class, array($this->config, $mutex) );				$pool->collect(function($work){				return $work->isComplete();			});				foreach($tasks as $task){			$this->logger ( __CLASS__, sprintf("Task %s %s", $task->file, 'Processing') );			pcntl_signal_dispatch();						if(Signal::get() == SIGHUP){				Signal::reset();				break;			}						if(file_exists ($task->file)){				$handle = fopen($task->file, 'r');				$i = 0;				while (($row = fgetcsv($handle, 100000, ',')) !== false) {					$work[$i] =  new Import ( $task, $row );					$pool->submit ( $work[$i] );					$i++;					//$pool->submit ( new Import ( $task, $row ));				}						fclose($handle);								$waiting = true;				while($waiting){										for($i=0;$i<count($work);$i++){												if($work[$i]->isComplete()){							Counter::$completed++;						}						//printf("work %s:%s \n", count($work), Counter::$completed);						if(Counter::$completed == count($work)){							$waiting = false;							break;						}					}					sleep(1);				}				$this->completedTask($task);			}else{				$this->failedTask($task);			}			//printf("Ignore: %s\n", Counter::$ignore ) ;		}				$pool->shutdown ();		//Mutex::unlock($mutex);		Mutex::destroy($mutex);

while($waiting) 对持续运行,直到所有线程都完成后才会退出。

7. 多线程文件安全读写(文件锁)

在多线程中读写文件但进程是有区别的,读取内容比较容易时间,但写入数据就需要保证同一时刻只能有一个进程操作,虽然通过互斥锁可以解决,但从安全的角度文件必须上锁。

文件锁种类。

LOCK_SH 取得共享锁定(读取的程序)。LOCK_EX 取得独占锁定(写入的程序。LOCK_UN 释放锁定(无论共享或独占)。LOCK_NB 如果不希望 flock() 在锁定时堵塞

共享锁例子

<?php$fp = fopen("/tmp/lock.txt", "r+");if (flock($fp, LOCK_EX)) {  // 进行排它型锁定    ftruncate($fp, 0);      // truncate file    fwrite($fp, "Write something here\n");    fflush($fp);            // flush output before releasing the lock    flock($fp, LOCK_UN);    // 释放锁定} else {    echo "Couldn't get the lock!";}fclose($fp);?>

共享锁例子2

<?php$fp = fopen('/tmp/lock.txt', 'r+');/* Activate the LOCK_NB option on an LOCK_EX operation */if(!flock($fp, LOCK_EX | LOCK_NB)) {    echo 'Unable to obtain lock';    exit(-1);}/* ... */fclose($fp);?>

8. 多线程与数据连接

多线程中操作数据库总结与注意事项 pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

8.1. Worker 与 PDO

<?phpclass Work extends Stackable {        public function __construct() {        }        public function run() {                $dbh  = $this->worker->getConnection();                $sql     = "select id,name from members order by id desc limit 50";                $row = $dbh->query($sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        }}class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }        /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }}$worker = new ExampleWorker("My Worker Thread");$work=new Work();$worker->stack($work);$worker->start();$worker->shutdown();?>

8.2. Pool 与 PDO

在线程池中链接数据库

# cat pool.php<?phpclass ExampleWorker extends Worker {	public function __construct(Logging $logger) {		$this->logger = $logger;	}	protected $logger;}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {	public function __construct($number) {		$this->number = $number;	}	public function run() {                $dbhost = 'db.example.com';               // 数据库服务器                $dbuser = 'example.com';                 // 数据库用户名                $dbpw = 'password';                               // 数据库密码                $dbname = 'example_real';		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true                        )                );		$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";		#echo $sql;		$row = $dbh->query($sql);		$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);		if($mt4_trades){			$row = null;			$sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";			$dbh->query($sql);			#printf("%s\n",$sql);		}		$dbh = null;		printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);	}}class Logging extends Stackable {	protected  static $dbh;	public function __construct() {		$dbhost = 'db.example.com';			// 数据库服务器	        $dbuser = 'example.com';                 // 数据库用户名        	$dbpw = 'password';                               // 数据库密码		$dbname = 'example_real';			// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',			PDO::MYSQL_ATTR_COMPRESS => true			)		);	}	protected function log($message, $args = []) {		$args = func_get_args();		if (($message = array_shift($args))) {			echo vsprintf("{$message}\n", $args);		}	}	protected function getConnection(){                return self::$dbh;        }}$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);$dbhost = 'db.example.com';                      // 数据库服务器$dbuser = 'example.com';                 // 数据库用户名$dbpw = 'password';                               // 数据库密码$dbname = 'db_example';$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );$sql = "select `order`,name from accounts where deposit_time is null order by id desc";$row = $dbh->query($sql);while($account = $row->fetch(PDO::FETCH_ASSOC)){        $pool->submit(new Work($account));}$pool->shutdown();?>

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

<?phpclass ExampleWorker extends Worker {	#public function __construct(Logging $logger) {	#	$this->logger = $logger;	#}	#protected $logger;	protected  static $dbh;	public function __construct() {	}	public function run(){		$dbhost = 'db.example.com';			// 数据库服务器	    $dbuser = 'example.com';        	// 数据库用户名        $dbpw = 'password';             	// 数据库密码		$dbname = 'example';				// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',			PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true			)		);	}	protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {	public function __construct($data) {		$this->data = $data;		#print_r($data);	}	public function run() {		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );		try {			$dbh  = $this->worker->getInstance();			#print_r($dbh);               		$id = $this->data['id'];			$mobile = safenet_decrypt($this->data['mobile']);			#printf("%d, %s \n", $id, $mobile);			if(strlen($mobile) > 11){				$mobile = substr($mobile, -11);			}			if($mobile == 'null'){			#	$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";			#	printf("%s\n",$sql);			#	$dbh->query($sql);				$mobile = '';				$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";			}else{				$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";			}			$sth = $dbh->prepare($sql);			$sth->bindValue(':mobile', $mobile);			$sth->bindValue(':id', $id);			$sth->execute();			#echo $sth->debugDumpParams();		}		catch(PDOException $e) {			$error = sprintf("%s,%s\n", $mobile, $id );			file_put_contents("mobile_error.log", $error, FILE_APPEND);		}		#$dbh = null;		printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);	}}$pool = new Pool(100, \ExampleWorker::class, []);#foreach (range(0, 100) as $number) {#	$pool->submit(new Work($number));#}$dbhost = 'db.example.com';                     // 数据库服务器$dbuser = 'example.com';                 		// 数据库用户名$dbpw = 'password';                             // 数据库密码$dbname = 'example';$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );#print_r($dbh);#$sql = "select id, mobile from members where id < :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':id',300);#$sth->execute();#$result = $sth->fetchAll();#print_r($result);##$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':mobile', 'aa');#$sth->bindValue(':id','272');#echo $sth->execute();#echo $sth->queryString;#echo $sth->debugDumpParams();$sql = "select id, mobile from members order by id asc"; // limit 1000";$row = $dbh->query($sql);while($members = $row->fetch(PDO::FETCH_ASSOC)){        #$order =  $account['order'];        #printf("%s\n",$order);        //print_r($members);        $pool->submit(new Work($members));		#unset($account['order']);}$pool->shutdown();?>

8.3. 数据库持久连接

总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => true));?>

但有些场景数据库持久链接适得其反,所以根据你的场景选择链接方式

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => false));?>

由于现成持续链接数据,有时可能因为数据库或者网络原因导致数据无法连接,程序抛出异常或终止,所以使用单例并不保险。

protected function getInstance(){	return self::$dbh;}

为单例增加重新连接功能

class SenderWorker extends Worker {	protected $config;	protected static $dbh;	protected static $amqp;		public function __construct($config) {		$this->config = $config;		$this->logger = new Logger();	}	public function run() {	}	private function connect(){		try {			$dbhost = $this->config['database']['host'];			$dbport = $this->config['database']['port'];			$dbuser = $this->config['database']['user'];			$dbpass = $this->config['database']['password'];			$dbname = $this->config['database']['dbname'];			self::$dbh = new PDO ( "mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array (					PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',					PDO::MYSQL_ATTR_COMPRESS => true					/*PDO::ATTR_PERSISTENT => true*/			) );			self::$dbh->setAttribute ( PDO::ATTR_ERRMODE, PDO::ERRMODE_WARNING );		} catch ( PDOException $e ) {			$this->logger ( 'Exception worker', $e->getMessage( ) );		} catch ( Exception $e ) {			$this->logger ( 'Exception worker', $e->getMessage( ) );		}	}	protected function getInstance() {		if(!self::$dbh) {			$this->connect();			$this->logger ( 'Database', sprintf("Connect database %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );		}else{			$this->logger ( 'Database', sprintf("Get instance database %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );		}				if(self::$dbh){			return self::$dbh;		}else{			$this->logger ( 'Database', sprintf("Connect database is error %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );			$this->logger ( 'Error', sprintf("Worker is shutdown %s", $this->getThreadId ()) );			$this->shutdown();		}	}		public function logger($type, $message) {		$this->logger->logger($type, $message);	}		public function getAmqpInstance(){		if(!self::$amqp){			self::$amqp = new AMQPConnection(array(				'host' 	=> $this->config['amqp']['host'], 				'port' 	=> $this->config['amqp']['port'], 				'vhost'	=> $this->config['amqp']['vhost'], 				'login' => $this->config['amqp']['login'], 				'password' => $this->config['amqp']['password']			));			$this->logger ( 'AMQP', sprintf("Connect amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );		}else{			$this->logger ( 'AMQP', sprintf("Get instance amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );		}			return self::$amqp;	}}

每次调用 getInstance() 会判断当前数据库是否已经链接,如果链接丢失,将重新链接数据库。

8.4. 涉及数据库更新

多线程编程中对数据库更新操作需要注意的是,有些场景,你需要控制同一时刻只能有一个线程对数据库做Update, Delete, Insert,否则数据容易出错。

例如下面的操作,你会发现程序运行完成后数据字段没有任何变化。这是因为线程间相互覆盖对方之前更新的数据。

$sql = "update import set succeed = succeed+1 where status = :status and id = :id";

解决方法有两种,一种是外部实现排他锁,一种是在数据库内部实现,通过事物处理,解决线程资源争夺,相互覆盖的问题。

private function updateSucceed($task){		$dbh = $this->worker->getInstance();		$dbh->beginTransaction();		$sql = "update import set succeed = succeed+1 where status = :status and id = :id";		$sth = $dbh->prepare ( $sql );		$sth->bindValue ( ':id', $task->id );		$sth->bindValue ( ':status', 'Processing' );		$status = $sth->execute ();		$dbh->commit();		return $status;	}

9. Thread And ZeroMQ

应用场景,我使用触发器监控数据库某个表,一旦发现有改变就通知程序处理数据

9.1. 数据库端

首先安装ZeroMQ 与 ZeroMQ for MySQL UDF 然后创建触发器。 https://github.com/netkiller/mysql-zmq-plugin

CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)	LANGUAGE SQL	NOT DETERMINISTIC	READS SQL DATA	SQL SECURITY DEFINER	COMMENT '交易监控'BEGIN	DECLARE Example CHAR(1) DEFAULT 'N';	IF CMD IN ('0','1') THEN		IF VOLUME >=10 AND VOLUME <=90 THEN			select coding into Example from example.members where username = LOGIN and coding = 'Y';			IF Example = 'Y' THEN				select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));			END IF;		END IF;	END IF;ENDCREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW BEGIN	call Table_Example(NEW.TICKET,NEW.LOGIN,NEW.CMD,NEW.VOLUME);END

9.2. 数据处理端

<?phpclass ExampleWorker extends Worker {	#public function __construct(Logging $logger) {	#	$this->logger = $logger;	#}	#protected $logger;	protected  static $dbh;	public function __construct() {	}	public function run(){		$dbhost = '192.168.2.1';			// 数据库服务器		$dbport = 3306;	    $dbuser = 'www';        			// 数据库用户名        $dbpass = 'password';             	// 数据库密码		$dbname = 'example';				// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(			/* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */			PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true			)		);	}	protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Fee extends Stackable {	public function __construct($msg) {		$trades = explode(",", $msg);		$this->data = $trades;		print_r($trades);	}	public function run() {		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );		try {			$dbh  = $this->worker->getInstance();			$insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";			$sth = $dbh->prepare($insert);			$sth->bindValue(':ticket', $this->data[0]);			$sth->bindValue(':login', $this->data[1]);			$sth->bindValue(':volume', $this->data[2]);			$sth->execute();			//$sth = null;			//$dbh = null;			/* 业务实现在此处 */			$update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";			$sth = $dbh->prepare($update);			$sth->bindValue(':ticket', $this->data[0]);			$sth->execute();			//echo $sth->queryString;		}		catch(PDOException $e) {			$error = sprintf("%s,%s\n", $mobile, $id );			file_put_contents("mobile_error.log", $error, FILE_APPEND);		}		#$dbh = null;		//printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);	}}class Example {	/* config */	const LISTEN = "tcp://192.168.2.15:5555";	const MAXCONN = 100;	const pidfile = __CLASS__;	const uid	= 80;	const gid	= 80;	protected $pool = NULL;	protected $zmq = NULL;	public function __construct() {		$this->pidfile = '/var/run/'.self::pidfile.'.pid';	}	private function daemon(){		if (file_exists($this->pidfile)) {			echo "The file $this->pidfile exists.\n";			exit();		}		$pid = pcntl_fork();		if ($pid == -1) {			 die('could not fork');		} else if ($pid) {			 // we are the parent			 //pcntl_wait($status); //Protect against Zombie children			exit($pid);		} else {			// we are the child			file_put_contents($this->pidfile, getmypid());			posix_setuid(self::uid);			posix_setgid(self::gid);			return(getmypid());		}	}	private function start(){		$pid = $this->daemon();		$this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);		$this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);		$this->zmq->bind(self::LISTEN);		/* Loop receiving and echoing back */		while ($message = $this->zmq->recv()) {			if($message){					$this->pool->submit(new Fee($message));					$this->zmq->send('TRUE');			}else{					$this->zmq->send('FALSE');			}		}		$pool->shutdown();	}	private function stop(){		if (file_exists($this->pidfile)) {			$pid = file_get_contents($this->pidfile);			posix_kill($pid, 9);			unlink($this->pidfile);		}	}	private function help($proc){		printf("%s start | stop | help \n", $proc);	}	public function main($argv){		if(count($argv) < 2){			printf("please input help parameter\n");			exit();		}		if($argv[1] === 'stop'){			$this->stop();		}else if($argv[1] === 'start'){			$this->start();		}else{			$this->help($argv[0]);		}	}}$example = new Example();$example->main($argv);

使用方法

# php example.php start# php example.php stop# php example.php help

此程序涉及守候进程实现$this->daemon()运行后转到后台运行,进程ID保存,进程的互斥(不允许同时启动两个进程),线程池连接数以及线程任务等等

10. 延伸阅读

PHP高级编程之消息队列

PHP高级编程之守护进程

성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
PHP vs. Python : 핵심 기능 및 기능PHP vs. Python : 핵심 기능 및 기능Apr 13, 2025 am 12:16 AM

PHP와 Python은 각각 고유 한 장점이 있으며 다양한 시나리오에 적합합니다. 1.PHP는 웹 개발에 적합하며 내장 웹 서버 및 풍부한 기능 라이브러리를 제공합니다. 2. Python은 간결한 구문과 강력한 표준 라이브러리가있는 데이터 과학 및 기계 학습에 적합합니다. 선택할 때 프로젝트 요구 사항에 따라 결정해야합니다.

PHP : 웹 개발의 핵심 언어PHP : 웹 개발의 핵심 언어Apr 13, 2025 am 12:08 AM

PHP는 서버 측에서 널리 사용되는 스크립팅 언어이며 특히 웹 개발에 적합합니다. 1.PHP는 HTML을 포함하고 HTTP 요청 및 응답을 처리 할 수 ​​있으며 다양한 데이터베이스를 지원할 수 있습니다. 2.PHP는 강력한 커뮤니티 지원 및 오픈 소스 리소스를 통해 동적 웹 컨텐츠, 프로세스 양식 데이터, 액세스 데이터베이스 등을 생성하는 데 사용됩니다. 3. PHP는 해석 된 언어이며, 실행 프로세스에는 어휘 분석, 문법 분석, 편집 및 실행이 포함됩니다. 4. PHP는 사용자 등록 시스템과 같은 고급 응용 프로그램을 위해 MySQL과 결합 할 수 있습니다. 5. PHP를 디버깅 할 때 error_reporting () 및 var_dump ()와 같은 함수를 사용할 수 있습니다. 6. 캐싱 메커니즘을 사용하여 PHP 코드를 최적화하고 데이터베이스 쿼리를 최적화하며 내장 기능을 사용하십시오. 7

PHP : 많은 웹 사이트의 기초PHP : 많은 웹 사이트의 기초Apr 13, 2025 am 12:07 AM

PHP가 많은 웹 사이트에서 선호되는 기술 스택 인 이유에는 사용 편의성, 강력한 커뮤니티 지원 및 광범위한 사용이 포함됩니다. 1) 배우고 사용하기 쉽고 초보자에게 적합합니다. 2) 거대한 개발자 커뮤니티와 풍부한 자원이 있습니다. 3) WordPress, Drupal 및 기타 플랫폼에서 널리 사용됩니다. 4) 웹 서버와 밀접하게 통합하여 개발 배포를 단순화합니다.

과대 광고 : 오늘 PHP의 역할을 평가합니다과대 광고 : 오늘 PHP의 역할을 평가합니다Apr 12, 2025 am 12:17 AM

PHP는 현대적인 프로그래밍, 특히 웹 개발 분야에서 강력하고 널리 사용되는 도구로 남아 있습니다. 1) PHP는 사용하기 쉽고 데이터베이스와 완벽하게 통합되며 많은 개발자에게 가장 먼저 선택됩니다. 2) 동적 컨텐츠 생성 및 객체 지향 프로그래밍을 지원하여 웹 사이트를 신속하게 작성하고 유지 관리하는 데 적합합니다. 3) 데이터베이스 쿼리를 캐싱하고 최적화함으로써 PHP의 성능을 향상시킬 수 있으며, 광범위한 커뮤니티와 풍부한 생태계는 오늘날의 기술 스택에 여전히 중요합니다.

PHP의 약한 참고 자료는 무엇이며 언제 유용합니까?PHP의 약한 참고 자료는 무엇이며 언제 유용합니까?Apr 12, 2025 am 12:13 AM

PHP에서는 약한 참조가 약한 회의 클래스를 통해 구현되며 쓰레기 수집가가 물체를 되 찾는 것을 방해하지 않습니다. 약한 참조는 캐싱 시스템 및 이벤트 리스너와 같은 시나리오에 적합합니다. 물체의 생존을 보장 할 수 없으며 쓰레기 수집이 지연 될 수 있음에 주목해야합니다.

PHP의 __invoke 마법 방법을 설명하십시오.PHP의 __invoke 마법 방법을 설명하십시오.Apr 12, 2025 am 12:07 AM

\ _ \ _ 호출 메소드를 사용하면 객체를 함수처럼 호출 할 수 있습니다. 1. 객체를 호출 할 수 있도록 메소드를 호출하는 \ _ \ _ 정의하십시오. 2. $ obj (...) 구문을 사용할 때 PHP는 \ _ \ _ invoke 메소드를 실행합니다. 3. 로깅 및 계산기, 코드 유연성 및 가독성 향상과 같은 시나리오에 적합합니다.

동시성에 대해 PHP 8.1의 섬유를 설명하십시오.동시성에 대해 PHP 8.1의 섬유를 설명하십시오.Apr 12, 2025 am 12:05 AM

섬유는 PHP8.1에 도입되어 동시 처리 기능을 향상시켰다. 1) 섬유는 코 루틴과 유사한 가벼운 동시성 모델입니다. 2) 개발자는 작업의 실행 흐름을 수동으로 제어 할 수 있으며 I/O 집약적 작업을 처리하는 데 적합합니다. 3) 섬유를 사용하면보다 효율적이고 반응이 좋은 코드를 작성할 수 있습니다.

PHP 커뮤니티 : 자원, 지원 및 개발PHP 커뮤니티 : 자원, 지원 및 개발Apr 12, 2025 am 12:04 AM

PHP 커뮤니티는 개발자 성장을 돕기 위해 풍부한 자원과 지원을 제공합니다. 1) 자료에는 공식 문서, 튜토리얼, 블로그 및 Laravel 및 Symfony와 같은 오픈 소스 프로젝트가 포함됩니다. 2) 지원은 StackoverFlow, Reddit 및 Slack 채널을 통해 얻을 수 있습니다. 3) RFC에 따라 개발 동향을 배울 수 있습니다. 4) 적극적인 참여, 코드에 대한 기여 및 학습 공유를 통해 커뮤니티에 통합 될 수 있습니다.

See all articles

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25 : Myrise에서 모든 것을 잠금 해제하는 방법
4 몇 주 전By尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

MinGW - Windows용 미니멀리스트 GNU

MinGW - Windows용 미니멀리스트 GNU

이 프로젝트는 osdn.net/projects/mingw로 마이그레이션되는 중입니다. 계속해서 그곳에서 우리를 팔로우할 수 있습니다. MinGW: GCC(GNU Compiler Collection)의 기본 Windows 포트로, 기본 Windows 애플리케이션을 구축하기 위한 무료 배포 가능 가져오기 라이브러리 및 헤더 파일로 C99 기능을 지원하는 MSVC 런타임에 대한 확장이 포함되어 있습니다. 모든 MinGW 소프트웨어는 64비트 Windows 플랫폼에서 실행될 수 있습니다.

WebStorm Mac 버전

WebStorm Mac 버전

유용한 JavaScript 개발 도구

SecList

SecList

SecLists는 최고의 보안 테스터의 동반자입니다. 보안 평가 시 자주 사용되는 다양한 유형의 목록을 한 곳에 모아 놓은 것입니다. SecLists는 보안 테스터에게 필요할 수 있는 모든 목록을 편리하게 제공하여 보안 테스트를 더욱 효율적이고 생산적으로 만드는 데 도움이 됩니다. 목록 유형에는 사용자 이름, 비밀번호, URL, 퍼징 페이로드, 민감한 데이터 패턴, 웹 셸 등이 포함됩니다. 테스터는 이 저장소를 새로운 테스트 시스템으로 간단히 가져올 수 있으며 필요한 모든 유형의 목록에 액세스할 수 있습니다.

Dreamweaver Mac版

Dreamweaver Mac版

시각적 웹 개발 도구

안전한 시험 브라우저

안전한 시험 브라우저

안전한 시험 브라우저는 온라인 시험을 안전하게 치르기 위한 보안 브라우저 환경입니다. 이 소프트웨어는 모든 컴퓨터를 안전한 워크스테이션으로 바꿔줍니다. 이는 모든 유틸리티에 대한 액세스를 제어하고 학생들이 승인되지 않은 리소스를 사용하는 것을 방지합니다.