PHP 高度なプログラミング マルチスレッド
http://netkiller.github.io/journal/php.thread.html
Neo Chen (陈京峰) 氏、netkiller、
中国広東省深セン市龍華新区民志街西山美地
518131
+86 13113668890
+86 755 29812080
ba46dd73f0e5a249d2fb2422e27d6a91
submit() はメモリが許す限り (php.ini の設定を参照) 無制限にタスクを送信できますが、同時に実行されるスレッドの数はプールのサイズ (size) によって制御されます。
6.2. スレッドプールの原理
プールの動作原理を説明するために独自にクラスを実装します
<?php
/**
 * Thread that turns one `members` row into an UPDATE statement and prints it.
 *
 * The row is expected to carry the keys 'id' and 'bankno'. safenet_decrypt()
 * is not defined in this snippet and must be provided elsewhere.
 */
class Update extends Thread {
    public $running = false;
    public $row = array();
    /** @var string|null Generated UPDATE statement, or null when the row was rejected. */
    public $sql = null;

    /**
     * @param array $row One result row with the keys 'id' and 'bankno'.
     */
    public function __construct($row) {
        $this->row = $row;
        $this->sql = null;
    }

    /**
     * Decrypt the stored bank number and build the UPDATE statement.
     *
     * Values of 100 bytes or fewer are not decrypted; they are appended to
     * bankno_error.log instead. The statement is only built when the
     * decrypted value is longer than 7 bytes. The statement (or an empty
     * line when none was built) is always printed.
     */
    public function run() {
        // Start from an empty string so the length check below is well defined
        // when the row is rejected (the variable used to be undefined there).
        $bankno = '';
        if (strlen($this->row['bankno']) > 100) {
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s, %s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }
        if (strlen($bankno) > 7) {
            // NOTE(review): the values are interpolated unescaped; this is only
            // safe because the statement is printed, not executed, here.
            $this->sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);
        }
        printf("%s\n", $this->sql);
    }
}

/**
 * Minimal fixed-size batch of Update threads, used to illustrate how a pool works.
 *
 * NOTE(review): pthreads 2.0+ appears to ship its own Pool class; declaring
 * this one would then clash by name - confirm against the targeted version.
 */
class Pool {
    public $pool = array();
    /** @var int Maximum number of threads held at once. */
    public $count;

    /**
     * @param int $count Maximum number of threads held at once.
     */
    public function __construct($count) {
        $this->count = $count;
    }

    /**
     * Queue a row if there is a free slot.
     *
     * @param array $row Row handed to a new Update thread.
     * @return bool true when the row was queued, false when the pool is full.
     */
    public function push($row) {
        if (count($this->pool) < $this->count) {
            $this->pool[] = new Update($row);
            return true;
        }
        return false;
    }

    /** Start every queued thread. */
    public function start() {
        foreach ($this->pool as $worker) {
            $worker->start();
        }
    }

    /** Block until every queued thread has finished. */
    public function join() {
        foreach ($this->pool as $worker) {
            $worker->join();
        }
    }

    /** Drop the threads that are no longer running, freeing their slots. */
    public function clean() {
        foreach ($this->pool as $id => $worker) {
            if (!$worker->isRunning()) {
                unset($this->pool[$id]);
            }
        }
    }
}

// $dbhost, $dbname, $dbuser and $dbpw are not defined in this snippet; they
// have to be set before this point. A "host:port" value is rewritten into the
// "host;port=port" form used in the DSN.
try {
    $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
        PDO::MYSQL_ATTR_COMPRESS => true
    ));

    $sql = "select id,bankno from members order by id desc";
    $row = $dbh->query($sql);
    $pool = new Pool(5);

    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        while (true) {
            if ($pool->push($member)) {
                // The task was pushed into the pool.
                break;
            }
            // The pool is full: run the whole batch, wait for it, then empty it.
            $pool->start();
            $pool->join();
            $pool->clean();
        }
    }

    // Run the last, possibly partial, batch.
    $pool->start();
    $pool->join();

    $dbh = null;
} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";
}
?>
6.3. 動的キュースレッドプール
<?php
/**
 * Thread that turns one `members` row into an UPDATE statement and prints it.
 *
 * The row is expected to carry the keys 'id' and 'bankno'. safenet_decrypt()
 * is not defined in this snippet and must be provided elsewhere.
 */
