Maison  >  Article  >  développement back-end  >  Comment utiliser les pthreads php

Comment utiliser les pthreads php

藏色散人
藏色散人original
2021-09-15 10:20:422003parcourir

Comment utiliser php pthreads : 1. Installez pthreads via "pecl install pthreads" ; 2. Utilisez un verrou mutex lorsque vous devez contrôler plusieurs threads et qu'un seul thread peut fonctionner en même temps.

Comment utiliser les pthreads php

L'environnement d'exploitation de cet article : système Windows7, version php7.0.2, ordinateur DELL G3

Installation et utilisation de pthreads multithread php

L'installation de Pthreads nécessite essentiellement de recompiler PHP, en ajoutant --enable - paramètre mainteneur-zts, mais ce document est rarement utilisé ; il y aura de nombreux bugs et de nombreux problèmes inattendus, et l'environnement de production ne peut être que haha, alors jouez simplement avec ce truc, pour un vrai multi-threading, utilisez Python, C, etc.

La plupart du code suivant provient d'Internet

1 Installation

Ce qui est utilisé ici est php-7.0.2

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql


make && make install

Installer pthreads

pecl install pthreads

2.

Les Stackables sont des tâches exécutées par les threads de travail. Vous pouvez synchroniser, lire et écrire des objets Stackable avant, après et pendant leur exécution.

<?php
#1
$thread = new class extends Thread {
	public function run() {
		echo "Hello World {$this->getThreadId()}\n";                                                                                  
	}   
};

$thread->start() && $thread->join();



#2

class workerThread extends Thread { 
	public function __construct($i){
		$this->i=$i;
	}

	public function run(){
		while(true){
			echo $this->i."\n";
			sleep(1);
		} 
	} 
}

for($i=0;$i<50;$i++){
	$workers[$i]=new workerThread($i);
	$workers[$i]->start();
}

?>

4. ? Il peut être utilisé lorsque vous devez contrôler plusieurs threads et qu’un seul thread peut fonctionner en même temps. Un programme de compteur simple pour illustrer la différence avec et sans verrou mutex

<?php
class SQLQuery extends Stackable {

	public function __construct($sql) {
		$this->sql = $sql;
	}

	public function run() {
		$dbh  = $this->worker->getConnection();
		$row = $dbh->query($this->sql);
		while($member = $row->fetch(PDO::FETCH_ASSOC)){
			print_r($member);
		}
	}

}

class ExampleWorker extends Worker {
	public static $dbh;
	public function __construct($name) {
	}

	public function run(){
		self::$dbh = new PDO(&#39;mysql:host=10.0.0.30;dbname=testdb&#39;,&#39;root&#39;,&#39;123456&#39;);
	}
	public function getConnection(){
		return self::$dbh;
	}
}

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery(&#39;select * from test order by id desc limit 1,5&#39;);
$worker->stack($sql1);

$sql2 = new SQLQuery(&#39;select * from test order by id desc limit 5,5&#39;);
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>
Threads multiples et mémoire partagée

Dans l'exemple de la mémoire partagée, aucun verrou n'est utilisé et il se peut que cela fonctionne toujours normalement. Peut-être que l'opération de mémoire de travail elle-même a un problème. lock Function

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
		$this->handle = fopen("/tmp/counter.txt", "w+");
	}
	public function __destruct(){
		fclose($this->handle);
	}
	public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
	}
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>

5. Synchronisation des threads

Dans certains scénarios, nous ne voulons pas que thread->start() commence à exécuter le programme, mais nous voulons que le thread attende notre commande. La fonction de test de $thread->wait(); est que le thread ne s'exécutera pas immédiatement après thread->start(). Il ne s'exécutera qu'après avoir reçu le signal de $thread->notify();

6. Pool de threads

Une classe Pool

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

Pool de threads de file d'attente dynamique

L'exemple ci-dessus consiste à exécuter start lorsque le pool de threads est plein et à le démarrer uniformément. L'exemple suivant consiste à créer un nouveau thread dès que là-bas. est le temps d'inactivité dans le pool de threads.

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$this->synchronized(function($thread){
				$thread->wait();
				}, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
			$thread->notify();
			}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

pthreads Pool class

<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
    }

    public function run() {

	if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
		$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}

