安装Pthreads 基本上需要重新编译PHP,加上 --enable-maintainer-zts 参数,但是用这个文档很少;bug会很多很有很多意想不到的问题,生成环境上只能呵呵了,所以这个东西玩玩就算了,真正多线程还是用Python、C等等
一、安装
这里使用的是 php-7.0.2
./configure \ --prefix=/usr/local/php7 \ --with-config-file-path=/etc \ --with-config-file-scan-dir=/etc/php.d \ --enable-debug \ --enable-maintainer-zts \ --enable-pcntl \ --enable-fpm \ --enable-opcache \ --enable-embed=shared \ --enable-json=shared \ --enable-phpdbg \ --with-curl=shared \ --with-mysql=/usr/local/mysql \ --with-mysqli=/usr/local/mysql/bin/mysql_config \ --with-pdo-mysql
make && make install
安装pthreads
pecl install pthreads
二、Thread
<?php #1 $thread = new class extends Thread { public function run() { echo "Hello World {$this->getThreadId()}\n"; } }; $thread->start() && $thread->join(); #2 class workerThread extends Thread { public function __construct($i){ $this->i=$i; } public function run(){ while(true){ echo $this->i."\n"; sleep(1); } } } for($i=0;$i<50;$i++){ $workers[$i]=new workerThread($i); $workers[$i]->start(); } ?>
三、 Worker 与 Stackable
Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.
<?php class SQLQuery extends Stackable { public function __construct($sql) { $this->sql = $sql; } public function run() { $dbh = $this->worker->getConnection(); $row = $dbh->query($this->sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } public function run(){ self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb','root','123456'); } public function getConnection(){ return self::$dbh; } } $worker = new ExampleWorker("My Worker Thread"); $sql1 = new SQLQuery('select * from test order by id desc limit 1,5'); $worker->stack($sql1); $sql2 = new SQLQuery('select * from test order by id desc limit 5,5'); $worker->stack($sql2); $worker->start(); $worker->shutdown(); ?>
四、 互斥锁
什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。一个简单的计数器程序,说明有无互斥锁情况下的不同
<?php $counter = 0; $handle=fopen("/tmp/counter.txt", "w"); fwrite($handle, $counter ); fclose($handle); class CounterThread extends Thread { public function __construct($mutex = null){ $this->mutex = $mutex; $this->handle = fopen("/tmp/counter.txt", "w+"); } public function __destruct(){ fclose($this->handle); } public function run() { if($this->mutex) $locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex) Mutex::unlock($this->mutex); } } //没有互斥锁 for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread(); $threads[$i]->start(); } //加入互斥锁 $mutex = Mutex::create(true); for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread($mutex); $threads[$i]->start(); } Mutex::unlock($mutex); for ($i=0;$i<50;$i++){ $threads[$i]->join(); } Mutex::destroy($mutex); ?>
多线程与共享内存
在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能
<?php $tmp = tempnam(__FILE__, 'PHP'); $key = ftok($tmp, 'a'); $shmid = shm_attach($key); $counter = 0; shm_put_var( $shmid, 1, $counter ); class CounterThread extends Thread { public function __construct($shmid){ $this->shmid = $shmid; } public function run() { $counter = shm_get_var( $this->shmid, 1 ); $counter++; shm_put_var( $this->shmid, 1, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); } } for ($i=0;$i<100;$i++){ $threads[] = new CounterThread($shmid); } for ($i=0;$i<100;$i++){ $threads[$i]->start(); } for ($i=0;$i<100;$i++){ $threads[$i]->join(); } shm_remove( $shmid ); shm_detach( $shmid ); ?>
五、 线程同步
有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。thread−>wait();测作用是thread−>start()后线程并不会立即运行,只有收到 thread->notify(); 发出的信号后才运行
<?php $tmp = tempnam(__FILE__, 'PHP'); $key = ftok($tmp, 'a'); $shmid = shm_attach($key); $counter = 0; shm_put_var( $shmid, 1, $counter ); class CounterThread extends Thread { public function __construct($shmid){ $this->shmid = $shmid; } public function run() { $this->synchronized(function($thread){ $thread->wait(); }, $this); $counter = shm_get_var( $this->shmid, 1 ); $counter++; shm_put_var( $this->shmid, 1, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); } } for ($i=0;$i<100;$i++){ $threads[] = new CounterThread($shmid); } for ($i=0;$i<100;$i++){ $threads[$i]->start(); } for ($i=0;$i<100;$i++){ $threads[$i]->synchronized(function($thread){ $thread->notify(); }, $threads[$i]); } for ($i=0;$i<100;$i++){ $threads[$i]->join(); } shm_remove( $shmid ); shm_detach( $shmid ); ?>
六、线程池
一个Pool类
<?php class Update extends Thread { public $running = false; public $row = array(); public function __construct($row) { $this->row = $row; $this->sql = null; } public function run() { if(strlen($this->row['bankno']) > 100 ){ $bankno = safenet_decrypt($this->row['bankno']); }else{ $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']); file_put_contents("bankno_error.log", $error, FILE_APPEND); } if( strlen($bankno) > 7 ){ $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql; } printf("%s\n",$this->sql); } } class Pool { public $pool = array(); public function __construct($count) { $this->count = $count; } public function push($row){ if(count($this->pool) < $this->count){ $this->pool[] = new Update($row); return true; }else{ return false; } } public function start(){ foreach ( $this->pool as $id => $worker){ $this->pool[$id]->start(); } } public function join(){ foreach ( $this->pool as $id => $worker){ $this->pool[$id]->join(); } } public function clean(){ foreach ( $this->pool as $id => $worker){ if(! $worker->isRunning()){ unset($this->pool[$id]); } } } } try { $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select id,bankno from members order by id desc"; $row = $dbh->query($sql); $pool = new Pool(5); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while(true){ if($pool->push($member)){ //压入任务到池中 break; }else{ //如果池已经满,就开始启动线程 $pool->start(); $pool->join(); $pool->clean(); } } } $pool->start(); $pool->join(); $dbh = null; } catch (Exception $e) { echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n"; } ?>
动态队列线程池
上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。
<?php class Update extends Thread { public $running = false; public $row = array(); public function __construct($row) { $this->row = $row; $this->sql = null; //print_r($this->row); } public function run() { if(strlen($this->row['bankno']) > 100 ){ $bankno = safenet_decrypt($this->row['bankno']); }else{ $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']); file_put_contents("bankno_error.log", $error, FILE_APPEND); } if( strlen($bankno) > 7 ){ $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql; } printf("%s\n",$this->sql); } } try { $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select id,bankno from members order by id desc limit 50"; $row = $dbh->query($sql); $pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { $id = $member['id']; while (true){ if(count($pool) < 5){ $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ if(! $worker->isRunning()){ unset($pool[$name]); } } } } } $dbh = null; } catch (Exception $e) { echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n"; } ?>
pthreads Pool类
<?php class WebWorker extends Worker { public function __construct(SafeLog $logger) { $this->logger = $logger; } protected $loger; } class WebWork extends Stackable { public function isComplete() { return $this->complete; } public function run() { $this->worker ->logger ->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId()); $this->complete = true; } protected $complete; } class SafeLog extends Stackable { protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf( "{$message}\n", $args); } } } $pool = new Pool(8, \WebWorker::class, [new SafeLog()]); $pool->submit($w=new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->shutdown(); $pool->collect(function($work){ return $work->isComplete(); }); var_dump($pool);
七、多线程文件安全读写
LOCK_SH 取得共享锁定(读取的程序)
LOCK_EX 取得独占锁定(写入的程序
LOCK_UN 释放锁定(无论共享或独占)
LOCK_NB 如果不希望 flock() 在锁定时堵塞
<?php $fp = fopen("/tmp/lock.txt", "r+"); if (flock($fp, LOCK_EX)) { // 进行排它型锁定 ftruncate($fp, 0); // truncate file fwrite($fp, "Write something here\n"); fflush($fp); // flush output before releasing the lock flock($fp, LOCK_UN); // 释放锁定 } else { echo "Couldn't get the lock!"; } fclose($fp); $fp = fopen('/tmp/lock.txt', 'r+'); if(!flock($fp, LOCK_EX | LOCK_NB)) { echo 'Unable to obtain lock'; exit(-1); } fclose($fp); ?>
八、多线程与数据连接
pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。
Worker 与 PDO
<?php class Work extends Stackable { public function __construct() { } public function run() { $dbh = $this->worker->getConnection(); $sql = "select id,name from members order by id desc limit "; $row = $dbh->query($sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } /* * The run method should just prepare the environment for the work that is coming ... */ public function run(){ self::$dbh = new PDO('mysql:host=...;dbname=example','www',''); } public function getConnection(){ return self::$dbh; } } $worker = new ExampleWorker("My Worker Thread"); $work=new Work(); $worker->stack($work); $worker->start(); $worker->shutdown(); ?>
Pool 与 PDO
在线程池中链接数据库
# cat pool.php <?php class ExampleWorker extends Worker { public function __construct(Logging $logger) { $this->logger = $logger; } protected $logger; } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($number) { $this->number = $number; } public function run() { $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example_real'; $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true ) ); $sql = "select OPEN_TIME, `COMMENT` from MT_TRADES where LOGIN='".$this->number['name']."' and CMD='' and `COMMENT` = '".$this->number['order'].":DEPOSIT'"; #echo $sql; $row = $dbh->query($sql); $mt_trades = $row->fetch(PDO::FETCH_ASSOC); if($mt_trades){ $row = null; $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';"; $dbh->query($sql); #printf("%s\n",$sql); } $dbh = null; printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']); } } class Logging extends Stackable { protected static $dbh; public function __construct() { $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example_real'; // 数据库名 self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); } protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf("{$message}\n", $args); } } protected function getConnection(){ return self::$dbh; } } $pool = new Pool(, \ExampleWorker::class, [new Logging()]); $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'db_example'; $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select `order`,name from accounts where deposit_time is null order by id desc"; $row = $dbh->query($sql); while($account = $row->fetch(PDO::FETCH_ASSOC)) { $pool->submit(new Work($account)); } $pool->shutdown(); ?>
进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接
<?php class ExampleWorker extends Worker { #public function __construct(Logging $logger) { # $this->logger = $logger; #} #protected $logger; protected static $dbh; public function __construct() { } public function run(){ $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example'; // 数据库名 self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true ) ); } protected function getInstance(){ return self::$dbh; } } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($data) { $this->data = $data; #print_r($data); } public function run() { #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() ); try { $dbh = $this->worker->getInstance(); #print_r($dbh); $id = $this->data['id']; $mobile = safenet_decrypt($this->data['mobile']); #printf("%d, %s \n", $id, $mobile); if(strlen($mobile) > ){ $mobile = substr($mobile, -); } if($mobile == 'null'){ # $sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'"; # printf("%s\n",$sql); # $dbh->query($sql); $mobile = ''; $sql = "UPDATE members_digest SET mobile = :mobile where id = :id"; }else{ $sql = "UPDATE members_digest SET mobile = md(:mobile) where id = :id"; } $sth = $dbh->prepare($sql); $sth->bindValue(':mobile', $mobile); $sth->bindValue(':id', $id); $sth->execute(); #echo $sth->debugDumpParams(); } catch(PDOException $e) { $error = sprintf("%s,%s\n", $mobile, $id ); file_put_contents("mobile_error.log", $error, FILE_APPEND); } #$dbh = null; printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id); #printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number); } } $pool = new Pool(, \ExampleWorker::class, []); #foreach (range(, ) as $number) { # $pool->submit(new Work($number)); #} $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example'; $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); #print_r($dbh); #$sql = "select id, mobile from members where id < :id"; #$sth = $dbh->prepare($sql); #$sth->bindValue(':id',); #$sth->execute(); #$result = $sth->fetchAll(); #print_r($result); # #$sql = "UPDATE members_digest SET mobile = :mobile where id = :id"; #$sth = $dbh->prepare($sql); #$sth->bindValue(':mobile', 'aa'); #$sth->bindValue(':id',''); #echo $sth->execute(); #echo $sth->queryString; #echo $sth->debugDumpParams(); $sql = "select id, mobile from members order by id asc"; // limit "; $row = $dbh->query($sql); while($members = $row->fetch(PDO::FETCH_ASSOC)) { #$order = $account['order']; #printf("%s\n",$order); //print_r($members); $pool->submit(new Work($members)); #unset($account['order']); } $pool->shutdown(); ?>
多线程中操作数据库总结
总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目
数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。
<?php $dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array( PDO::ATTR_PERSISTENT => true )); ?>
关于php pthreads多线程的安装与使用的相关知识,就先给大家介绍到这里,后续还会持续更新。

PHP and Python each have their own advantages, and the choice should be based on project requirements. 1.PHPは、シンプルな構文と高い実行効率を備えたWeb開発に適しています。 2。Pythonは、簡潔な構文とリッチライブラリを備えたデータサイエンスと機械学習に適しています。

PHPは死にかけていませんが、常に適応して進化しています。 1)PHPは、1994年以来、新しいテクノロジーの傾向に適応するために複数のバージョンの反復を受けています。 2)現在、電子商取引、コンテンツ管理システム、その他の分野で広く使用されています。 3)PHP8は、パフォーマンスと近代化を改善するために、JITコンパイラおよびその他の機能を導入します。 4)Opcacheを使用してPSR-12標準に従って、パフォーマンスとコードの品質を最適化します。

PHPの将来は、新しいテクノロジーの傾向に適応し、革新的な機能を導入することで達成されます。1)クラウドコンピューティング、コンテナ化、マイクロサービスアーキテクチャに適応し、DockerとKubernetesをサポートします。 2)パフォーマンスとデータ処理の効率を改善するために、JITコンパイラと列挙タイプを導入します。 3)パフォーマンスを継続的に最適化し、ベストプラクティスを促進します。

PHPでは、特性は方法が必要な状況に適していますが、継承には適していません。 1)特性により、クラスの多重化方法が複数の継承の複雑さを回避できます。 2)特性を使用する場合、メソッドの競合に注意を払う必要があります。メソッドの競合は、代替およびキーワードとして解決できます。 3)パフォーマンスを最適化し、コードメンテナビリティを改善するために、特性の過剰使用を避け、その単一の責任を維持する必要があります。