class Update extends Thread {
    public $running = false;
    public $row = array();
    /** @var string|null Generated UPDATE statement, or null when the row was rejected. */
    public $sql = null;

    /**
     * @param array $row One result row with the keys 'id' and 'bankno'.
     */
    public function __construct($row) {
        $this->row = $row;
        $this->sql = null;
    }

    /**
     * Decrypt the stored bank number, build the UPDATE statement and print it.
     *
     * Values of 100 bytes or fewer are logged to bankno_error.log and skipped.
     */
    public function run() {
        // Defined up front: the else branch below never assigns it.
        $bankno = '';
        if (strlen($this->row['bankno']) > 100) {
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s, %s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }
        if (strlen($bankno) > 7) {
            $this->sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);
        }
        printf("%s\n", $this->sql);
    }
}

// Dynamic queue: at most 5 threads are alive at any time, and a new one is
// started as soon as a slot frees up (no waiting for a whole batch).
// $dbhost, $dbname, $dbuser and $dbpw must be defined before this point.
try {
    $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
        PDO::MYSQL_ATTR_COMPRESS => true
    ));

    $sql = "select id,bankno from members order by id desc limit 50";
    $row = $dbh->query($sql);
    $pool = array();

    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        $id = $member['id'];
        while (true) {
            if (count($pool) < 5) {
                $pool[$id] = new Update($member);
                $pool[$id]->start();
                break;
            }
            // Pool is full: reap the threads that have finished.
            foreach ($pool as $name => $worker) {
                if (!$worker->isRunning()) {
                    $worker->join();
                    unset($pool[$name]);
                }
            }
            // Nothing finished yet: yield briefly instead of spinning at full speed.
            if (count($pool) >= 5) {
                usleep(1000);
            }
        }
    }

    // Wait for the threads that are still in flight before tearing down.
    foreach ($pool as $worker) {
        $worker->join();
    }

    $dbh = null;
} catch (Exception $e) {
    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";
}
?>
6.4. スレッド プール内のスレッドの実行が完了するのを待機しています
// Fragment of a method: $this, self::MAXCONN, $tasks, Signal, Counter, Import
// and ImportWorker are all defined outside this snippet.
$mutex = Mutex::create();
$pool = new Pool(self::MAXCONN, \ImportWorker::class, array($this->config, $mutex));
$pool->collect(function ($work) {
    return $work->isComplete();
});

foreach ($tasks as $task) {
    $this->logger(__CLASS__, sprintf("Task %s %s", $task->file, 'Processing'));

    // Stop picking up new tasks once a SIGHUP has been recorded.
    pcntl_signal_dispatch();
    if (Signal::get() == SIGHUP) {
        Signal::reset();
        break;
    }

    if (!file_exists($task->file)) {
        $this->failedTask($task);
        continue;
    }

    $handle = fopen($task->file, 'r');
    if ($handle === false) {
        // The file exists but cannot be opened (e.g. permissions).
        $this->failedTask($task);
        continue;
    }

    // Start from an empty list for every task; jobs of a previous file must
    // not be counted again. The jobs are kept in $work so that they can be
    // polled below.
    $work = array();
    while (($row = fgetcsv($handle, 100000, ',')) !== false) {
        $job = new Import($task, $row);
        $work[] = $job;
        $pool->submit($job);
    }
    fclose($handle);

    // Poll until every job of this task reports completion. The number of
    // finished jobs is recounted from zero on each pass; accumulating it
    // across passes would count the same job repeatedly and end the wait
    // too early. An empty file (no jobs) finishes immediately.
    $total = count($work);
    while (true) {
        $completed = 0;
        foreach ($work as $job) {
            if ($job->isComplete()) {
                $completed++;
            }
        }
        // NOTE(review): Counter::$completed now holds the count for the
        // current task only - confirm no other reader expects a running total.
        Counter::$completed = $completed;
        if ($completed >= $total) {
            break;
        }
        sleep(1);
    }

    $this->completedTask($task);
}

$pool->shutdown();
Mutex::destroy($mutex);
while($waiting) ループは実行を継続し、すべてのスレッドが完了するまで終了しません。
在多线程中读写文件与单进程是有区别的:读取内容比较容易实现,但写入数据就需要保证同一时刻只能有一个线程(进程)操作。虽然通过互斥锁可以解决,但从安全的角度考虑,文件必须上锁。
文件锁种类。
LOCK_SH 取得共享锁定(读取的程序)。LOCK_EX 取得独占锁定(写入的程序)。LOCK_UN 释放锁定(无论共享或独占)。LOCK_NB 如果不希望 flock() 在锁定时堵塞,则加上该选项。
独占锁例子
<?php
// Rewrite /tmp/lock.txt while holding an exclusive lock.
// Mode "r+" does not create the file, so it has to exist already.
$fp = fopen("/tmp/lock.txt", "r+");

