>백엔드 개발 >PHP 문제 >PHP pthread를 사용하는 방법

PHP pthread를 사용하는 방법

藏色散人
藏色散人원래의
2021-09-15 10:20:422099검색

php pthreads 사용 방법: 1. "pecl install pthreads"를 통해 pthread를 설치합니다. 2. 여러 스레드를 제어해야 하고 동시에 하나의 스레드만 작동할 수 있는 경우 뮤텍스 잠금을 사용합니다.

PHP pthread를 사용하는 방법

이 문서의 운영 환경: Windows 7 시스템, PHP 버전 7.0.2, DELL G3 컴퓨터

PHP 다중 스레드 pthread 설치 및 사용

Pthreads를 설치하려면 기본적으로 PHP를 다시 컴파일해야 하며 다음을 추가해야 합니다. -enable-maintainer-zts 매개변수이지만 이 문서는 거의 사용되지 않습니다. 버그가 많고 예상치 못한 문제가 많을 것이며 프로덕션 환경은 하하하하하하하하하하, 그냥 이걸 가지고 놀아보세요. 실제 멀티스레딩에는 Python을 사용하세요. C 등

다음 코드의 대부분은 인터넷에서 가져온 것입니다. Stackable

Stackable은 작업자 스레드에 의해 실행되는 작업입니다. 실행 전, 실행 후, 실행 중에 Stackable 객체를 동기화하고 읽고 쓸 수 있습니다.

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql


make && make install

4. 뮤텍스 잠금은 어떤 상황에서 발생합니까? 사용될까? 여러 스레드를 제어해야 하고 동시에 하나의 스레드만 작동할 때 사용할 수 있습니다. 뮤텍스 잠금이 있는 경우와 없는 경우의 차이점을 설명하는 간단한 카운터 프로그램

pecl install pthreads

다중 스레드 및 공유 메모리

공유 메모리의 예에서는 잠금이 사용되지 않으며 여전히 정상적으로 작동할 수 있습니다. 작업 메모리 작업 자체에는 lock 함수

<?php
#1
$thread = new class extends Thread {
	public function run() {
		echo "Hello World {$this->getThreadId()}\n";                                                                                  
	}   
};

$thread->start() && $thread->join();



#2

class workerThread extends Thread { 
	public function __construct($i){
		$this->i=$i;
	}

	public function run(){
		while(true){
			echo $this->i."\n";
			sleep(1);
		} 
	} 
}

for($i=0;$i<50;$i++){
	$workers[$i]=new workerThread($i);
	$workers[$i]->start();
}

?>
5. 스레드 동기화

일부 시나리오에서는 thread->start()가 프로그램 실행을 시작하는 것을 원하지 않지만 스레드가 명령을 기다리기를 원합니다. $thread->wait();의 테스트 기능은 스레드가 thread->start() 직후에 실행되지 않는다는 것입니다. $thread->notify();
6. Thread pool

A Pool 클래스

<?php
class SQLQuery extends Stackable {

	public function __construct($sql) {
		$this->sql = $sql;
	}

	public function run() {
		$dbh  = $this->worker->getConnection();
		$row = $dbh->query($this->sql);
		while($member = $row->fetch(PDO::FETCH_ASSOC)){
			print_r($member);
		}
	}

}

class ExampleWorker extends Worker {
	public static $dbh;
	public function __construct($name) {
	}

