Rumah >pembangunan bahagian belakang >masalah PHP >Cara menggunakan pthread php
Cara menggunakan pthread php: 1. Pasang pthread melalui "pecl install pthreads"; 2. Gunakan kunci mutex apabila anda perlu mengawal beberapa utas dan hanya satu utas boleh berfungsi pada masa yang sama.
Persekitaran pengendalian artikel ini: sistem windows7, versi php7.0.2, komputer DELL G3
Pemasangan dan penggunaan php pthread berbilang benang
Memasang pthreads pada asasnya memerlukan PHP disusun semula dengan parameter --enable-maintainer-zts, tetapi dokumentasi untuk sambungan ini sangat sedikit; terdapat banyak pepijat dan masalah yang tidak dijangka, jadi ia tidak sesuai digunakan dalam persekitaran pengeluaran. Oleh itu, gunakan pthreads sekadar untuk mencuba sahaja; untuk multi-threading sebenar, lebih baik gunakan Python, C dan sebagainya.
Kebanyakan kod berikut datang dari Internet
1. Pemasangan
Versi yang digunakan di sini ialah php-7.0.2.
./configure \
  --prefix=/usr/local/php7 \
  --with-config-file-path=/etc \
  --with-config-file-scan-dir=/etc/php.d \
  --enable-debug \
  --enable-maintainer-zts \
  --enable-pcntl \
  --enable-fpm \
  --enable-opcache \
  --enable-embed=shared \
  --enable-json=shared \
  --enable-phpdbg \
  --with-curl=shared \
  --with-mysql=/usr/local/mysql \
  --with-mysqli=/usr/local/mysql/bin/mysql_config \
  --with-pdo-mysql
make && make install
Pasang pthreads:
pecl install pthreads
2. Thread
<?php
// Example 1: run a single anonymous Thread and wait for it to finish.
$thread = new class extends Thread {
    /**
     * Prints a greeting that contains the id of the executing thread.
     */
    public function run()
    {
        echo "Hello World {$this->getThreadId()}\n";
    }
};
// join() is only reached when start() returned a truthy value.
$thread->start() && $thread->join();

// Example 2: start fifty threads that each print their own number forever.

/**
 * Thread that prints the number it was constructed with once per second.
 */
class workerThread extends Thread
{
    /**
     * @param mixed $i Value printed by run(); the launcher below passes the loop index.
     */
    public function __construct($i)
    {
        $this->i = $i;
    }

    /**
     * Never returns: prints the stored value, sleeps one second, repeats.
     */
    public function run()
    {
        for (;;) {
            echo $this->i . "\n";
            sleep(1);
        }
    }
}

for ($i = 0; $i < 50; $i++) {
    $workers[$i] = new workerThread($i);
    $workers[$i]->start();
}
?>
4. Kunci pengecualian bersama
<?php
/**
 * Unit of work that runs one SQL statement through the connection exposed by
 * the worker it is stacked on and dumps every returned row.
 */
class SQLQuery extends Stackable
{
    /**
     * @param string $sql Statement executed by run().
     */
    public function __construct($sql)
    {
        $this->sql = $sql;
    }

    /**
     * Fetches the worker's connection, runs the statement and prints each row.
     */
    public function run()
    {
        $connection = $this->worker->getConnection();
        $result = $connection->query($this->sql);
        while ($member = $result->fetch(PDO::FETCH_ASSOC)) {
            print_r($member);
        }
    }
}

/**
 * Worker that opens a PDO connection in run() and hands it to stacked work.
 */
class ExampleWorker extends Worker
{
    /** Connection created in run() and returned by getConnection(). */
    public static $dbh;

    /**
     * @param string $name Accepted but not used by this example.
     */
    public function __construct($name)
    {
    }

    /**
     * Creates the connection stored in the static $dbh property.
     */
    public function run()
    {
        self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb', 'root', '123456');
    }

    /**
     * @return PDO|null Whatever run() stored in the static $dbh property.
     */
    public function getConnection()
    {
        return self::$dbh;
    }
}

