Heim >Backend-Entwicklung >PHP-Problem >So verwenden Sie PHP-Pthreads

So verwenden Sie PHP-Pthreads

藏色散人
藏色散人Original
2021-09-15 10:20:422094Durchsuche

So verwenden Sie PHP-Pthreads: 1. Installieren Sie Pthreads über „pecl install pthreads“ 2. Verwenden Sie eine Mutex-Sperre, wenn Sie mehrere Threads steuern müssen und nur ein Thread gleichzeitig arbeiten kann.

So verwenden Sie PHP-Pthreads

Die Betriebsumgebung dieses Artikels: Windows7-System, PHP7.0.2-Version, DELL G3-Computer

Installation und Verwendung von PHP-Multithread-PThreads

Die Installation von PThreads erfordert grundsätzlich eine Neukompilierung von PHP und das Hinzufügen von --enable - maintainer-zts-Parameter, aber dieses Dokument wird selten verwendet; es wird viele Fehler und viele unerwartete Probleme geben, und die Produktionsumgebung kann nur haha ​​sein, also spielen Sie einfach mit diesem Ding, für echtes Multithreading verwenden Sie Python, C, usw.

Der größte Teil des folgenden Codes stammt aus dem Internet

Was hier verwendet wird, ist PHP-7.0.2

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql


make && make install

Installieren Sie pthreads

pecl install pthreads

3. Worker und Stackable

Stackables sind Aufgaben, die von Worker-Threads ausgeführt werden. Sie können Stackable-Objekte vor, nach und während ihrer Ausführung synchronisieren.

<?php
#1
$thread = new class extends Thread {
	public function run() {
		echo "Hello World {$this->getThreadId()}\n";                                                                                  
	}   
};

$thread->start() && $thread->join();



#2

class workerThread extends Thread { 
	public function __construct($i){
		$this->i=$i;
	}

	public function run(){
		while(true){
			echo $this->i."\n";
			sleep(1);
		} 
	} 
}

for($i=0;$i<50;$i++){
	$workers[$i]=new workerThread($i);
	$workers[$i]->start();
}

?>

Unter welchen Umständen wird eine Mutex-Sperre verwendet? ? Es kann verwendet werden, wenn Sie mehrere Threads steuern müssen und nur ein Thread gleichzeitig arbeiten kann. Ein einfaches Zählerprogramm, um den Unterschied mit und ohne Mutex-Sperre zu veranschaulichen lock-Funktion

<?php
class SQLQuery extends Stackable {

	public function __construct($sql) {
		$this->sql = $sql;
	}

	public function run() {
		$dbh  = $this->worker->getConnection();
		$row = $dbh->query($this->sql);
		while($member = $row->fetch(PDO::FETCH_ASSOC)){
			print_r($member);
		}
	}

}

class ExampleWorker extends Worker {
	public static $dbh;
	public function __construct($name) {
	}

	public function run(){
		self::$dbh = new PDO(&#39;mysql:host=10.0.0.30;dbname=testdb&#39;,&#39;root&#39;,&#39;123456&#39;);
	}
	public function getConnection(){
		return self::$dbh;
	}
}

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery(&#39;select * from test order by id desc limit 1,5&#39;);
$worker->stack($sql1);

$sql2 = new SQLQuery(&#39;select * from test order by id desc limit 5,5&#39;);
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>
5. Thread-Synchronisierung

In einigen Szenarien möchten wir nicht, dass thread->start() das Programm ausführt, sondern dass der Thread auf unseren Befehl wartet. Die Testfunktion von $thread->wait(); besteht darin, dass der Thread nicht sofort nach thread->start() ausgeführt wird. Er wird erst ausgeführt, nachdem das Signal von $thread->notify() empfangen wurde

6. Thread-Pool

Eine Pool-Klasse

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
		$this->handle = fopen("/tmp/counter.txt", "w+");
	}
	public function __destruct(){
		fclose($this->handle);
	}
	public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
	}
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>

Dynamischer Warteschlangen-Thread-Pool

Das obige Beispiel besteht darin, „start“ auszuführen, wenn der Thread-Pool voll ist, und ihn einheitlich zu starten ist Leerlaufzeit im Thread-Pool.

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

pthreads Pool-Klasse

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$this->synchronized(function($thread){
				$thread->wait();
				}, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
			$thread->notify();
			}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

7. Multithread-Datei-sicheres Lesen und Schreiben

LOCK_SH erwirbt eine gemeinsame Sperre (Leseprogramm)

LOCK_EX erwirbt eine exklusive Sperre (Schreibprogramm)

    LOCK_UN Freigabesperre ( ob geteilt oder exklusiv)
  • LOCK_NB Wenn Sie nicht möchten, dass flock() beim Sperren blockiert
  • <?php
    class Update extends Thread {
    
        public $running = false;
        public $row = array();
        public function __construct($row) {
    
    	$this->row = $row;
            $this->sql = null;
        }
    
        public function run() {
    
    	if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
    		$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
    	}else{
    		$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
    		file_put_contents("bankno_error.log", $error, FILE_APPEND);
    	}
    
