検索

PHP 高度なプログラミング マルチスレッド

http://netkiller.github.io/journal/php.thread.html

Neo Chen (陈京峰) 氏、netkiller、

中国広東省深セン市龍華新区民志街西山美地
518131
+86 13113668890
+86 755 29812080

著作権声明

転載の際は、必ず記事の出典、著者情報、本声明を明記の上、著者にご連絡ください。

ドキュメントソース: http://netkiller.github.io http://netkiller.sourceforge.net
WeChat で QR コードをスキャンして Netkiller WeChat サブスクリプション アカウントに入力します

QQ グループ: 128659835 「読者」と明記してください

2015-10-26

概要

2014-03-12 初版

2014-05-15 第 2 版

2014-06-13 第 3 版

2014-07-24 第 4 版

2015-10-26 第 5 版

私の一連のドキュメント

Netkiller アーキテクト マニュアル Netkiller 開発者マニュアル Netkiller PHP マニュアル Netkiller Python マニュアル Netkiller 暗号化ハンドブック Netkiller シェル マニュアル Netkiller メール アプリ Netkiller Docbook マニュアル Netkillerプロジェクト マニュアル Netkiller データベース マニュアル Netkiller PostgreSQL マニュアル Netkiller MySQL マニュアル Netkiller NoSQL マニュアル Netkiller LDAP マニュアル Netkiller ネットワーク マニュアル Netkiller Cisco IOS マニュアル Netkiller H3C マニュアル Netkillerマルチメディアマニュアル Netkiller Perl マニュアル Netkiller アマチュア無線マニュアル Netkiller DevOps マニュアル
Netkiller テスト ハンドブック
Netkiller Linux ハンドブック Netkiller Debian マニュアル Netkiller CentOS マニュアル Netkiller FreeBSD マニュアル
Netkiller セキュリティ マニュアル Netkiller Web マニュアル キラー監視アプリ Netkiller ストレージ アプリ

iBook を使用して現在のドキュメントを読むことができます

ディレクトリ

  • 1. マルチスレッド環境のインストール
    • 1.1. PHP 5.5.9
    • 1.2. pthreads 拡張機能をインストールします
  • 2. スレッド
  • 3 . ワーカーとスタッカブル
  • 4. ミューテックスロック
    • 4.1. マルチスレッドと共有メモリ
  • 5. スレッド同期
  • 6. スレッドプール
    • 6.1.プールクラス
    • 6.2の原則スレッド プール
    • 6.3. 動的キュー スレッド プール
    • 6.4. スレッド プール内のスレッドの実行が完了するのを待機します
  • 7. マルチスレッドのファイルの安全な読み取りと書き込み (ファイル ロック)
  • 8. マルチスレッド8.1. ワーカーと PDO
    • 8.2. プールと PDO
    • 8.3. データベースの更新を伴う
    • 9. スレッドと ZeroMQ
    • 9.1. データベース側
    • 9.2.データ処理側
  • 10. 拡張読み込み
    • 1. マルチスレッド環境のインストール
  • 1.1. PHP 5.5.9

PHP 5.5.9 をインストール

https://github.com /oscm/shell/blob/master/php /5.5.9.sh
./configure --prefix=/srv/php-5.5.9 \--with-config-file-path=/srv/php-5.5.9/etc \--with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \--enable-fpm \--with-fpm-user=www \--with-fpm-group=www \--with-pear \--with-curl \--with-gd \--with-jpeg-dir \--with-png-dir \--with-freetype-dir \--with-zlib-dir \--with-iconv \--with-mcrypt \--with-mhash \--with-pdo-mysql \--with-mysql-sock=/var/lib/mysql/mysql.sock \--with-openssl \--with-xsl \--with-recode \--enable-sockets \--enable-soap \--enable-mbstring \--enable-gd-native-ttf \--enable-zip \--enable-xml \--enable-bcmath \--enable-calendar \--enable-shmop \--enable-dba \--enable-wddx \--enable-sysvsem \--enable-sysvshm \--enable-sysvmsg \--enable-opcache \--enable-pcntl \--enable-maintainer-zts \--disable-debug

コンパイルでは zts サポートを有効にする必要があります。そうしないと、pthreads (--enable-maintainer-zts) をインストールできません