$worker = new ExampleWorker("My Worker Thread");

// Both queries are stacked before the worker is started.
$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');
$worker->stack($sql1);

$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>
Dalam keadaan apakah kunci mutex akan digunakan? Ia boleh digunakan apabila anda perlu mengawal beberapa utas dan hanya satu utas boleh berfungsi pada masa yang sama. Program kaunter ringkas yang menggambarkan perbezaan dengan dan tanpa mutex
Memori berbilang benang dan dikongsi
<?php
// Seed the counter file with 0; "w" creates the file or truncates an old one.
$counter = 0;
$handle = fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter);
fclose($handle);

/**
 * Thread that reads the number stored in /tmp/counter.txt, adds one and writes
 * it back, optionally holding a mutex while it does so.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $mutex Mutex handle from Mutex::create(), or null to run unlocked.
     */
    public function __construct($mutex = null)
    {
        $this->mutex = $mutex;
        // "r+" opens for read/write without truncating. The previous "w+"
        // emptied the file every time a thread was constructed, which reset
        // the counter that the other threads were incrementing.
        $this->handle = fopen("/tmp/counter.txt", "r+");
    }

    /**
     * Closes the file handle opened by the constructor.
     */
    public function __destruct()
    {
        fclose($this->handle);
    }

    /**
     * Increments the counter in the file and prints the new value together
     * with this thread's id. The read-modify-write is wrapped in
     * Mutex::lock()/Mutex::unlock() only when a mutex was supplied.
     */
    public function run()
    {
        if ($this->mutex) {
            Mutex::lock($this->mutex);
        }

        $counter = intval(fgets($this->handle));
        $counter++;
        // Go back to the start so the new value overwrites the old one.
        rewind($this->handle);
        fputs($this->handle, $counter);
        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);

        if ($this->mutex) {
            Mutex::unlock($this->mutex);
        }
    }
}

// Without a mutex.
for ($i = 0; $i < 50; $i++) {
    $threads[$i] = new CounterThread();
    $threads[$i]->start();
}
// Wait for the unlocked batch before its slots in $threads are reused, so the
// two runs do not overlap and can be compared.
for ($i = 0; $i < 50; $i++) {
    $threads[$i]->join();
}

// With a mutex. It is created locked (true), so the threads started below
// block in Mutex::lock() until the main script unlocks it.
$mutex = Mutex::create(true);
for ($i = 0; $i < 50; $i++) {
    $threads[$i] = new CounterThread($mutex);
    $threads[$i]->start();
}
Mutex::unlock($mutex);

for ($i = 0; $i < 50; $i++) {
    $threads[$i]->join();
}
Mutex::destroy($mutex);
?>
Dalam contoh memori kongsi, tiada kunci digunakan, ia masih boleh berfungsi. biasanya, mungkin operasi memori yang berfungsi itu sendiri mempunyai fungsi mengunci
5 Penyegerakan benang
<?php
// Build a System V shared memory segment keyed on a fresh temporary file and
// store the initial counter value 0 under variable key 1.
$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');
$shmid = shm_attach($key);
$counter = 0;
shm_put_var($shmid, 1, $counter);

/**
 * Thread that increments the counter kept in the shared memory segment.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $shmid Shared memory handle returned by shm_attach().
     */
    public function __construct($shmid)
    {
        $this->shmid = $shmid;
    }

    /**
     * Reads variable 1, adds one, writes it back and prints the new value
     * together with this thread's id. No lock is taken around the update.
     */
    public function run()
    {
        $counter = shm_get_var($this->shmid, 1);
        $counter++;
        shm_put_var($this->shmid, 1, $counter);

        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);
    }
}

// Create all threads first, then start them all, then wait for them all.
for ($i = 0; $i < 100; $i++) {
    $threads[] = new CounterThread($shmid);
}
foreach ($threads as $thread) {
    $thread->start();
}
foreach ($threads as $thread) {
    $thread->join();
}