    	if( strlen($bankno) > 7 ){
    		$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
    
    		$this->sql = $sql;
    	}
    
    	printf("%s\n",$this->sql);
        }
    
    }
    
    class Pool {
    	public $pool = array();
    	public function __construct($count) {
    		$this->count = $count;
    	}
    	public function push($row){
    		if(count($this->pool) < $this->count){
    			$this->pool[] = new Update($row);
    			return true;
    		}else{
    			return false;
    		}
    	}
    	public function start(){
    		foreach ( $this->pool as $id => $worker){
    			$this->pool[$id]->start();
    		}
    	}
    	public function join(){
    		foreach ( $this->pool as $id => $worker){
                   $this->pool[$id]->join();
    		}
    	}
    	public function clean(){
    		foreach ( $this->pool as $id => $worker){
    			if(! $worker->isRunning()){
                	unset($this->pool[$id]);
                }
    		}
    	}
    }
    
    try {
    	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
    		PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
    		PDO::MYSQL_ATTR_COMPRESS => true
    		)
    	);
    
    	$sql  = "select id,bankno from members order by id desc";
    	$row = $dbh->query($sql);
    	$pool = new Pool(5);
    	while($member = $row->fetch(PDO::FETCH_ASSOC))
    	{
    
    		while(true){
    			if($pool->push($member)){ //压入任务到池中
    				break;
    			}else{ //如果池已经满,就开始启动线程
    				$pool->start();
    				$pool->join();
    				$pool->clean();
    			}
    		}
    	}
    	$pool->start();
        $pool->join();
    
    	$dbh = null;
    
    } catch (Exception $e) {
        echo &#39;[&#39; , date(&#39;H:i:s&#39;) , &#39;]&#39;, &#39;系统错误&#39;, $e->getMessage(), "\n";
    }
    ?>
  • 8. Multithreading und Datenverbindung
  • Bei gleichzeitiger Verwendung von pthreads mit pdo benötigen Sie Achten Sie auf eine Sache, die eine statische Deklaration erfordert: public static $dbh; und greifen Sie über den Singleton-Modus auf die Datenbankverbindung zu Um das obige Programm zu verbessern, verwenden wir den Singleton-Modus $this-> ;worker->getInstance(); stellt global nur eine Datenbankverbindung her und Threads verwenden gemeinsam genutzte Datenbankverbindungen

    <?php
    class Update extends Thread {
    
    	public $running = false;
    	public $row = array();
    	public function __construct($row) {
    
    		$this->row = $row;
    		$this->sql = null;
    		//print_r($this->row);
    	}
    
    	public function run() {
    
    		if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
    			$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
    		}else{
    			$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
    			file_put_contents("bankno_error.log", $error, FILE_APPEND);
    		}
    
    		if( strlen($bankno) > 7 ){
    			$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
    
    			$this->sql = $sql;
    		}
    
    		printf("%s\n",$this->sql);
    	}
    
    }
    
    try {
    	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
    				PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
    				PDO::MYSQL_ATTR_COMPRESS => true
    				)
    			);
    
    	$sql     = "select id,bankno from members order by id desc limit 50";
    
    	$row = $dbh->query($sql);
    	$pool = array();
    	while($member = $row->fetch(PDO::FETCH_ASSOC))
    	{
    		$id 	= $member[&#39;id&#39;];
    		while (true){
    			if(count($pool) < 5){
    				$pool[$id] = new Update($member);
    				$pool[$id]->start();
    				break;
    			}else{
    				foreach ( $pool as $name => $worker){
    					if(! $worker->isRunning()){
    						unset($pool[$name]);
    					}
    				}
    			}
    		}
    
    	}
    
    	$dbh = null;
    
    } catch (Exception $e) {
    	echo &#39;【&#39; , date(&#39;H:i:s&#39;) , &#39;】&#39;, &#39;【系统错误】&#39;, $e->getMessage(), "\n";
    }
    ?> 
  • Zusammenfassung der Betriebsdatenbanken in Multi-Threads
  • In Im Allgemeinen befindet sich pthreads noch in der Entwicklung und es gibt noch einige Mängel. Wir können auch sehen, dass der Git von pthreads dieses Projekt ständig verbessert.

  • Persistente Datenbankverbindungen sind sehr wichtig, da sonst jeder Thread eine Datenbankverbindung öffnet und diese dann schließt. was viele Link-Timeouts verursachen wird
<?php

class WebWorker extends Worker {

	public function __construct(SafeLog $logger) {
		$this->logger = $logger;
	}

	protected $loger;
}

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;
	}

	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
					__CLASS__, $this->worker->getThreadId());
		$this->complete = true;
	}

	protected $complete;
}

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
					"{$message}\n", $args);
		}
	}
}


$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();

$pool->collect(function($work){
		return $work->isComplete();
		});

var_dump($pool);			

Empfohlenes Lernen: „

PHP. Video-Tutorial

Das obige ist der detaillierte Inhalt vonSo verwenden Sie PHP-Pthreads. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn

In Verbindung stehende Artikel

Mehr sehen