if ($fp === false) {
    // Without a valid handle there is nothing to lock or close.
    echo "Couldn't open the lock file!";
} else {
    if (flock($fp, LOCK_EX)) {      // acquire the exclusive lock (blocks until granted)
        ftruncate($fp, 0);          // truncate file
        fwrite($fp, "Write something here\n");
        fflush($fp);                // flush output before releasing the lock
        flock($fp, LOCK_UN);        // release the lock
    } else {
        echo "Couldn't get the lock!";
    }
    fclose($fp);
}
?>
独占锁例子2(非阻塞)
<?php
// Try to take an exclusive lock on /tmp/lock.txt without waiting for it.
$lockHandle = fopen('/tmp/lock.txt', 'r+');

/* LOCK_NB makes the LOCK_EX request return at once instead of blocking */
$lockMode = LOCK_EX | LOCK_NB;

if (flock($lockHandle, $lockMode)) {
    /* ... */
    fclose($lockHandle);
} else {
    // The lock could not be obtained: report it and stop.
    echo 'Unable to obtain lock';
    exit(-1);
}
?>
多线程中操作数据库总结与注意事项 pthreads 与 pdo 同时使用时,需要注意一点:需要静态声明 public static $dbh; 并且通过单例模式访问数据库连接。
<?php
/**
 * Task that lists the 50 newest members through its worker's connection.
 */
class Work extends Stackable
{
    public function __construct()
    {
    }

    /**
     * Fetch the rows one by one and dump each of them with print_r().
     */
    public function run()
    {
        $connection = $this->worker->getConnection();
        $statement = $connection->query("select id,name from members order by id desc limit 50");

        while (($record = $statement->fetch(PDO::FETCH_ASSOC)) !== false) {
            print_r($record);
        }
    }
}

/**
 * Worker that opens the database connection and keeps it in a static property.
 */
class ExampleWorker extends Worker
{
    public static $dbh;

    /**
     * @param string $name Label for the worker; not used by this class.
     */
    public function __construct($name)
    {
    }

    /**
     * Only prepares the environment for the tasks to come:
     * the connection is opened here.
     */
    public function run()
    {
        self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example', 'www', '123456');
    }

    /**
     * @return PDO The handle stored by run().
     */
    public function getConnection()
    {
        return self::$dbh;
    }
}

// Stack a single task on the worker, start it, then shut it down.
$worker = new ExampleWorker("My Worker Thread");
$work = new Work();

$worker->stack($work);
$worker->start();
$worker->shutdown();
?>
在线程池中链接数据库
# cat pool.php
<?php
/**
 * Pool worker that carries the shared Logging object.
 */
class ExampleWorker extends Worker {
    protected $logger;

    /**
     * @param Logging $logger Logger handed to every worker by the pool.
     */
    public function __construct(Logging $logger) {
        $this->logger = $logger;
    }
}

/**
 * Task that marks one account as paid when a matching deposit trade exists.
 */
class Work extends Stackable {
    /**
     * @param array $number Account row with the keys 'order' and 'name'.
     */
    public function __construct($number) {
        $this->number = $number;
    }

    /**
     * Look up the deposit trade for the account and, when found, record its
     * time on the account. Opens its own connection and closes it again.
     *
     * All values are passed as bound parameters; they were previously
     * concatenated into the SQL text.
     */
    public function run() {
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw = 'password';         // database password
        $dbname = 'example_real';

        $dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true
        ));

        $order = $this->number['order'];

        $select = $dbh->prepare(
            "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN = :login and CMD = '6' and `COMMENT` = :comment"
        );
        $select->bindValue(':login', $this->number['name']);
        $select->bindValue(':comment', $order . ':DEPOSIT');
        $select->execute();
        $mt4_trades = $select->fetch(PDO::FETCH_ASSOC);
        // Release the result set before issuing the next statement.
        $select = null;

        if ($mt4_trades) {
            $update = $dbh->prepare(
                "UPDATE db_example.accounts SET paystatus='成功', deposit_time = :deposit_time where `order` = :order"
            );
            $update->bindValue(':deposit_time', $mt4_trades['OPEN_TIME']);
            $update->bindValue(':order', $order);
            $update->execute();
        }

        $dbh = null;
        printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId(), $order);
    }
}