shm_remove($shmid);
shm_detach($shmid);
?>
Dalam sesetengah senario kami tidak mahu thread->start() mula menjalankan thread. program, tetapi Semoga benang sedang menunggu arahan kami. Fungsi ujian $thread->wait(); ialah thread tidak akan dijalankan serta-merta selepas thread->start(). >6. Kumpulan benangKelas Pool
<?php
// Build a System V shared memory segment keyed on a fresh temporary file and
// store the initial counter value 0 under variable key 1.
$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');
$shmid = shm_attach($key);
$counter = 0;
shm_put_var($shmid, 1, $counter);

/**
 * Thread that waits for a go-ahead from the main script and then increments
 * the counter kept in the shared memory segment.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $shmid Shared memory handle returned by shm_attach().
     */
    public function __construct($shmid)
    {
        $this->shmid = $shmid;
        // Set to true by the main script, under synchronized(), when this
        // thread is allowed to proceed.
        $this->released = false;
    }

    /**
     * Blocks until released, then reads variable 1, adds one, writes it back
     * and prints the new value together with this thread's id.
     */
    public function run()
    {
        $this->synchronized(function ($thread) {
            // Wait on a predicate, not unconditionally: if the main script
            // already called notify() before this thread got here, a bare
            // wait() would never return.
            while (!$thread->released) {
                $thread->wait();
            }
        }, $this);

        $counter = shm_get_var($this->shmid, 1);
        $counter++;
        shm_put_var($this->shmid, 1, $counter);

        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);
    }
}

for ($i = 0; $i < 100; $i++) {
    $threads[] = new CounterThread($shmid);
}
for ($i = 0; $i < 100; $i++) {
    $threads[$i]->start();
}
// Release the threads one by one; the flag is set inside the same
// synchronized block that sends the notification.
for ($i = 0; $i < 100; $i++) {
    $threads[$i]->synchronized(function ($thread) {
        $thread->released = true;
        $thread->notify();
    }, $threads[$i]);
}
for ($i = 0; $i < 100; $i++) {
    $threads[$i]->join();
}

shm_remove($shmid);
shm_detach($shmid);
?>
Kolam benang gilir dinamikContoh di atas adalah untuk melaksanakan bermula apabila kumpulan benang adalah penuh. Contoh berikut mencipta benang baharu sebaik sahaja terdapat ruang dalam kumpulan benang.
<?php
/**
 * Thread that decrypts one member's bank number and builds the matching
 * UPDATE statement. The statement is only printed, never executed.
 */
class Update extends Thread
{
    public $running = false;
    public $row = array();

    /**
     * @param array $row Member row; run() reads its 'id' and 'bankno' keys.
     */
    public function __construct($row)
    {
        $this->row = $row;
        $this->sql = null;
    }

    /**
     * Decrypts 'bankno' when it is longer than 100 bytes, otherwise appends the
     * row to bankno_error.log. Builds the UPDATE statement when the decrypted
     * value is longer than 7 bytes, then prints $this->sql (empty when no
     * statement was built).
     */
    public function run()
    {
        // Default to an empty string: on the error path nothing is decrypted,
        // and the length check below previously read an undefined variable.
        $bankno = '';
        if (strlen($this->row['bankno']) > 100) {
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s, %s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }

        if (strlen($bankno) > 7) {
            // NOTE(review): built with sprintf because it is only printed here;
            // switch to a prepared statement before ever executing it.
            $this->sql = sprintf(
                "update members set bankno = '%s' where id = '%s';",
                $bankno,
                $this->row['id']
            );
        }
        printf("%s\n", $this->sql);
    }
}

/**
 * Fixed-capacity batch of Update threads that are started and joined together.
 *
 * NOTE(review): pthreads ships its own Pool class (used by later examples in
 * this article); confirm this name does not clash in the target environment.
 */
class Pool
{
    public $pool = array();

    /**
     * @param int $count Maximum number of threads held at once.
     */
    public function __construct($count)
    {
        $this->count = $count;
    }

