phpのpthreadの使い方

藏色散人
藏色散人オリジナル
2021-09-15 10:20:422058ブラウズ

php pthreads の使用方法: 1. 「pecl install pthreads」を通じて pthreads をインストールします; 2. 複数のスレッドを制御する必要があり、同時に 1 つのスレッドのみが動作できる場合は、mutex ロックを使用します。

phpのpthreadの使い方

この記事の動作環境:Windows7システム、php7.0.2バージョン、DELL G3コンピュータ

phpのインストールと使い方マルチスレッド pthreads

Pthreads をインストールするには、基本的に PHP を再コンパイルして --enable-maintainer-zts パラメータを追加する必要がありますが、これを使用するためのドキュメントはほとんどありません。運用環境です。笑えるだけなので、これを試してみてください。実際のマルチスレッドの場合は、やはり Python、C などを使用する必要があります。

次のコードのほとんどはインターネットから取得したものです

1. インストール

ここで使用するのは php-7.0.2

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql


make && make install

pthreads

pecl install pthreads

2 をインストールします。ワーカーとスタッカブル

スタッカブルはワーカー スレッドによって実行されるタスクです。スタッカブル オブジェクトの実行前、実行後、実行中に同期、読み取り、書き込みを行うことができます。

<?php
#1
$thread = new class extends Thread {
	public function run() {
		echo "Hello World {$this->getThreadId()}\n";                                                                                  
	}   
};

$thread->start() && $thread->join();



#2

class workerThread extends Thread { 
	public function __construct($i){
		$this->i=$i;
	}

	public function run(){
		while(true){
			echo $this->i."\n";
			sleep(1);
		} 
	} 
}

for($i=0;$i<50;$i++){
	$workers[$i]=new workerThread($i);
	$workers[$i]->start();
}

?>

4. ミューテックス ロック

何 ミューテックス ロックはどのような状況で使用されますか?複数のスレッドを制御する必要があり、同時に動作できるスレッドが 1 つだけである場合に使用できます。ミューテックス ロックがある場合とない場合の違いを示す簡単なカウンター プログラム

<?php
class SQLQuery extends Stackable {

	public function __construct($sql) {
		$this->sql = $sql;
	}

	public function run() {
		$dbh  = $this->worker->getConnection();
		$row = $dbh->query($this->sql);
		while($member = $row->fetch(PDO::FETCH_ASSOC)){
			print_r($member);
		}
	}

}

class ExampleWorker extends Worker {
	public static $dbh;
	public function __construct($name) {
	}

	public function run(){
		self::$dbh = new PDO(&#39;mysql:host=10.0.0.30;dbname=testdb&#39;,&#39;root&#39;,&#39;123456&#39;);
	}
	public function getConnection(){
		return self::$dbh;
	}
}

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery(&#39;select * from test order by id desc limit 1,5&#39;);
$worker->stack($sql1);

$sql2 = new SQLQuery(&#39;select * from test order by id desc limit 5,5&#39;);
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>

複数のスレッドと共有メモリ

共有メモリの例では、ロックは使用されていないため、正常に動作する可能性があります。おそらく作業メモリの操作自体にロック機能がある可能性があります

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
		$this->handle = fopen("/tmp/counter.txt", "w+");
	}
	public function __destruct(){
		fclose($this->handle);
	}
	public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
	}
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>

5. スレッドの同期

シナリオによっては、thread->start() でプログラムの実行を開始したくないが、私たちを待つスレッドのコマンド。 $thread->wait(); のテスト効果は、スレッドが thread->start() の直後には実行されないことです。$thread->notify();## からのシグナルを受信した後にのみ実行されます。 #
<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

6. スレッド プール

A プール クラス

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$this->synchronized(function($thread){
				$thread->wait();
				}, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
			$thread->notify();
			}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

ダイナミック キュー スレッド プール

上記の例は、スレッド プールがいっぱいになったときに start を実行します。次の例は、スレッド プールに空きができたらすぐに新しいスレッドを作成します。

<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
    }

    public function run() {

	if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
		$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}