/**
 * Logger object passed to the workers; it also opens a connection of its own.
 */
class Logging extends Stackable {
    protected static $dbh;

    public function __construct() {
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw = 'password';         // database password
        $dbname = 'example_real';   // database name

        self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true
        ));
    }

    /**
     * Print a printf-style message followed by a newline.
     *
     * The first argument is the format, all further arguments are its values.
     * Nothing is printed when the format is empty.
     */
    protected function log($message, $args = []) {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            echo vsprintf("{$message}\n", $args);
        }
    }

    /**
     * @return PDO The handle opened in the constructor.
     */
    protected function getConnection() {
        return self::$dbh;
    }
}

// Up to 200 workers; each of them receives the Logging object.
$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = 'db.example.com'; // database server
$dbuser = 'example.com';    // database user
$dbpw = 'password';         // database password
$dbname = 'db_example';

$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true
));

// One task per account that has no deposit time yet.
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";
$row = $dbh->query($sql);
while ($account = $row->fetch(PDO::FETCH_ASSOC)) {
    $pool->submit(new Work($account));
}

$pool->shutdown();
?>
进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接
<?php
/**
 * Pool worker that opens one database connection and hands it to its tasks.
 */
class ExampleWorker extends Worker {
    protected static $dbh;

    public function __construct() {
    }

    /**
     * Open the connection once, when the worker starts.
     *
     * The error mode is set to exceptions so that failed statements actually
     * reach the catch (PDOException) block in Work::run(); with the silent
     * default, prepare() returns false and the task dies on the next call.
     */
    public function run() {
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw = 'password';         // database password
        $dbname = 'example';        // database name

        self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true,
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION
        ));
    }

    /**
     * @return PDO The handle opened in run().
     */
    protected function getInstance() {
        return self::$dbh;
    }
}

/**
 * Task that stores the md5 digest of one member's mobile number.
 */
class Work extends Stackable {
    /**
     * @param array $data Member row with the keys 'id' and 'mobile'.
     */
    public function __construct($data) {
        $this->data = $data;
    }

    /**
     * Decrypt the mobile number, keep its last 11 bytes and write the digest
     * to members_digest. The literal string 'null' is stored as an empty
     * value instead of being hashed. Failed updates are appended to
     * mobile_error.log.
     *
     * safenet_decrypt() is not defined in this snippet.
     */
    public function run() {
        // Resolved before the try block so that both the error log and the
        // final printf always have defined values.
        $id = $this->data['id'];
        $mobile = safenet_decrypt($this->data['mobile']);

        try {
            $dbh = $this->worker->getInstance();

            if (strlen($mobile) > 11) {
                $mobile = substr($mobile, -11);
            }

            if ($mobile === 'null') {
                $mobile = '';
                $sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
            } else {
                $sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";
            }

            $sth = $dbh->prepare($sql);
            $sth->bindValue(':mobile', $mobile);
            $sth->bindValue(':id', $id);
            $sth->execute();
        } catch (PDOException $e) {
            $error = sprintf("%s,%s\n", $mobile, $id);
            file_put_contents("mobile_error.log", $error, FILE_APPEND);
        }

        printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId(), $mobile, $id);
    }
}

$pool = new Pool(100, \ExampleWorker::class, []);

$dbhost = 'db.example.com'; // database server
$dbuser = 'example.com';    // database user
$dbpw = 'password';         // database password
$dbname = 'example';

// NOTE(review): this reader connects to port 3307 while the workers use 3306 - confirm that is intended.
$dbh = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true
));

// One task per member row.
$sql = "select id, mobile from members order by id asc";
$row = $dbh->query($sql);
while ($members = $row->fetch(PDO::FETCH_ASSOC)) {
    $pool->submit(new Work($members));
}

$pool->shutdown();
?>
总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目
数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。
<?php
// Open the connection as a persistent one.
// $user and $pass must be defined before this point.
$options = [PDO::ATTR_PERSISTENT => true];
$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, $options);
?>
但有些场景数据库持久链接适得其反,所以根据你的场景选择链接方式
<?php
// Open the connection with persistence explicitly switched off.
// $user and $pass must be defined before this point.
$options = [PDO::ATTR_PERSISTENT => false];
$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, $options);
?>
由于线程持续连接数据库,有时可能因为数据库或者网络原因导致数据库无法连接,程序抛出异常或终止,所以仅使用单例并不保险。
/**
 * Hand out the handle kept in the static $dbh property.
 *
 * The value is returned as it is; nothing here checks whether the
 * connection behind it is still usable.
 */