    /**
     * Queues a new Update thread for the row.
     *
     * @param array $row Member row passed to the Update constructor.
     * @return bool True when queued, false when the pool is already full.
     */
    public function push($row)
    {
        if (count($this->pool) >= $this->count) {
            return false;
        }
        $this->pool[] = new Update($row);
        return true;
    }

    /** Starts every queued thread. */
    public function start()
    {
        foreach ($this->pool as $worker) {
            $worker->start();
        }
    }

    /** Waits for every queued thread to finish. */
    public function join()
    {
        foreach ($this->pool as $worker) {
            $worker->join();
        }
    }

    /** Drops every thread that is no longer running. */
    public function clean()
    {
        foreach ($this->pool as $id => $worker) {
            if (!$worker->isRunning()) {
                unset($this->pool[$id]);
            }
        }
    }
}

try {
    // $dbhost, $dbname, $dbuser and $dbpw are not defined in this snippet and
    // must be provided by the including script. A "host:port" value in $dbhost
    // is rewritten to the "host;port=port" form used in the DSN.
    $dbh = new PDO(
        "mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname",
        $dbuser,
        $dbpw,
        array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
        )
    );

    $sql = "select id,bankno from members order by id desc";
    $row = $dbh->query($sql);
    $pool = new Pool(5);
    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        while (true) {
            if ($pool->push($member)) {
                // Task queued in the pool; move on to the next row.
                break;
            }
            // The pool is full: run the whole batch, wait for it and empty
            // the pool before retrying the push.
            $pool->start();
            $pool->join();
            $pool->clean();
        }
    }
    // Run whatever is left in the last, partially filled batch.
    $pool->start();
    $pool->join();
    $dbh = null;
} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";
}
?>
pthreads Pool class
<?php
/**
 * Thread that decrypts one member's bank number and builds the matching
 * UPDATE statement. The statement is only printed, never executed.
 */
class Update extends Thread
{
    public $running = false;
    public $row = array();

    /**
     * @param array $row Member row; run() reads its 'id' and 'bankno' keys.
     */
    public function __construct($row)
    {
        $this->row = $row;
        $this->sql = null;
    }

    /**
     * Decrypts 'bankno' when it is longer than 100 bytes, otherwise appends the
     * row to bankno_error.log. Builds the UPDATE statement when the decrypted
     * value is longer than 7 bytes, then prints $this->sql (empty when no
     * statement was built).
     */
    public function run()
    {
        // Default to an empty string: on the error path nothing is decrypted,
        // and the length check below previously read an undefined variable.
        $bankno = '';
        if (strlen($this->row['bankno']) > 100) {
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s, %s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }

        if (strlen($bankno) > 7) {
            // NOTE(review): built with sprintf because it is only printed here;
            // switch to a prepared statement before ever executing it.
            $this->sql = sprintf(
                "update members set bankno = '%s' where id = '%s';",
                $bankno,
                $this->row['id']
            );
        }
        printf("%s\n", $this->sql);
    }
}

try {
    // $dbhost, $dbname, $dbuser and $dbpw are not defined in this snippet and
    // must be provided by the including script. A "host:port" value in $dbhost
    // is rewritten to the "host;port=port" form used in the DSN.
    $dbh = new PDO(
        "mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname",
        $dbuser,
        $dbpw,
        array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
        )
    );

    $sql = "select id,bankno from members order by id desc limit 50";
    $row = $dbh->query($sql);

    // Running threads keyed by member id; at most five at any time.
    $pool = array();
    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        $id = $member['id'];
        while (true) {
            if (count($pool) < 5) {
                // Free slot: start a thread for this row straight away.
                $pool[$id] = new Update($member);
                $pool[$id]->start();
                break;
            }
            // Pool full: drop finished threads and check again.
            foreach ($pool as $name => $worker) {
                if (!$worker->isRunning()) {
                    unset($pool[$name]);
                }
            }
        }
    }

    // Wait for the threads still in flight instead of leaving them to be
    // cleaned up implicitly when the script ends.
    foreach ($pool as $worker) {
        $worker->join();
    }
    $dbh = null;
} catch (Exception $e) {
    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";
}
?>
7. Fail berbilang benang selamat membaca dan menulis
<?php
/**
 * Pool worker that carries the logger shared by all submitted work.
 */