依存関係噴射コンテナ(DIC)は、PHPプロジェクトで使用するオブジェクト依存関係を管理および提供するツールです。 DICの主な利点には、次のものが含まれます。1。デカップリング、コンポーネントの独立したもの、およびコードの保守とテストが簡単です。 2。柔軟性、依存関係を交換または変更しやすい。 3.テスト可能性、単体テストのために模擬オブジェクトを注入するのに便利です。

SplfixedArrayは、PHPの固定サイズの配列であり、高性能と低いメモリの使用が必要なシナリオに適しています。 1)動的調整によって引き起こされるオーバーヘッドを回避するために、作成時にサイズを指定する必要があります。 2)C言語アレイに基づいて、メモリと高速アクセス速度を直接動作させます。 3)大規模なデータ処理とメモリに敏感な環境に適していますが、サイズが固定されているため、注意して使用する必要があります。

PHPは、$ \ _ファイル変数を介してファイルのアップロードを処理します。セキュリティを確保するための方法には次のものが含まれます。1。アップロードエラー、2。ファイルの種類とサイズを確認する、3。ファイル上書きを防ぐ、4。ファイルを永続的なストレージの場所に移動します。

JavaScriptでは、nullcoalescingoperator(??)およびnullcoalescingsignmentoperator(?? =)を使用できます。 1.??最初の非潜水金または非未定されたオペランドを返します。 2.??これらの演算子は、コードロジックを簡素化し、読みやすさとパフォーマンスを向上させます。


ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

MantisBT
Mantis は、製品の欠陥追跡を支援するために設計された、導入が簡単な Web ベースの欠陥追跡ツールです。 PHP、MySQL、Web サーバーが必要です。デモおよびホスティング サービスをチェックしてください。

ZendStudio 13.5.1 Mac
強力な PHP 統合開発環境

SublimeText3 中国語版
中国語版、とても使いやすい

PhpStorm Mac バージョン
最新(2018.2.1)のプロフェッショナル向けPHP統合開発ツール

SecLists
SecLists は、セキュリティ テスターの究極の相棒です。これは、セキュリティ評価中に頻繁に使用されるさまざまな種類のリストを 1 か所にまとめたものです。 SecLists は、セキュリティ テスターが必要とする可能性のあるすべてのリストを便利に提供することで、セキュリティ テストをより効率的かつ生産的にするのに役立ちます。リストの種類には、ユーザー名、パスワード、URL、ファジング ペイロード、機密データ パターン、Web シェルなどが含まれます。テスターはこのリポジトリを新しいテスト マシンにプルするだけで、必要なあらゆる種類のリストにアクセスできるようになります。