1.2. pthreads 拡張機能をインストールします

https://github.com/oscm/shell /blob/master/php/pecl/pthreads.sh をインストールします

# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash

pthreads がインストールされているか確認します

# php -m | grep pthreads

2. スレッド

<?phpclass HelloWorld extends Thread {    public function __construct($world) {       $this->world = $world;    }    public function run() {        print_r(sprintf("Hello %s\n", $this->world));    }}$thread = new HelloWorld("World");if ($thread->start()) {    printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());}?>

3. ワーカーとスタッカブル

<?phpclass SQLQuery extends Stackable {        public function __construct($sql) {                $this->sql = $sql;        }        public function run() {                $dbh  = $this->worker->getConnection();                $row = $dbh->query($this->sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        }}class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }        /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }}$worker = new ExampleWorker("My Worker Thread");$work=new SQLQuery('select * from members order by id desc limit 5');$worker->stack($work);$table1 = new SQLQuery('select * from demousers limit 2');$worker->stack($table1);$worker->start();$worker->shutdown();?>

4. ミューテックス ロック

ミューテックス ロックはどのような状況で使用されますか?複数のスレッドを制御する必要があり、同時に動作できるスレッドが 1 つだけである場合に使用できます。

以下に、ミューテックスロックの有無による違いを説明する簡単なカウンタープログラムの例を見てみましょう。
<?php$counter = 0;//$handle=fopen("php://memory", "rw");//$handle=fopen("php://temp", "rw");$handle=fopen("/tmp/counter.txt", "w");fwrite($handle, $counter );fclose($handle);class CounterThread extends Thread {	public function __construct($mutex = null){		$this->mutex = $mutex;        $this->handle = fopen("/tmp/counter.txt", "w+");    }	public function __destruct(){		fclose($this->handle);	}    public function run() {		if($this->mutex)			$locked=Mutex::lock($this->mutex);		$counter = intval(fgets($this->handle));		$counter++;		rewind($this->handle);		fputs($this->handle, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);		if($this->mutex)			Mutex::unlock($this->mutex);    }}//没有互斥锁for ($i=0;$i<50;$i++){	$threads[$i] = new CounterThread();	$threads[$i]->start();}//加入互斥锁$mutex = Mutex::create(true);for ($i=0;$i<50;$i++){	$threads[$i] = new CounterThread($mutex);	$threads[$i]->start();}Mutex::unlock($mutex);for ($i=0;$i<50;$i++){	$threads[$i]->join();}Mutex::destroy($mutex);?>

ファイル /tmp/counter.txt を使用してカウンター値を保存し、ファイルを開くたびに値を 1 つ増やしてファイルに書き込みます。複数のスレッドが同時にファイルを操作すると、連続して実行するスレッドで取得する値が異なり、書き戻される値も異なり、最終的なカウンタ値が混乱してしまいます。

ロックを追加しない場合、カウントは常に上書きされ、最終結果は 2 になります

ミューテックス ロックを追加した後、プロセスの 1 つだけが追加作業を完了してロックを解放し、他のスレッドはロック解除信号を取得し、最終的にカウンタを正常に完了します。 累積操作

上記の例は、ファイルをロックすることによっても実装できます。ここでの主な焦点はマルチスレッド ロックであり、ファイル ロックについては後で説明します。

4.1. マルチスレッドと共有メモリ

共有メモリの例では、ロックは使用されていないため、作業メモリの操作自体がロックの機能を持っている可能性があります。