class WebWorker extends Worker
{
    /**
     * Logger read by WebWork::run() through $this->worker->logger.
     *
     * The declaration used to be misspelled "$loger", so the property that is
     * actually used was created dynamically. It is public because it is read
     * from another class.
     */
    public $logger;

    /**
     * @param SafeLog $logger Logger made available to stacked work.
     */
    public function __construct(SafeLog $logger)
    {
        $this->logger = $logger;
    }
}

/**
 * Unit of work that logs which thread executed it and marks itself complete.
 */
class WebWork extends Stackable
{
    protected $complete;

    /**
     * @return bool|null True once run() has finished, null before that.
     */
    public function isComplete()
    {
        return $this->complete;
    }

    /**
     * Logs this class name and the worker's thread id, then sets the flag
     * returned by isComplete().
     */
    public function run()
    {
        $this->worker
            ->logger
            ->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId());
        $this->complete = true;
    }
}

/**
 * Logger object handed to every worker in the pool.
 */
class SafeLog extends Stackable
{
    /**
     * Prints a vsprintf()-formatted line followed by a newline.
     *
     * NOTE(review): declared protected yet called from WebWork::run();
     * presumably this relies on how pthreads treats protected methods of
     * threaded objects. Confirm against the installed pthreads version.
     *
     * @param string $message Format string; nothing is printed when it is empty.
     * @param array  $args    Ignored: all arguments after $message are taken
     *                        from func_get_args() and used as format values.
     */
    protected function log($message, $args = [])
    {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            echo vsprintf("{$message}\n", $args);
        }
    }
}

$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

// Submit fourteen units of work to the eight-worker pool.
for ($submitted = 0; $submitted < 14; $submitted++) {
    $pool->submit(new WebWork());
}

$pool->shutdown();

// Hand every unit of work to the collector; it reports the unit's completion flag.
$pool->collect(function ($work) {
    return $work->isComplete();
});

var_dump($pool);
LOCK_SH memperoleh kunci kongsi (program pembaca)
LOCK_EX memperoleh kunci eksklusif (program penulis)
LOCK_UN melepaskan kunci (sama ada dikongsi atau eksklusif)
LOCK_NB Jika anda tidak mahu flock() disekat semasa mengunci
<?php
// Part 1: blocking exclusive lock around a rewrite of the file.
$fp = fopen("/tmp/lock.txt", "r+");

if (!flock($fp, LOCK_EX)) {
    echo "Couldn't get the lock!";
} else {
    // Exclusive lock acquired.
    ftruncate($fp, 0);      // empty the file
    fwrite($fp, "Write something here\n");
    fflush($fp);            // flush output before releasing the lock
    flock($fp, LOCK_UN);    // release the lock
}

fclose($fp);

// Part 2: non-blocking attempt; LOCK_NB makes flock() return immediately
// instead of waiting when the lock cannot be taken.
$fp = fopen('/tmp/lock.txt', 'r+');

$acquired = flock($fp, LOCK_EX | LOCK_NB);
if (!$acquired) {
    echo 'Unable to obtain lock';
    exit(-1);
}

fclose($fp);
?>
Memautkan pangkalan data dalam kumpulan benang
Untuk menambah baik lagi program di atas, kami menggunakan mod tunggal $this->worker->getInstance(. ); untuk membuat hanya satu sambungan pangkalan data secara global, dan utas menggunakan sambungan pangkalan data yang dikongsi<?php
/**
 * Unit of work that lists the 50 newest members using the connection exposed
 * by the worker it is stacked on.
 */
class Work extends Stackable
{
    public function __construct()
    {
    }

    /**
     * Runs the query on the worker's connection and prints every row.
     */
    public function run()
    {
        $connection = $this->worker->getConnection();
        $result = $connection->query("select id,name from members order by id desc limit 50");
        while ($member = $result->fetch(PDO::FETCH_ASSOC)) {
            print_r($member);
        }
    }
}