protected function getInstance()
{
    return self::$dbh;
}
为单例增加重新连接功能
/**
 * Worker that lazily opens its database and AMQP connections and re-opens
 * the database connection when it has been lost.
 *
 * Logger and AMQPConnection are not defined in this snippet.
 */
class SenderWorker extends Worker {
    protected $config;
    protected static $dbh;
    protected static $amqp;

    /**
     * @param array $config Settings with a 'database' section (host, port,
     *                      user, password, dbname) and an 'amqp' section
     *                      (host, port, vhost, login, password).
     */
    public function __construct($config) {
        $this->config = $config;
        $this->logger = new Logger();
    }

    /** Nothing to prepare up front; connections are opened on first use. */
    public function run() {
    }

    /**
     * Open the database connection and store it in self::$dbh.
     *
     * On failure the error is logged and self::$dbh is reset to null, so a
     * stale handle is never handed out after a failed reconnect.
     */
    private function connect() {
        try {
            $dbhost = $this->config['database']['host'];
            $dbport = $this->config['database']['port'];
            $dbuser = $this->config['database']['user'];
            $dbpass = $this->config['database']['password'];
            $dbname = $this->config['database']['dbname'];

            self::$dbh = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(
                PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                PDO::MYSQL_ATTR_COMPRESS => true
                /*PDO::ATTR_PERSISTENT => true*/
            ));
            self::$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_WARNING);
        } catch (Exception $e) {
            // PDOException is an Exception, so one handler covers both cases.
            self::$dbh = null;
            $this->logger('Exception worker', $e->getMessage());
        }
    }

    /**
     * Tell whether the stored handle still reaches the server.
     *
     * A trivial query is sent as a ping: an existing PDO object stays truthy
     * after the server has dropped the connection, so checking the property
     * alone cannot detect a lost link.
     *
     * @return bool
     */
    private function isAlive() {
        if (!self::$dbh) {
            return false;
        }
        try {
            return self::$dbh->query('SELECT 1') !== false;
        } catch (PDOException $e) {
            return false;
        }
    }

    /**
     * Return a usable database handle, reconnecting when necessary.
     *
     * @return PDO|null The handle, or null when the connection cannot be
     *                  established; in that case the worker is shut down.
     */
    protected function getInstance() {
        $dbname = $this->config['database']['dbname'];

        if (!$this->isAlive()) {
            $this->connect();
            $this->logger('Database', sprintf("Connect database %s, %s", $dbname, $this->getThreadId()));
        } else {
            $this->logger('Database', sprintf("Get instance database %s, %s", $dbname, $this->getThreadId()));
        }

        if (self::$dbh) {
            return self::$dbh;
        }

        $this->logger('Database', sprintf("Connect database is error %s, %s", $dbname, $this->getThreadId()));
        $this->logger('Error', sprintf("Worker is shutdown %s", $this->getThreadId()));
        $this->shutdown();
        return null;
    }

    /**
     * Forward a message to the Logger object.
     *
     * @param string $type    Category of the entry.
     * @param string $message Text of the entry.
     */
    public function logger($type, $message) {
        $this->logger->logger($type, $message);
    }

    /**
     * Return the AMQP connection object, creating it on first use.
     *
     * NOTE(review): only the object is created here; connect() is never
     * called in this class - confirm that callers establish the link.
     *
     * @return AMQPConnection
     */
    public function getAmqpInstance() {
        if (!self::$amqp) {
            self::$amqp = new AMQPConnection(array(
                'host' => $this->config['amqp']['host'],
                'port' => $this->config['amqp']['port'],
                'vhost' => $this->config['amqp']['vhost'],
                'login' => $this->config['amqp']['login'],
                'password' => $this->config['amqp']['password']
            ));
            $this->logger('AMQP', sprintf("Connect amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId()));
        } else {
            $this->logger('AMQP', sprintf("Get instance amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId()));
        }
        return self::$amqp;
    }
}
每次调用 getInstance() 会判断当前数据库是否已经链接,如果链接丢失,将重新链接数据库。
多线程编程中对数据库更新操作需要注意的是,有些场景,你需要控制同一时刻只能有一个线程对数据库做Update, Delete, Insert,否则数据容易出错。
例如下面的操作,你会发现程序运行完成后数据字段没有任何变化。这是因为线程间相互覆盖对方之前更新的数据。
// Increment the `succeed` column of the `import` row selected by :status and :id;
// both placeholders have to be bound by the caller.
$sql = "update import set succeed = succeed+1 where status = :status and id = :id";
解决方法有两种:一种是在外部实现排他锁,一种是在数据库内部实现,即通过事务处理,解决线程资源争夺、相互覆盖的问题。
/**
 * Increment the `succeed` counter of an import task inside a transaction.
 *
 * Only a row whose status is 'Processing' is touched. The transaction is
 * committed when the statement succeeds and rolled back otherwise, so a
 * failure never leaves a transaction open on the worker's connection.
 *
 * @param object $task Task whose `id` property selects the row.
 * @return bool true when the statement executed successfully, false otherwise.
 * @throws Exception Any exception raised by the database layer, after rollback.
 */
private function updateSucceed($task){
    $dbh = $this->worker->getInstance();
    $dbh->beginTransaction();

    try {
        $sql = "update import set succeed = succeed+1 where status = :status and id = :id";
        $sth = $dbh->prepare($sql);
        if ($sth === false) {
            // prepare() reports failure this way when exceptions are off.
            $dbh->rollBack();
            return false;
        }
        $sth->bindValue(':id', $task->id);
        $sth->bindValue(':status', 'Processing');
        $status = $sth->execute();
    } catch (Exception $e) {
        $dbh->rollBack();
        throw $e;
    }

    if ($status) {
        $dbh->commit();
    } else {
        $dbh->rollBack();
    }
    return $status;
}
应用场景,我使用触发器监控数据库某个表,一旦发现有改变就通知程序处理数据
首先安装ZeroMQ 与 ZeroMQ for MySQL UDF 然后创建触发器。 https://github.com/netkiller/mysql-zmq-plugin
-- The bodies below contain ';', so the mysql client needs a different
-- statement delimiter while they are being defined.
DELIMITER //

-- Notify the ZeroMQ listener about a new trade.
-- A message "TICKET,LOGIN,VOLUME" is sent only when CMD is 0 or 1, VOLUME is
-- between 10 and 90 inclusive, and example.members holds a row for the login
-- with coding = 'Y'.
CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)
    LANGUAGE SQL
    NOT DETERMINISTIC
    READS SQL DATA
    SQL SECURITY DEFINER
    COMMENT '交易监控'
BEGIN
    DECLARE Example CHAR(1) DEFAULT 'N';

    IF CMD IN ('0','1') THEN
        IF VOLUME >= 10 AND VOLUME <= 90 THEN
            -- LIMIT 1: SELECT ... INTO fails when more than one row matches.
            SELECT coding INTO Example
              FROM example.members
             WHERE username = LOGIN AND coding = 'Y'
             LIMIT 1;

            IF Example = 'Y' THEN
                -- DO evaluates the call without producing a result set; a bare
                -- SELECT would return one, which is not allowed in a procedure
                -- invoked from a trigger.
                DO zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));
            END IF;
        END IF;
    END IF;