<?php$tmp = tempnam(__FILE__, 'PHP');$key = ftok($tmp, 'a');$shmid = shm_attach($key);$counter = 0;shm_put_var( $shmid, 1, $counter );class CounterThread extends Thread {	public function __construct($shmid){        $this->shmid = $shmid;    }    public function run() {		$counter = shm_get_var( $this->shmid, 1 );		$counter++;		shm_put_var( $this->shmid, 1, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);    }}for ($i=0;$i<100;$i++){	$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){	$threads[$i]->start();}for ($i=0;$i<100;$i++){	$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

5. スレッドの同期

シナリオによっては、thread->start() でプログラムの実行を開始したくないが、スレッドがコマンドを待機するようにしたい場合があります。

$thread->wait(); のテスト関数は、スレッドが thread->start() の直後に実行されるのではなく、$thread->notify(); からのシグナルを受信した後にのみ実行されます。

<?php$tmp = tempnam(__FILE__, 'PHP');$key = ftok($tmp, 'a');$shmid = shm_attach($key);$counter = 0;shm_put_var( $shmid, 1, $counter );class CounterThread extends Thread {	public function __construct($shmid){        $this->shmid = $shmid;    }    public function run() {        $this->synchronized(function($thread){            $thread->wait();        }, $this);		$counter = shm_get_var( $this->shmid, 1 );		$counter++;		shm_put_var( $this->shmid, 1, $counter );		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);    }}for ($i=0;$i<100;$i++){	$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){	$threads[$i]->start();}for ($i=0;$i<100;$i++){	$threads[$i]->synchronized(function($thread){		$thread->notify();	}, $threads[$i]);}for ($i=0;$i<100;$i++){	$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

6. スレッド プール

6.1. pthreads プール クラス

pthreads が提供するプール クラスのサンプル

<?phpclass WebWorker extends Worker {	public function __construct(SafeLog $logger) {		$this->logger = $logger;	}	protected $loger;}class WebWork extends Stackable {	public function isComplete() {		return $this->complete;	}	public function run() {		$this->worker			->logger			->log("%s executing in Thread #%lu",				  __CLASS__, $this->worker->getThreadId());		$this->complete = true;	}	protected $complete;}class SafeLog extends Stackable {	protected function log($message, $args = []) {		$args = func_get_args();		if (($message = array_shift($args))) {			echo vsprintf(				"{$message}\n", $args);		}	}}$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);$pool->submit($w=new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->shutdown();$pool->collect(function($work){	return $work->isComplete();});var_dump($pool);

ここで、スレッド プールについて詳しく説明します。著者の仕事から経験が得られます。

Pool コンストラクター メソッドの最初のパラメーターのサイズ、マニュアルの説明では、Pool オブジェクトが収容できる Worker オブジェクトの最大数ですが、実際に使用すると、サイズはプールにプッシュされるタスクの数を制御しないことがわかりましたが、同時スレッドの数。

$pool->submit() はメモリが許す限りワイヤレスでタスクを送信できますが (php.ini 設定を参照)、同時に実行されるスレッドの数はサイズによって制御されます。

6.2. スレッドプールの原理

プールの動作原理を説明するために独自にクラスを実装します

<?phpclass Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {	$this->row = $row;        $this->sql = null;    }    public function run() {	if(strlen($this->row['bankno']) > 100 ){		$bankno = safenet_decrypt($this->row['bankno']);	}else{		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);		file_put_contents("bankno_error.log", $error, FILE_APPEND);	}	if( strlen($bankno) > 7 ){		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);		$this->sql = $sql;	}	printf("%s\n",$this->sql);    }}class Pool {	public $pool = array();	public function __construct($count) {		$this->count = $count;	}	public function push($row){		if(count($this->pool) < $this->count){			$this->pool[] = new Update($row);			return true;		}else{			return false;		}	}	public function start(){		foreach ( $this->pool as $id => $worker){			$this->pool[$id]->start();		}	}	public function join(){		foreach ( $this->pool as $id => $worker){               $this->pool[$id]->join();		}	}	public function clean(){		foreach ( $this->pool as $id => $worker){			if(! $worker->isRunning()){            	unset($this->pool[$id]);            }		}	}}try {	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',		PDO::MYSQL_ATTR_COMPRESS => true		)	);	$sql  = "select id,bankno from members order by id desc";	$row = $dbh->query($sql);	$pool = new Pool(5);	while($member = $row->fetch(PDO::FETCH_ASSOC))	{		while(true){			if($pool->push($member)){ //压入任务到池中				break;			}else{ //如果池已经满,就开始启动线程				$pool->start();				$pool->join();				$pool->clean();			}		}	}	$pool->start();    $pool->join();	$dbh = null;} catch (Exception $e) {    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";}?>

6.3. 動的キュースレッドプール

上記の例は、スレッドプールの起動時に start を実行するものですがいっぱいで、以下はこの例です。スレッド プールに空きができるとすぐに新しいスレッドを作成します。
<?phpclass Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {	$this->row = $row;        $this->sql = null;	//print_r($this->row);    }    public function run() {	if(strlen($this->row['bankno']) > 100 ){		$bankno = safenet_decrypt($this->row['bankno']);	}else{		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);		file_put_contents("bankno_error.log", $error, FILE_APPEND);	}	if( strlen($bankno) > 7 ){		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);		$this->sql = $sql;	}	printf("%s\n",$this->sql);    }}try {	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',		PDO::MYSQL_ATTR_COMPRESS => true		)	);	$sql     = "select id,bankno from members order by id desc limit 50";	$row = $dbh->query($sql);	$pool = array();	while($member = $row->fetch(PDO::FETCH_ASSOC))	{		$id 	= $member['id'];		while (true){			if(count($pool) < 5){				$pool[$id] = new Update($member);				$pool[$id]->start();				break;			}else{				foreach ( $pool as $name => $worker){					if(! $worker->isRunning()){						unset($pool[$name]);					}				}			}		}	}	$dbh = null;} catch (Exception $e) {    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";}?>

6.4. スレッド プール内のスレッドの実行が完了するのを待機しています

$pool->submit は、スレッド プールに送信されると、次のコードが実行されるのを待ちたい場合があります。スレッドが実行を完了し、スレッドの作業状況を収集します。
$mutex = Mutex::create();		$pool = new Pool ( self::MAXCONN , \ImportWorker::class, array($this->config, $mutex) );				$pool->collect(function($work){				return $work->isComplete();			});				foreach($tasks as $task){			$this->logger ( __CLASS__, sprintf("Task %s %s", $task->file, 'Processing') );			pcntl_signal_dispatch();						if(Signal::get() == SIGHUP){				Signal::reset();				break;			}						if(file_exists ($task->file)){				$handle = fopen($task->file, 'r');				$i = 0;				while (($row = fgetcsv($handle, 100000, ',')) !== false) {					$work[$i] =  new Import ( $task, $row );					$pool->submit ( $work[$i] );					$i++;					//$pool->submit ( new Import ( $task, $row ));				}						fclose($handle);								$waiting = true;				while($waiting){										for($i=0;$i<count($work);$i++){												if($work[$i]->isComplete()){							Counter::$completed++;						}						//printf("work %s:%s \n", count($work), Counter::$completed);						if(Counter::$completed == count($work)){							$waiting = false;							break;						}					}					sleep(1);				}				$this->completedTask($task);			}else{				$this->failedTask($task);			}			//printf("Ignore: %s\n", Counter::$ignore ) ;		}				$pool->shutdown ();		//Mutex::unlock($mutex);		Mutex::destroy($mutex);

while($waiting) ペアは実行を継続し、すべてのスレッドが完了するまで終了しません。

7. マルチスレッドのファイルの安全な読み取りと書き込み (ファイル ロック)

在多线程中读写文件但进程是有区别的,读取内容比较容易时间,但写入数据就需要保证同一时刻只能有一个进程操作,虽然通过互斥锁可以解决,但从安全的角度文件必须上锁。

文件锁种类。

LOCK_SH 取得共享锁定(读取的程序)。LOCK_EX 取得独占锁定(写入的程序。LOCK_UN 释放锁定(无论共享或独占)。LOCK_NB 如果不希望 flock() 在锁定时堵塞

共享锁例子

<?php$fp = fopen("/tmp/lock.txt", "r+");if (flock($fp, LOCK_EX)) {  // 进行排它型锁定    ftruncate($fp, 0);      // truncate file    fwrite($fp, "Write something here\n");    fflush($fp);            // flush output before releasing the lock    flock($fp, LOCK_UN);    // 释放锁定} else {    echo "Couldn't get the lock!";}fclose($fp);?>

共享锁例子2

<?php$fp = fopen('/tmp/lock.txt', 'r+');/* Activate the LOCK_NB option on an LOCK_EX operation */if(!flock($fp, LOCK_EX | LOCK_NB)) {    echo 'Unable to obtain lock';    exit(-1);}/* ... */fclose($fp);?>

8. 多线程与数据连接

多线程中操作数据库总结与注意事项 pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

8.1. Worker 与 PDO

<?phpclass Work extends Stackable {        public function __construct() {        }        public function run() {                $dbh  = $this->worker->getConnection();                $sql     = "select id,name from members order by id desc limit 50";                $row = $dbh->query($sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        }}class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }        /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }}$worker = new ExampleWorker("My Worker Thread");$work=new Work();$worker->stack($work);$worker->start();$worker->shutdown();?>

8.2. Pool 与 PDO

在线程池中链接数据库

# cat pool.php<?phpclass ExampleWorker extends Worker {	public function __construct(Logging $logger) {		$this->logger = $logger;	}	protected $logger;}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {	public function __construct($number) {		$this->number = $number;	}	public function run() {                $dbhost = 'db.example.com';               // 数据库服务器                $dbuser = 'example.com';                 // 数据库用户名                $dbpw = 'password';                               // 数据库密码                $dbname = 'example_real';		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true                        )                );		$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";		#echo $sql;		$row = $dbh->query($sql);		$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);		if($mt4_trades){			$row = null;			$sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";			$dbh->query($sql);			#printf("%s\n",$sql);		}		$dbh = null;		printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);	}}class Logging extends Stackable {	protected  static $dbh;	public function __construct() {		$dbhost = 'db.example.com';			// 数据库服务器	        $dbuser = 'example.com';                 // 数据库用户名        	$dbpw = 'password';                               // 数据库密码		$dbname = 'example_real';			// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',			PDO::MYSQL_ATTR_COMPRESS => true			)		);	}	protected function log($message, $args = []) {		$args = func_get_args();		if (($message = array_shift($args))) {			echo vsprintf("{$message}\n", $args);		}	}	protected function getConnection(){                return self::$dbh;        }}$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);$dbhost = 'db.example.com';                      // 数据库服务器$dbuser = 'example.com';                 // 数据库用户名$dbpw = 'password';                               // 数据库密码$dbname = 'db_example';$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );$sql = "select `order`,name from accounts where deposit_time is null order by id desc";$row = $dbh->query($sql);while($account = $row->fetch(PDO::FETCH_ASSOC)){        $pool->submit(new Work($account));}$pool->shutdown();?>

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

<?phpclass ExampleWorker extends Worker {	#public function __construct(Logging $logger) {	#	$this->logger = $logger;	#}	#protected $logger;	protected  static $dbh;	public function __construct() {	}	public function run(){		$dbhost = 'db.example.com';			// 数据库服务器	    $dbuser = 'example.com';        	// 数据库用户名        $dbpw = 'password';             	// 数据库密码		$dbname = 'example';				// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',			PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true			)		);	}	protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {	public function __construct($data) {		$this->data = $data;		#print_r($data);	}	public function run() {		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );		try {			$dbh  = $this->worker->getInstance();			#print_r($dbh);               		$id = $this->data['id'];			$mobile = safenet_decrypt($this->data['mobile']);			#printf("%d, %s \n", $id, $mobile);			if(strlen($mobile) > 11){				$mobile = substr($mobile, -11);			}			if($mobile == 'null'){			#	$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";			#	printf("%s\n",$sql);			#	$dbh->query($sql);				$mobile = '';				$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";			}else{				$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";			}			$sth = $dbh->prepare($sql);			$sth->bindValue(':mobile', $mobile);			$sth->bindValue(':id', $id);			$sth->execute();			#echo $sth->debugDumpParams();		}		catch(PDOException $e) {			$error = sprintf("%s,%s\n", $mobile, $id );			file_put_contents("mobile_error.log", $error, FILE_APPEND);		}		#$dbh = null;		printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);	}}$pool = new Pool(100, \ExampleWorker::class, []);#foreach (range(0, 100) as $number) {#	$pool->submit(new Work($number));#}$dbhost = 'db.example.com';                     // 数据库服务器$dbuser = 'example.com';                 		// 数据库用户名$dbpw = 'password';                             // 数据库密码$dbname = 'example';$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );#print_r($dbh);#$sql = "select id, mobile from members where id < :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':id',300);#$sth->execute();#$result = $sth->fetchAll();#print_r($result);##$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':mobile', 'aa');#$sth->bindValue(':id','272');#echo $sth->execute();#echo $sth->queryString;#echo $sth->debugDumpParams();$sql = "select id, mobile from members order by id asc"; // limit 1000";$row = $dbh->query($sql);while($members = $row->fetch(PDO::FETCH_ASSOC)){        #$order =  $account['order'];        #printf("%s\n",$order);        //print_r($members);        $pool->submit(new Work($members));		#unset($account['order']);}$pool->shutdown();?>

8.3. 数据库持久连接

总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => true));?>

但有些场景数据库持久链接适得其反,所以根据你的场景选择链接方式

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => false));?>

由于现成持续链接数据,有时可能因为数据库或者网络原因导致数据无法连接,程序抛出异常或终止,所以使用单例并不保险。

protected function getInstance(){	return self::$dbh;}

为单例增加重新连接功能

class SenderWorker extends Worker {	protected $config;	protected static $dbh;	protected static $amqp;		public function __construct($config) {		$this->config = $config;		$this->logger = new Logger();	}	public function run() {	}	private function connect(){		try {			$dbhost = $this->config['database']['host'];			$dbport = $this->config['database']['port'];			$dbuser = $this->config['database']['user'];			$dbpass = $this->config['database']['password'];			$dbname = $this->config['database']['dbname'];			self::$dbh = new PDO ( "mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array (					PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',					PDO::MYSQL_ATTR_COMPRESS => true					/*PDO::ATTR_PERSISTENT => true*/			) );			self::$dbh->setAttribute ( PDO::ATTR_ERRMODE, PDO::ERRMODE_WARNING );		} catch ( PDOException $e ) {			$this->logger ( 'Exception worker', $e->getMessage( ) );		} catch ( Exception $e ) {			$this->logger ( 'Exception worker', $e->getMessage( ) );		}	}	protected function getInstance() {		if(!self::$dbh) {			$this->connect();			$this->logger ( 'Database', sprintf("Connect database %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );		}else{			$this->logger ( 'Database', sprintf("Get instance database %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );		}				if(self::$dbh){			return self::$dbh;		}else{			$this->logger ( 'Database', sprintf("Connect database is error %s, %s", $this->config['database']['dbname'], $this->getThreadId ()) );			$this->logger ( 'Error', sprintf("Worker is shutdown %s", $this->getThreadId ()) );			$this->shutdown();		}	}		public function logger($type, $message) {		$this->logger->logger($type, $message);	}		public function getAmqpInstance(){		if(!self::$amqp){			self::$amqp = new AMQPConnection(array(				'host' 	=> $this->config['amqp']['host'], 				'port' 	=> $this->config['amqp']['port'], 				'vhost'	=> $this->config['amqp']['vhost'], 				'login' => $this->config['amqp']['login'], 				'password' => $this->config['amqp']['password']			));			$this->logger ( 'AMQP', sprintf("Connect amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );		}else{			$this->logger ( 'AMQP', sprintf("Get instance amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );		}			return self::$amqp;	}}

每次调用 getInstance() 会判断当前数据库是否已经链接,如果链接丢失,将重新链接数据库。

8.4. 涉及数据库更新

多线程编程中对数据库更新操作需要注意的是,有些场景,你需要控制同一时刻只能有一个线程对数据库做Update, Delete, Insert,否则数据容易出错。

例如下面的操作,你会发现程序运行完成后数据字段没有任何变化。这是因为线程间相互覆盖对方之前更新的数据。

$sql = "update import set succeed = succeed+1 where status = :status and id = :id";

解决方法有两种,一种是外部实现排他锁,一种是在数据库内部实现,通过事物处理,解决线程资源争夺,相互覆盖的问题。

private function updateSucceed($task){		$dbh = $this->worker->getInstance();		$dbh->beginTransaction();		$sql = "update import set succeed = succeed+1 where status = :status and id = :id";		$sth = $dbh->prepare ( $sql );		$sth->bindValue ( ':id', $task->id );		$sth->bindValue ( ':status', 'Processing' );		$status = $sth->execute ();		$dbh->commit();		return $status;	}

9. Thread And ZeroMQ

应用场景,我使用触发器监控数据库某个表,一旦发现有改变就通知程序处理数据

9.1. 数据库端

首先安装ZeroMQ 与 ZeroMQ for MySQL UDF 然后创建触发器。 https://github.com/netkiller/mysql-zmq-plugin

CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)	LANGUAGE SQL	NOT DETERMINISTIC	READS SQL DATA	SQL SECURITY DEFINER	COMMENT '交易监控'BEGIN	DECLARE Example CHAR(1) DEFAULT 'N';	IF CMD IN ('0','1') THEN		IF VOLUME >=10 AND VOLUME <=90 THEN			select coding into Example from example.members where username = LOGIN and coding = 'Y';			IF Example = 'Y' THEN				select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));			END IF;		END IF;	END IF;ENDCREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW BEGIN	call Table_Example(NEW.TICKET,NEW.LOGIN,NEW.CMD,NEW.VOLUME);END

9.2. 数据处理端

<?phpclass ExampleWorker extends Worker {	#public function __construct(Logging $logger) {	#	$this->logger = $logger;	#}	#protected $logger;	protected  static $dbh;	public function __construct() {	}	public function run(){		$dbhost = '192.168.2.1';			// 数据库服务器		$dbport = 3306;	    $dbuser = 'www';        			// 数据库用户名        $dbpass = 'password';             	// 数据库密码		$dbname = 'example';				// 数据库名		self::$dbh  = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(			/* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */			PDO::MYSQL_ATTR_COMPRESS => true,			PDO::ATTR_PERSISTENT => true			)		);	}	protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Fee extends Stackable {	public function __construct($msg) {		$trades = explode(",", $msg);		$this->data = $trades;		print_r($trades);	}	public function run() {		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );		try {			$dbh  = $this->worker->getInstance();			$insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";			$sth = $dbh->prepare($insert);			$sth->bindValue(':ticket', $this->data[0]);			$sth->bindValue(':login', $this->data[1]);			$sth->bindValue(':volume', $this->data[2]);			$sth->execute();			//$sth = null;			//$dbh = null;			/* 业务实现在此处 */			$update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";			$sth = $dbh->prepare($update);			$sth->bindValue(':ticket', $this->data[0]);			$sth->execute();			//echo $sth->queryString;		}		catch(PDOException $e) {			$error = sprintf("%s,%s\n", $mobile, $id );			file_put_contents("mobile_error.log", $error, FILE_APPEND);		}		#$dbh = null;		//printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);	}}class Example {	/* config */	const LISTEN = "tcp://192.168.2.15:5555";	const MAXCONN = 100;	const pidfile = __CLASS__;	const uid	= 80;	const gid	= 80;	protected $pool = NULL;	protected $zmq = NULL;	public function __construct() {		$this->pidfile = '/var/run/'.self::pidfile.'.pid';	}	private function daemon(){		if (file_exists($this->pidfile)) {			echo "The file $this->pidfile exists.\n";			exit();		}		$pid = pcntl_fork();		if ($pid == -1) {			 die('could not fork');		} else if ($pid) {			 // we are the parent			 //pcntl_wait($status); //Protect against Zombie children			exit($pid);		} else {			// we are the child			file_put_contents($this->pidfile, getmypid());			posix_setuid(self::uid);			posix_setgid(self::gid);			return(getmypid());		}	}	private function start(){		$pid = $this->daemon();		$this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);		$this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);		$this->zmq->bind(self::LISTEN);		/* Loop receiving and echoing back */		while ($message = $this->zmq->recv()) {			if($message){					$this->pool->submit(new Fee($message));					$this->zmq->send('TRUE');			}else{					$this->zmq->send('FALSE');			}		}		$pool->shutdown();	}	private function stop(){		if (file_exists($this->pidfile)) {			$pid = file_get_contents($this->pidfile);			posix_kill($pid, 9);			unlink($this->pidfile);		}	}	private function help($proc){		printf("%s start | stop | help \n", $proc);	}	public function main($argv){		if(count($argv) < 2){			printf("please input help parameter\n");			exit();		}		if($argv[1] === 'stop'){			$this->stop();		}else if($argv[1] === 'start'){			$this->start();		}else{			$this->help($argv[0]);		}	}}$example = new Example();$example->main($argv);

使用方法

# php example.php start# php example.php stop# php example.php help

此程序涉及守候进程实现$this->daemon()运行后转到后台运行,进程ID保存,进程的互斥(不允许同时启动两个进程),线程池连接数以及线程任务等等

10. 延伸阅读

PHP高级编程之消息队列

PHP高级编程之守护进程

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
どのデータをPHPセッションに保存できますか?どのデータをPHPセッションに保存できますか?May 02, 2025 am 12:17 AM

phpssionscanStorestrings、numbers、arrays、andobjects.1.strings:textdatalikeusernames.2.numbers:integersorfloatsforcounters.3.arrays:listslikeshoppingcarts.4.objects:complextructuresthataresialized。

どのようにPHPセッションを開始しますか?どのようにPHPセッションを開始しますか?May 02, 2025 am 12:16 AM

tostartaphpsession、outsession_start()atthescript'sbeginning.1)placeitbe foreanyouttosetthesscookie.2)usesionsionsionsionserdatalikelogintatussorshoppingcarts.3)再生セッションインドストップレベントフィックスアタック

セッションの再生とは何ですか?また、セキュリティをどのように改善しますか?セッションの再生とは何ですか?また、セキュリティをどのように改善しますか?May 02, 2025 am 12:15 AM

セッション再生とは、新しいセッションIDを生成し、セッション固定攻撃の場合にユーザーが機密操作を実行するときに古いIDを無効にすることを指します。実装の手順には次のものが含まれます。1。感度操作を検出、2。新しいセッションIDを生成する、3。古いセッションIDを破壊し、4。ユーザー側のセッション情報を更新します。

PHPセッションを使用する際のパフォーマンスの考慮事項は何ですか?PHPセッションを使用する際のパフォーマンスの考慮事項は何ですか?May 02, 2025 am 12:11 AM

PHPセッションは、アプリケーションのパフォーマンスに大きな影響を与えます。最適化方法には以下が含まれます。1。データベースを使用してセッションデータを保存して応答速度を向上させます。 2。セッションデータの使用を削減し、必要な情報のみを保存します。 3.非ブロッキングセッションプロセッサを使用して、同時実行機能を改善します。 4.セッションの有効期限を調整して、ユーザーエクスペリエンスとサーバーの負担のバランスを取ります。 5.永続的なセッションを使用して、データの読み取り時間と書き込み時間を減らします。

PHPセッションはCookieとどのように異なりますか?PHPセッションはCookieとどのように異なりますか?May 02, 2025 am 12:03 AM

phpsesionsareserver-side、whilecookiesareclient-side.1)Sessionsionsionsoredataontheserver、aremoresecure.2)cookiesstoredataontheclient、cookiestoresecure、andlimitedinsizeisize.sesionsionsionivationivationivationivationivationivationivationivate

PHPはユーザーのセッションをどのように識別しますか?PHPはユーザーのセッションをどのように識別しますか?May 01, 2025 am 12:23 AM

phpidentifiesauser'ssessionsingsinssessionCookiesIds.1)whensession_start()iscalled、phpgeneratesauniquesidstoredsored incoookienadphpsessidontheuser'sbrowser.2)thisidallowsphptortorieSessiondatadata fromthata

PHPセッションを保護するためのベストプラクティスは何ですか?PHPセッションを保護するためのベストプラクティスは何ですか?May 01, 2025 am 12:22 AM

PHPセッションのセキュリティは、次の測定を通じて達成できます。1。session_regenerate_id()を使用して、ユーザーがログインまたは重要な操作である場合にセッションIDを再生します。 2. HTTPSプロトコルを介して送信セッションIDを暗号化します。 3。Session_Save_Path()を使用して、セッションデータを保存し、権限を正しく設定するためのSecure Directoryを指定します。

PHPセッションファイルはデフォルトで保存されていますか?PHPセッションファイルはデフォルトで保存されていますか?May 01, 2025 am 12:15 AM

phpsessionFilesToredInthededirectoryspecifiedBysession.save_path、通常/tmponunix-likesystemsorc:\ windows \ temponwindows.tocustomizethis:1)uesession_save_path()tosetaCustomdirectory、ensuringit'swritadistradistradistradistradistra

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

WebStorm Mac版

WebStorm Mac版

便利なJavaScript開発ツール

SublimeText3 英語版

SublimeText3 英語版

推奨: Win バージョン、コードプロンプトをサポート!

EditPlus 中国語クラック版

EditPlus 中国語クラック版

サイズが小さく、構文の強調表示、コード プロンプト機能はサポートされていません

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強力な PHP 統合開発環境

AtomエディタMac版ダウンロード

AtomエディタMac版ダウンロード

最も人気のあるオープンソースエディター