/**
 * Worker that opens a PDO connection in run() and hands it to stacked work.
 */
class ExampleWorker extends Worker
{
    /** Connection created in run() and returned by getConnection(). */
    public static $dbh;

    /**
     * @param string $name Accepted but not used by this example.
     */
    public function __construct($name)
    {
    }

    /**
     * The run method should just prepare the environment for the work that is
     * coming; here that means creating the connection.
     */
    public function run()
    {
        self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example', 'www', '123456');
    }

    /**
     * @return PDO|null Whatever run() stored in the static $dbh property.
     */
    public function getConnection()
    {
        return self::$dbh;
    }
}

$worker = new ExampleWorker("My Worker Thread");

// The work item is stacked before the worker is started.
$work = new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>
Ringkasan operasi pangkalan data dalam berbilang benangSecara amnya, pthread masih dalam pembangunan dan masih terdapat beberapa kekurangan. Kita juga dapat melihat bahawa git pthreads sentiasa bertambah baik Projek ini
# cat pool.php <?php
/**
 * Pool worker that carries the logger passed to the pool constructor.
 */
class ExampleWorker extends Worker
{
    protected $logger;

    /**
     * @param Logging $logger Logger stored on the worker.
     */
    public function __construct(Logging $logger)
    {
        $this->logger = $logger;
    }
}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable
{
    /**
     * @param array $number Account row; run() reads its 'name' and 'order' keys.
     */
    public function __construct($number)
    {
        $this->number = $number;
    }

    /**
     * Looks up the deposit trade belonging to this account's order and, when
     * one exists, marks the account as paid with the trade's open time.
     * Always prints a progress line at the end.
     */
    public function run()
    {
        $dbhost = 'db.example.com';  // database server
        $dbuser = 'example.com';     // database user name
        $dbpw = 'password';          // database password
        $dbname = 'example_real';

        $dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true,
        ));

        // Values are bound as parameters instead of being concatenated into
        // the SQL text, so quotes inside them cannot change the statement.
        $select = $dbh->prepare(
            "select OPEN_TIME, `COMMENT` from MT4_TRADES"
            . " where LOGIN = :login and CMD='6' and `COMMENT` = :comment"
        );
        $select->execute(array(
            ':login' => $this->number['name'],
            ':comment' => $this->number['order'] . ':DEPOSIT',
        ));
        $mt4_trades = $select->fetch(PDO::FETCH_ASSOC);

        if ($mt4_trades) {
            // Release the SELECT statement before issuing the UPDATE, as the
            // original did by clearing its result variable.
            $select = null;
            $update = $dbh->prepare(
                "UPDATE db_example.accounts SET paystatus='成功', deposit_time = :deposit_time"
                . " where `order` = :order"
            );
            $update->execute(array(
                ':deposit_time' => $mt4_trades['OPEN_TIME'],
                ':order' => $this->number['order'],
            ));
        }
        $dbh = null;

        printf(
            "runtime: %s, %s, %s\n",
            date('Y-m-d H:i:s'),
            $this->worker->getThreadId(),
            $this->number['order']
        );
    }
}

/**
 * Logger object handed to every worker; it also opens its own connection.
 */
class Logging extends Stackable
{
    protected static $dbh;

    /**
     * Opens a PDO connection and stores it in the static $dbh property.
     */
    public function __construct()
    {
        $dbhost = 'db.example.com';  // database server
        $dbuser = 'example.com';     // database user name
        $dbpw = 'password';          // database password
        $dbname = 'example_real';    // database name

        self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
        ));
    }

    /**
     * Prints a vsprintf()-formatted line followed by a newline.
     *
     * @param string $message Format string; nothing is printed when it is empty.
     * @param array  $args    Ignored: all arguments after $message are taken
     *                        from func_get_args() and used as format values.
     */
    protected function log($message, $args = [])
    {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            echo vsprintf("{$message}\n", $args);
        }
    }

    /**
     * @return PDO|null Connection stored by the constructor.
     */
    protected function getConnection()
    {
        return self::$dbh;
    }
}