class Pool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	}
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
		}else{
			return false;
		}
	}
	public function start(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->start();
		}
	}
	public function join(){
		foreach ( $this->pool as $id => $worker){
               $this->pool[$id]->join();
		}
	}
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){
            	unset($this->pool[$id]);
            }
		}
	}
}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new Pool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while(true){
			if($pool->push($member)){ //压入任务到池中
				break;
			}else{ //如果池已经满,就开始启动线程
				$pool->start();
				$pool->join();
				$pool->clean();
			}
		}
	}
	$pool->start();
    $pool->join();

	$dbh = null;

} catch (Exception $e) {
    echo &#39;[&#39; , date(&#39;H:i:s&#39;) , &#39;]&#39;, &#39;系统错误&#39;, $e->getMessage(), "\n";
}
?>

pthreads プール クラス

<?php
class Update extends Thread {

	public $running = false;
	public $row = array();
	public function __construct($row) {

		$this->row = $row;
		$this->sql = null;
		//print_r($this->row);
	}

	public function run() {

		if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
			$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
		}else{
			$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
			file_put_contents("bankno_error.log", $error, FILE_APPEND);
		}

		if( strlen($bankno) > 7 ){
			$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);

			$this->sql = $sql;
		}

		printf("%s\n",$this->sql);
	}

}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
				PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
				PDO::MYSQL_ATTR_COMPRESS => true
				)
			);

	$sql     = "select id,bankno from members order by id desc limit 50";

	$row = $dbh->query($sql);
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{
		$id 	= $member[&#39;id&#39;];
		while (true){
			if(count($pool) < 5){
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}
		}

	}

	$dbh = null;

} catch (Exception $e) {
	echo &#39;【&#39; , date(&#39;H:i:s&#39;) , &#39;】&#39;, &#39;【系统错误】&#39;, $e->getMessage(), "\n";
}
?> 

7. マルチスレッド ファイルの安全な読み取りと書き込み