END//

-- Run the check above for every row inserted into MT4_TRADES.
CREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW
BEGIN
    CALL Table_Example(NEW.TICKET, NEW.LOGIN, NEW.CMD, NEW.VOLUME);
END//

DELIMITER ;
<?php
/**
 * Pool worker that opens one database connection and hands it to its tasks.
 */
class ExampleWorker extends Worker {
    protected static $dbh;

    public function __construct() {
    }

    /**
     * Open the connection once, when the worker starts.
     *
     * Exceptions are enabled so that failed statements reach the
     * catch (PDOException) block in Fee::run().
     */
    public function run() {
        $dbhost = '192.168.2.1'; // database server
        $dbport = 3306;
        $dbuser = 'www';         // database user
        $dbpass = 'password';    // database password
        $dbname = 'example';     // database name

        self::$dbh = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(
            /* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true,
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION
        ));
    }

    /**
     * @return PDO The handle opened in run().
     */
    protected function getInstance() {
        return self::$dbh;
    }
}

/**
 * Task that records one trade notification in coding_fee.
 */
class Fee extends Stackable {
    /**
     * @param string $msg Message of the form "ticket,login,volume".
     */
    public function __construct($msg) {
        $trades = explode(",", $msg);
        $this->data = $trades;
        print_r($trades);
    }

    /**
     * Insert the trade with status 'N', then flip it to 'Y'.
     * The business logic is meant to run between the two statements.
     * Database errors are appended to mobile_error.log.
     */
    public function run() {
        try {
            $dbh = $this->worker->getInstance();

            $insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";
            $sth = $dbh->prepare($insert);
            $sth->bindValue(':ticket', $this->data[0]);
            $sth->bindValue(':login', $this->data[1]);
            $sth->bindValue(':volume', $this->data[2]);
            $sth->execute();

            /* business logic goes here */

            $update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";
            $sth = $dbh->prepare($update);
            $sth->bindValue(':ticket', $this->data[0]);
            $sth->execute();
        } catch (PDOException $e) {
            // Log the ticket and the cause; the variables logged here before
            // did not exist in this class.
            $error = sprintf("%s,%s\n", $this->data[0], $e->getMessage());
            file_put_contents("mobile_error.log", $error, FILE_APPEND);
        }
    }
}

/**
 * Command-line daemon: receives trade notifications over ZeroMQ and hands
 * each of them to the thread pool.
 */
class Example {
    /* config */
    const LISTEN = "tcp://192.168.2.15:5555";
    const MAXCONN = 100;
    const pidfile = __CLASS__;
    const uid = 80;
    const gid = 80;

    protected $pool = NULL;
    protected $zmq = NULL;
    /** @var string Path of the pid file. */
    public $pidfile;

    public function __construct() {
        $this->pidfile = '/var/run/'.self::pidfile.'.pid';
    }

    /**
     * Detach into the background.
     *
     * Refuses to start when the pid file already exists. The parent process
     * exits; the child writes its pid, drops privileges and carries on.
     *
     * @return int Process id of the child.
     */
    private function daemon() {
        if (file_exists($this->pidfile)) {
            echo "The file $this->pidfile exists.\n";
            exit();
        }

        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            // Parent: leave with a success status (the child's pid is not a
            // meaningful exit code).
            exit(0);
        } else {
            // Child.
            file_put_contents($this->pidfile, getmypid());
            // Drop the group first: once the user id has been given up the
            // process may no longer be allowed to change its group.
            posix_setgid(self::gid);
            posix_setuid(self::uid);
            return(getmypid());
        }
    }

    /**
     * Daemonize, then serve requests: every non-empty message is queued as a
     * Fee task and answered with 'TRUE', anything else with 'FALSE'.
     */
    private function start() {
        $this->daemon();

        // The pool is created after the fork, in the process that will use it.
        $this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);
        $this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
        $this->zmq->bind(self::LISTEN);

        /* Loop receiving and echoing back */
        // Only a failed receive ends the loop; an empty message is answered
        // with 'FALSE' instead of silently stopping the daemon.
        while (($message = $this->zmq->recv()) !== false) {
            if ($message) {
                $this->pool->submit(new Fee($message));
                $this->zmq->send('TRUE');
            } else {
                $this->zmq->send('FALSE');
            }
        }

        $this->pool->shutdown();
    }

    /** Kill the daemon recorded in the pid file and remove the file. */
    private function stop() {
        if (file_exists($this->pidfile)) {
            $pid = (int) trim(file_get_contents($this->pidfile));
            posix_kill($pid, 9);
            unlink($this->pidfile);
        }
    }

    /**
     * @param string $proc Name of the script, as invoked.
     */
    private function help($proc) {
        printf("%s start | stop | help \n", $proc);
    }

    /**
     * Dispatch on the first command-line argument.
     *
     * @param array $argv Command-line arguments.
     */
    public function main($argv) {
        if (count($argv) < 2) {
            printf("please input help parameter\n");
            exit();
        }
        if ($argv[1] === 'stop') {
            $this->stop();
        } else if ($argv[1] === 'start') {
            $this->start();
        } else {
            $this->help($argv[0]);
        }
    }
}

$example = new Example();
$example->main($argv);
使用方法
# php example.php start
# php example.php stop
# php example.php help
此程序涉及守护进程的实现:$this->daemon() 运行后转到后台运行,以及进程ID保存、进程的互斥(不允许同时启动两个进程)、线程池连接数以及线程任务等等
PHP高级编程之消息队列
PHP高级编程之守护进程