$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = 'db.example.com';  // database server
$dbuser = 'example.com';     // database user name
$dbpw = 'password';          // database password
$dbname = 'db_example';

$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true,
));

// One unit of work per account that has no deposit time yet.
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";
$row = $dbh->query($sql);
while ($account = $row->fetch(PDO::FETCH_ASSOC)) {
    $pool->submit(new Work($account));
}
$pool->shutdown();
?>
Sambungan pangkalan data yang berterusan (persistent) adalah sangat penting; jika tidak, setiap thread akan membuka sambungan pangkalan data dan kemudian menutupnya, yang akan menyebabkan banyak tamat masa sambungan.
<?php
/**
 * Pool worker that opens one persistent PDO connection in run() and exposes it
 * to the work items it executes.
 */
class ExampleWorker extends Worker
{
    protected static $dbh;

    public function __construct()
    {
    }

    /**
     * Creates the connection stored in the static $dbh property.
     */
    public function run()
    {
        $dbhost = 'db.example.com';  // database server
        $dbuser = 'example.com';     // database user name
        $dbpw = 'password';          // database password
        $dbname = 'example';         // database name

        self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true,
            // Make failed statements throw. Without this the PHP 7 default is
            // silent error mode, so the catch (PDOException) in Work::run()
            // would never see a failed prepare()/execute().
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        ));
    }

    /**
     * NOTE(review): declared protected yet called from Work::run(); presumably
     * this relies on how pthreads treats protected methods of threaded
     * objects. Confirm against the installed pthreads version.
     *
     * @return PDO|null Connection created by run().
     */
    protected function getInstance()
    {
        return self::$dbh;
    }
}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable
{
    /**
     * @param array $data Member row; run() reads its 'id' and 'mobile' keys.
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Decrypts the member's mobile number, keeps its last 11 bytes and stores
     * its MD5 digest in members_digest (or an empty string when the decrypted
     * value is the literal text 'null'). Database errors are appended to
     * mobile_error.log. Always prints a progress line at the end.
     */
    public function run()
    {
        // Computed before the try block so both values are defined in the
        // catch block and in the final progress line.
        $id = $this->data['id'];
        $mobile = safenet_decrypt($this->data['mobile']);
        if (strlen($mobile) > 11) {
            $mobile = substr($mobile, -11);
        }

        try {
            $dbh = $this->worker->getInstance();

            // Strict comparison: only the literal string 'null' counts as
            // "no number". With ==, PHP 7 also treats integer 0 as equal.
            if ($mobile === 'null') {
                $mobile = '';
                $sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
            } else {
                $sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";
            }

            $sth = $dbh->prepare($sql);
            $sth->bindValue(':mobile', $mobile);
            $sth->bindValue(':id', $id);
            $sth->execute();
        } catch (PDOException $e) {
            $error = sprintf("%s,%s\n", $mobile, $id);
            file_put_contents("mobile_error.log", $error, FILE_APPEND);
        }

        printf(
            "runtime: %s, %s, %s, %s\n",
            date('Y-m-d H:i:s'),
            $this->worker->getThreadId(),
            $mobile,
            $id
        );
    }
}

$pool = new Pool(100, \ExampleWorker::class, []);

$dbhost = 'db.example.com';  // database server
$dbuser = 'example.com';     // database user name
$dbpw = 'password';          // database password
$dbname = 'example';

// NOTE(review): this connection uses port 3307 while ExampleWorker::run()
// uses 3306; confirm that the difference is intended.
$dbh = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true,
));

// One unit of work per member row.
$sql = "select id, mobile from members order by id asc";
$row = $dbh->query($sql);
while ($members = $row->fetch(PDO::FETCH_ASSOC)) {
    $pool->submit(new Work($members));
}
$pool->shutdown();
?>
Di atas ialah kandungan terperinci tentang cara menggunakan pthreads PHP. Untuk maklumat lanjut, sila ikuti artikel berkaitan yang lain di laman web China PHP!