LOCK_SH 共有ロックの取得 (プログラムの読み取り)
  • #LOCK_EX 排他ロックの取得 (プログラムの書き込み)

  • #LOCK_UN ロックの解放 (共有か排他かを問わず)

  • LOCK_NB ロック中に flock() をブロックしたくない場合

  • <?php
    
    class WebWorker extends Worker {
    
    	public function __construct(SafeLog $logger) {
    		$this->logger = $logger;
    	}
    
    	protected $loger;
    }
    
    class WebWork extends Stackable {
    
    	public function isComplete() {
    		return $this->complete;
    	}
    
    	public function run() {
    		$this->worker
    			->logger
    			->log("%s executing in Thread #%lu",
    					__CLASS__, $this->worker->getThreadId());
    		$this->complete = true;
    	}
    
    	protected $complete;
    }
    
    class SafeLog extends Stackable {
    
    	protected function log($message, $args = []) {
    		$args = func_get_args();
    
    		if (($message = array_shift($args))) {
    			echo vsprintf(
    					"{$message}\n", $args);
    		}
    	}
    }
    
    
    $pool = new Pool(8, \WebWorker::class, [new SafeLog()]);
    
    $pool->submit($w=new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->shutdown();
    
    $pool->collect(function($work){
    		return $work->isComplete();
    		});
    
    var_dump($pool);			

    8. マルチスレッドとデータ接続

  • pthread と pdo を同時に使用する場合静的宣言 public static $dbh; を実行し、シングルトン モードでデータベース接続にアクセスします。 #スレッド プール内のデータベースをリンクします
<?php

$fp = fopen("/tmp/lock.txt", "r+");
if (flock($fp, LOCK_EX)) {  // 进行排它型锁定
	ftruncate($fp, 0);      // truncate file
	fwrite($fp, "Write something here\n");
	fflush($fp);            // flush output before releasing the lock
	flock($fp, LOCK_UN);    // 释放锁定
} else {
	echo "Couldn&#39;t get the lock!";
}
fclose($fp);



$fp = fopen(&#39;/tmp/lock.txt&#39;, &#39;r+&#39;);
if(!flock($fp, LOCK_EX | LOCK_NB)) {
	echo &#39;Unable to obtain lock&#39;;
	exit(-1);
}
fclose($fp);
?>

上記のプログラムをさらに改善するために、シングルトン モード $this->worker->getInstance(); を使用して、グローバルに 1 つのデータベース接続のみを作成します。スレッドは共有データベース接続を使用します

<?php
class Work extends Stackable {

        public function __construct() {
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO(&#39;mysql:host=192.168.2.1;dbname=example&#39;,&#39;www&#39;,&#39;123456&#39;);
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>

マルチスレッドで動作するデータベースの概要

一般に、pthreads はまだ開発中であり、まだいくつかの欠点があります。は、このプロジェクトを継続的に改善しています。

データベースの永続リンクは非常に重要です。そうしないと、各スレッドがデータベース接続を開いてから閉じることになり、多くのリンク タイムアウトが発生します。

# cat pool.php
<?php
class ExampleWorker extends Worker {

	public function __construct(Logging $logger) {
		$this->logger = $logger;
	}

	protected $logger;
}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($number) {
		$this->number = $number;
	}
	public function run() {
                $dbhost = &#39;db.example.com&#39;;               // 数据库服务器
                $dbuser = &#39;example.com&#39;;                 // 数据库用户名
                $dbpw = &#39;password&#39;;                               // 数据库密码
                $dbname = &#39;example_real&#39;;
		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
                        PDO::MYSQL_ATTR_COMPRESS => true,
			PDO::ATTR_PERSISTENT => true
                        )
                );
		$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN=&#39;".$this->number[&#39;name&#39;]."&#39; and CMD=&#39;6&#39; and `COMMENT` = &#39;".$this->number[&#39;order&#39;].":DEPOSIT&#39;";
		#echo $sql;
		$row = $dbh->query($sql);
		$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);
		if($mt4_trades){

			$row = null;

			$sql = "UPDATE db_example.accounts SET paystatus=&#39;成功&#39;, deposit_time=&#39;".$mt4_trades[&#39;OPEN_TIME&#39;]."&#39; where `order` = &#39;".$this->number[&#39;order&#39;]."&#39;;";
			$dbh->query($sql);
			#printf("%s\n",$sql);
		}
		$dbh = null;
		printf("runtime: %s, %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->worker->getThreadId() ,$this->number[&#39;order&#39;]);

	}
}

class Logging extends Stackable {
	protected  static $dbh;
	public function __construct() {
		$dbhost = &#39;db.example.com&#39;;			// 数据库服务器
	        $dbuser = &#39;example.com&#39;;                 // 数据库用户名
        	$dbpw = &#39;password&#39;;                               // 数据库密码
		$dbname = &#39;example_real&#39;;			// 数据库名

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
			PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
			PDO::MYSQL_ATTR_COMPRESS => true
			)
		);

	}
	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);
		}
	}

	protected function getConnection(){
                return self::$dbh;
        }
}

$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = &#39;db.example.com&#39;;                      // 数据库服务器
$dbuser = &#39;example.com&#39;;                 // 数据库用户名
$dbpw = &#39;password&#39;;                               // 数据库密码
$dbname = &#39;db_example&#39;;
$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
                        PDO::MYSQL_ATTR_COMPRESS => true
                        )
                );
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";

$row = $dbh->query($sql);
while($account = $row->fetch(PDO::FETCH_ASSOC))
{
        $pool->submit(new Work($account));
}

$pool->shutdown();

?> 

推奨される学習: "

PHP ビデオ チュートリアル

"

以上がphpのpthreadの使い方の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

関連記事

続きを見る