	public function run(){
		self::$dbh = new PDO(&#39;mysql:host=10.0.0.30;dbname=testdb&#39;,&#39;root&#39;,&#39;123456&#39;);
	}
	public function getConnection(){
		return self::$dbh;
	}
}

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery(&#39;select * from test order by id desc limit 1,5&#39;);
$worker->stack($sql1);

$sql2 = new SQLQuery(&#39;select * from test order by id desc limit 5,5&#39;);
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>

Dynamic queue thread pool

위의 예는 스레드 풀이 가득 찼을 때 start를 실행하고 균일하게 시작하는 것입니다. 다음은 스레드 풀이 가득 차자마자 새로운 스레드를 생성하는 것입니다. 스레드 풀의 유휴 시간입니다.

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
		$this->handle = fopen("/tmp/counter.txt", "w+");
	}
	public function __destruct(){
		fclose($this->handle);
	}
	public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
	}
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>

pthreads 풀 클래스

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

7. 멀티 스레드 파일의 안전한 읽기 및 쓰기

LOCK_SH 공유 잠금 획득(읽기 프로그램)

LOCK_EX 독점 잠금 획득(쓰기 프로그램

LOCK_UN 해제 잠금( 공유 또는 독점 여부)

  • LOCK_NB 잠금 시 Flock()이 차단되는 것을 원하지 않는 경우

  • <?php
    $tmp = tempnam(__FILE__, &#39;PHP&#39;);
    $key = ftok($tmp, &#39;a&#39;);
    
    $shmid = shm_attach($key);
    $counter = 0;
    shm_put_var( $shmid, 1, $counter );
    
    class CounterThread extends Thread {
    	public function __construct($shmid){
    		$this->shmid = $shmid;
    	}
    	public function run() {
    
    		$this->synchronized(function($thread){
    				$thread->wait();
    				}, $this);
    
    		$counter = shm_get_var( $this->shmid, 1 );
    		$counter++;
    		shm_put_var( $this->shmid, 1, $counter );
    
    		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    	}
    }
    
    for ($i=0;$i<100;$i++){
    	$threads[] = new CounterThread($shmid);
    }
    for ($i=0;$i<100;$i++){
    	$threads[$i]->start();
    
    }
    
    for ($i=0;$i<100;$i++){
    	$threads[$i]->synchronized(function($thread){
    			$thread->notify();
    			}, $threads[$i]);
    }
    
    for ($i=0;$i<100;$i++){
    	$threads[$i]->join();
    }
    shm_remove( $shmid );
    shm_detach( $shmid );
    ?>		
  • 8. 멀티스레딩 및 데이터 연결

    pdo와 pthread를 동시에 사용하는 경우 다음이 필요합니다. 정적 선언 public static $dbh가 필요한 한 가지 사항에 주의하고 싱글톤 모드
  • Worker 및 PDO

    <?php
    class Update extends Thread {
    
        public $running = false;
        public $row = array();
        public function __construct($row) {
    
    	$this->row = $row;
            $this->sql = null;
        }
    
        public function run() {
    
    	if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
    		$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
    	}else{
    		$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
    		file_put_contents("bankno_error.log", $error, FILE_APPEND);
    	}
    
    	if( strlen($bankno) > 7 ){
    		$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
    
    		$this->sql = $sql;
    	}
    
    	printf("%s\n",$this->sql);
        }
    
    }
    
    class Pool {
    	public $pool = array();
    	public function __construct($count) {
    		$this->count = $count;
    	}
    	public function push($row){
    		if(count($this->pool) < $this->count){
    			$this->pool[] = new Update($row);
    			return true;
    		}else{
    			return false;
    		}
    	}
    	public function start(){
    		foreach ( $this->pool as $id => $worker){
    			$this->pool[$id]->start();
    		}
    	}
    	public function join(){
    		foreach ( $this->pool as $id => $worker){
                   $this->pool[$id]->join();
    		}
    	}
    	public function clean(){
    		foreach ( $this->pool as $id => $worker){
    			if(! $worker->isRunning()){
                	unset($this->pool[$id]);
                }
    		}
    	}
    }
    
    try {
    	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
    		PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
    		PDO::MYSQL_ATTR_COMPRESS => true
    		)
    	);
    
    	$sql  = "select id,bankno from members order by id desc";
    	$row = $dbh->query($sql);
    	$pool = new Pool(5);
    	while($member = $row->fetch(PDO::FETCH_ASSOC))
    	{
    
    		while(true){
    			if($pool->push($member)){ //压入任务到池中
    				break;
    			}else{ //如果池已经满,就开始启动线程
    				$pool->start();
    				$pool->join();
    				$pool->clean();
    			}
    		}
    	}
    	$pool->start();
        $pool->join();
    
    	$dbh = null;
    
    } catch (Exception $e) {
        echo &#39;[&#39; , date(&#39;H:i:s&#39;) , &#39;]&#39;, &#39;系统错误&#39;, $e->getMessage(), "\n";
    }
    ?>

    Pool 및 PDO
  • 스레드 풀의 데이터베이스 연결

    <?php
    class Update extends Thread {
    