class Pool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	}
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
		}else{
			return false;
		}
	}
	public function start(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->start();
		}
	}
	public function join(){
		foreach ( $this->pool as $id => $worker){
               $this->pool[$id]->join();
		}
	}
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){
            	unset($this->pool[$id]);
            }
		}
	}
}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new Pool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while(true){
			if($pool->push($member)){ //压入任务到池中
				break;
			}else{ //如果池已经满,就开始启动线程
				$pool->start();
				$pool->join();
				$pool->clean();
			}
		}
	}
	$pool->start();
    $pool->join();

	$dbh = null;

} catch (Exception $e) {
    echo &#39;[&#39; , date(&#39;H:i:s&#39;) , &#39;]&#39;, &#39;系统错误&#39;, $e->getMessage(), "\n";
}
?>

7. Lecture et écriture sécurisées de fichiers multithread

LOCK_SH acquiert un verrou partagé (programme de lecture)

    LOCK_EX acquiert un verrou exclusif (programme d'écriture
  • LOCK_UN release Lock ( qu'il soit partagé ou exclusif)
  • LOCK_NB Si vous ne voulez pas que flock() bloque lors du verrouillage
  • <?php
    class Update extends Thread {
    
    	public $running = false;
    	public $row = array();
    	public function __construct($row) {
    
    		$this->row = $row;
    		$this->sql = null;
    		//print_r($this->row);
    	}
    
    	public function run() {
    
    		if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
    			$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
    		}else{
    			$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
    			file_put_contents("bankno_error.log", $error, FILE_APPEND);
    		}
    
    		if( strlen($bankno) > 7 ){
    			$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
    
    			$this->sql = $sql;
    		}
    
    		printf("%s\n",$this->sql);
    	}
    
    }
    
    try {
    	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
    				PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
    				PDO::MYSQL_ATTR_COMPRESS => true
    				)
    			);
    
    	$sql     = "select id,bankno from members order by id desc limit 50";
    
    	$row = $dbh->query($sql);
    	$pool = array();
    	while($member = $row->fetch(PDO::FETCH_ASSOC))
    	{
    		$id 	= $member[&#39;id&#39;];
    		while (true){
    			if(count($pool) < 5){
    				$pool[$id] = new Update($member);
    				$pool[$id]->start();
    				break;
    			}else{
    				foreach ( $pool as $name => $worker){
    					if(! $worker->isRunning()){
    						unset($pool[$name]);
    					}
    				}
    			}
    		}
    
    	}
    
    	$dbh = null;
    
    } catch (Exception $e) {
    	echo &#39;【&#39; , date(&#39;H:i:s&#39;) , &#39;】&#39;, &#39;【系统错误】&#39;, $e->getMessage(), "\n";
    }
    ?> 
  • 8. Multi-threading et connexion de données
  • Lorsque vous utilisez pthreads avec pdo en même temps, vous avez besoin faire attention à une chose, qui nécessite une déclaration statique public static $dbh ; et accéder à la connexion à la base de données via le mode singleton

  • Worker et PDO
<?php

class WebWorker extends Worker {

	public function __construct(SafeLog $logger) {
		$this->logger = $logger;
	}

	protected $loger;
}

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;
	}

	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
					__CLASS__, $this->worker->getThreadId());
		$this->complete = true;
	}

	protected $complete;
}

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
					"{$message}\n", $args);
		}
	}
}


$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();

$pool->collect(function($work){
		return $work->isComplete();
		});

var_dump($pool);			

Pool et PDO

liez la base de données dans le pool de threads

<?php

$fp = fopen("/tmp/lock.txt", "r+");
if (flock($fp, LOCK_EX)) {  // 进行排它型锁定
	ftruncate($fp, 0);      // truncate file
	fwrite($fp, "Write something here\n");
	fflush($fp);            // flush output before releasing the lock
	flock($fp, LOCK_UN);    // 释放锁定
} else {
	echo "Couldn&#39;t get the lock!";
}
fclose($fp);



$fp = fopen(&#39;/tmp/lock.txt&#39;, &#39;r+&#39;);
if(!flock($fp, LOCK_EX | LOCK_NB)) {
	echo &#39;Unable to obtain lock&#39;;
	exit(-1);
}
fclose($fp);
?>

pour poursuivre améliorer le programme ci-dessus, nous utilisons le mode singleton $this-> ;worker->getInstance(); en général, pthreads est encore en développement, et il y a encore quelques lacunes, on voit aussi que le git de pthreads améliore constamment ce projet

Les connexions persistantes à la base de données sont très importantes, sinon chaque thread ouvrira une connexion à la base de données puis la fermera, ce qui entraînera de nombreux délais d'attente de liens

<?php
class Work extends Stackable {

        public function __construct() {
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO(&#39;mysql:host=192.168.2.1;dbname=example&#39;,&#39;www&#39;,&#39;123456&#39;);
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>

Apprentissage recommandé : "

PHP. Tutoriel vidéo

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Articles Liés

Voir plus