    	public $running = false;
    	public $row = array();
    	public function __construct($row) {
    
    		$this->row = $row;
    		$this->sql = null;
    		//print_r($this->row);
    	}
    
    	public function run() {
    
    		if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
    			$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
    		}else{
    			$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
    			file_put_contents("bankno_error.log", $error, FILE_APPEND);
    		}
    
    		if( strlen($bankno) > 7 ){
    			$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
    
    			$this->sql = $sql;
    		}
    
    		printf("%s\n",$this->sql);
    	}
    
    }
    
    try {
    	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
    				PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
    				PDO::MYSQL_ATTR_COMPRESS => true
    				)
    			);
    
    	$sql     = "select id,bankno from members order by id desc limit 50";
    
    	$row = $dbh->query($sql);
    	$pool = array();
    	while($member = $row->fetch(PDO::FETCH_ASSOC))
    	{
    		$id 	= $member[&#39;id&#39;];
    		while (true){
    			if(count($pool) < 5){
    				$pool[$id] = new Update($member);
    				$pool[$id]->start();
    				break;
    			}else{
    				foreach ( $pool as $name => $worker){
    					if(! $worker->isRunning()){
    						unset($pool[$name]);
    					}
    				}
    			}
    		}
    
    	}
    
    	$dbh = null;
    
    } catch (Exception $e) {
    	echo &#39;【&#39; , date(&#39;H:i:s&#39;) , &#39;】&#39;, &#39;【系统错误】&#39;, $e->getMessage(), "\n";
    }
    ?> 

    을 통해 데이터베이스 연결에 액세스합니다. 위 프로그램을 개선하여 싱글톤 모드를 사용합니다. $this->worker->getInstance(); 전역적으로 하나의 데이터베이스 연결만 만들고 스레드는 공유 데이터베이스 연결을 사용합니다
  • <?php
    
    class WebWorker extends Worker {
    
    	public function __construct(SafeLog $logger) {
    		$this->logger = $logger;
    	}
    
    	protected $loger;
    }
    
    class WebWork extends Stackable {
    
    	public function isComplete() {
    		return $this->complete;
    	}
    
    	public function run() {
    		$this->worker
    			->logger
    			->log("%s executing in Thread #%lu",
    					__CLASS__, $this->worker->getThreadId());
    		$this->complete = true;
    	}
    
    	protected $complete;
    }
    
    class SafeLog extends Stackable {
    
    	protected function log($message, $args = []) {
    		$args = func_get_args();
    
    		if (($message = array_shift($args))) {
    			echo vsprintf(
    					"{$message}\n", $args);
    		}
    	}
    }
    
    
    $pool = new Pool(8, \WebWorker::class, [new SafeLog()]);
    
    $pool->submit($w=new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->submit(new WebWork());
    $pool->shutdown();
    
    $pool->collect(function($work){
    		return $work->isComplete();
    		});
    
    var_dump($pool);			
멀티 스레드에서 데이터베이스 운영 요약

In 일반적으로 pthreads는 아직 개발 중이며 여전히 몇 가지 단점이 있습니다. 또한 pthreads의 git이 이 프로젝트를 지속적으로 개선하고 있음을 알 수 있습니다.

지속적인 데이터베이스 연결이 매우 중요합니다. 그렇지 않으면 각 스레드가 데이터베이스 연결을 열고 닫습니다. 많은 링크 시간 초과가 발생합니다.

<?php

$fp = fopen("/tmp/lock.txt", "r+");
if (flock($fp, LOCK_EX)) {  // 进行排它型锁定
	ftruncate($fp, 0);      // truncate file
	fwrite($fp, "Write something here\n");
	fflush($fp);            // flush output before releasing the lock
	flock($fp, LOCK_UN);    // 释放锁定
} else {
	echo "Couldn&#39;t get the lock!";
}
fclose($fp);



$fp = fopen(&#39;/tmp/lock.txt&#39;, &#39;r+&#39;);
if(!flock($fp, LOCK_EX | LOCK_NB)) {
	echo &#39;Unable to obtain lock&#39;;
	exit(-1);
}
fclose($fp);
?>

권장 학습: "

PHP. 비디오 튜토리얼

위 내용은 PHP pthread를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

관련 기사

더보기