Home >Backend Development >PHP Problem >How to use php pthreads

How to use php pthreads

藏色散人
藏色散人Original
2021-09-15 10:20:422075browse

php How to use pthreads: 1. Install pthreads through "pecl install pthreads"; 2. Use a mutex lock when you need to control multiple threads and only one thread can work at the same time.

How to use php pthreads

The operating environment of this article: windows7 system, php7.0.2 version, DELL G3 computer

Installation and use of php multi-threaded pthreads

Installing Pthreads basically requires recompiling PHP and adding the --enable-maintainer-zts parameter, but there are few documents for using this; there will be many bugs and unexpected problems in the production environment. I can only haha, so just play around with this thing. For real multi-threading, you still need to use Python, C, etc.

Most of the following codes come from the Internet

1. Installation

What is used here is php-7.0.2

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql


make && make install

Install pthreads

pecl install pthreads

2. Thread

<?php
#1
$thread = new class extends Thread {
	public function run() {
		echo "Hello World {$this->getThreadId()}\n";                                                                                  
	}   
};

$thread->start() && $thread->join();



#2

class workerThread extends Thread { 
	public function __construct($i){
		$this->i=$i;
	}

	public function run(){
		while(true){
			echo $this->i."\n";
			sleep(1);
		} 
	} 
}

for($i=0;$i<50;$i++){
	$workers[$i]=new workerThread($i);
	$workers[$i]->start();
}

?>

3. Worker and Stackable

Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.

<?php
class SQLQuery extends Stackable {

	public function __construct($sql) {
		$this->sql = $sql;
	}

	public function run() {
		$dbh  = $this->worker->getConnection();
		$row = $dbh->query($this->sql);
		while($member = $row->fetch(PDO::FETCH_ASSOC)){
			print_r($member);
		}
	}

}

class ExampleWorker extends Worker {
	public static $dbh;
	public function __construct($name) {
	}

	public function run(){
		self::$dbh = new PDO(&#39;mysql:host=10.0.0.30;dbname=testdb&#39;,&#39;root&#39;,&#39;123456&#39;);
	}
	public function getConnection(){
		return self::$dbh;
	}
}

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery(&#39;select * from test order by id desc limit 1,5&#39;);
$worker->stack($sql1);

$sql2 = new SQLQuery(&#39;select * from test order by id desc limit 5,5&#39;);
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>

4. Mutex lock

What Under what circumstances will a mutex lock be used? It can be used when you need to control multiple threads and only one thread can work at the same time. A simple counter program to illustrate the difference with and without a mutex lock

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
		$this->handle = fopen("/tmp/counter.txt", "w+");
	}
	public function __destruct(){
		fclose($this->handle);
	}
	public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
	}
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>

Multiple threads and shared memory

In the example of shared memory, no locks are used and it may still work normally , maybe the working memory operation itself has the function of locking

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

5. Thread synchronization

In some scenarios, we do not want thread->start() to start running the program, but want the thread to wait for us The command. The test effect of $thread->wait(); is that the thread will not run immediately after thread->start(). It will only run after receiving the signal from $thread->notify();

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$this->synchronized(function($thread){
				$thread->wait();
				}, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
			$thread->notify();
			}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

6. Thread pool

A Pool class

<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
    }

    public function run() {

	if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
		$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}

class Pool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	}
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
		}else{
			return false;
		}
	}
	public function start(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->start();
		}
	}
	public function join(){
		foreach ( $this->pool as $id => $worker){
               $this->pool[$id]->join();
		}
	}
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){
            	unset($this->pool[$id]);
            }
		}
	}
}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new Pool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while(true){
			if($pool->push($member)){ //压入任务到池中
				break;
			}else{ //如果池已经满,就开始启动线程
				$pool->start();
				$pool->join();
				$pool->clean();
			}
		}
	}
	$pool->start();
    $pool->join();

	$dbh = null;

} catch (Exception $e) {
    echo &#39;[&#39; , date(&#39;H:i:s&#39;) , &#39;]&#39;, &#39;系统错误&#39;, $e->getMessage(), "\n";
}
?>

Dynamic queue thread pool

The above example is to execute start when the thread pool is full. The following example is Create new threads as soon as there is room in the thread pool.

<?php
class Update extends Thread {

	public $running = false;
	public $row = array();
	public function __construct($row) {

		$this->row = $row;
		$this->sql = null;
		//print_r($this->row);
	}

	public function run() {

		if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
			$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
		}else{
			$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
			file_put_contents("bankno_error.log", $error, FILE_APPEND);
		}

		if( strlen($bankno) > 7 ){
			$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);

			$this->sql = $sql;
		}

		printf("%s\n",$this->sql);
	}

}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
				PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
				PDO::MYSQL_ATTR_COMPRESS => true
				)
			);

	$sql     = "select id,bankno from members order by id desc limit 50";

	$row = $dbh->query($sql);
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{
		$id 	= $member[&#39;id&#39;];
		while (true){
			if(count($pool) < 5){
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}
		}

	}

	$dbh = null;

} catch (Exception $e) {
	echo &#39;【&#39; , date(&#39;H:i:s&#39;) , &#39;】&#39;, &#39;【系统错误】&#39;, $e->getMessage(), "\n";
}
?> 

pthreads Pool class

<?php

class WebWorker extends Worker {

	public function __construct(SafeLog $logger) {
		$this->logger = $logger;
	}

	protected $loger;
}

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;
	}

	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
					__CLASS__, $this->worker->getThreadId());
		$this->complete = true;
	}

	protected $complete;
}

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
					"{$message}\n", $args);
		}
	}
}


$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();

$pool->collect(function($work){
		return $work->isComplete();
		});

var_dump($pool);			

7. Multi-threaded file safe reading and writing

  • LOCK_SH Obtain shared lock (reading program)

  • LOCK_EX Obtains an exclusive lock (writing program

  • LOCK_UN Releases a lock (whether shared or exclusive)

  • LOCK_NB If you don’t want flock() to block during locking

<?php

$fp = fopen("/tmp/lock.txt", "r+");
if (flock($fp, LOCK_EX)) {  // 进行排它型锁定
	ftruncate($fp, 0);      // truncate file
	fwrite($fp, "Write something here\n");
	fflush($fp);            // flush output before releasing the lock
	flock($fp, LOCK_UN);    // 释放锁定
} else {
	echo "Couldn&#39;t get the lock!";
}
fclose($fp);



$fp = fopen(&#39;/tmp/lock.txt&#39;, &#39;r+&#39;);
if(!flock($fp, LOCK_EX | LOCK_NB)) {
	echo &#39;Unable to obtain lock&#39;;
	exit(-1);
}
fclose($fp);
?>

8. Multi-threading and data connection

When using pthreads and pdo at the same time, you need to pay attention to it. Static declaration public static $dbh; and access the database connection through singleton mode.

Worker and PDO

<?php
class Work extends Stackable {

        public function __construct() {
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO(&#39;mysql:host=192.168.2.1;dbname=example&#39;,&#39;www&#39;,&#39;123456&#39;);
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>

Pool and PDO

Link the database in the thread pool

# cat pool.php
<?php
class ExampleWorker extends Worker {

	public function __construct(Logging $logger) {
		$this->logger = $logger;
	}

	protected $logger;
}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($number) {
		$this->number = $number;
	}
	public function run() {
                $dbhost = &#39;db.example.com&#39;;               // 数据库服务器
                $dbuser = &#39;example.com&#39;;                 // 数据库用户名
                $dbpw = &#39;password&#39;;                               // 数据库密码
                $dbname = &#39;example_real&#39;;
		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
                        PDO::MYSQL_ATTR_COMPRESS => true,
			PDO::ATTR_PERSISTENT => true
                        )
                );
		$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN=&#39;".$this->number[&#39;name&#39;]."&#39; and CMD=&#39;6&#39; and `COMMENT` = &#39;".$this->number[&#39;order&#39;].":DEPOSIT&#39;";
		#echo $sql;
		$row = $dbh->query($sql);
		$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);
		if($mt4_trades){

			$row = null;

			$sql = "UPDATE db_example.accounts SET paystatus=&#39;成功&#39;, deposit_time=&#39;".$mt4_trades[&#39;OPEN_TIME&#39;]."&#39; where `order` = &#39;".$this->number[&#39;order&#39;]."&#39;;";
			$dbh->query($sql);
			#printf("%s\n",$sql);
		}
		$dbh = null;
		printf("runtime: %s, %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->worker->getThreadId() ,$this->number[&#39;order&#39;]);

	}
}

class Logging extends Stackable {
	protected  static $dbh;
	public function __construct() {
		$dbhost = &#39;db.example.com&#39;;			// 数据库服务器
	        $dbuser = &#39;example.com&#39;;                 // 数据库用户名
        	$dbpw = &#39;password&#39;;                               // 数据库密码
		$dbname = &#39;example_real&#39;;			// 数据库名

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
			PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
			PDO::MYSQL_ATTR_COMPRESS => true
			)
		);

	}
	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);
		}
	}

	protected function getConnection(){
                return self::$dbh;
        }
}

$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = &#39;db.example.com&#39;;                      // 数据库服务器
$dbuser = &#39;example.com&#39;;                 // 数据库用户名
$dbpw = &#39;password&#39;;                               // 数据库密码
$dbname = &#39;db_example&#39;;
$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
                        PDO::MYSQL_ATTR_COMPRESS => true
                        )
                );
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";

$row = $dbh->query($sql);
while($account = $row->fetch(PDO::FETCH_ASSOC))
{
        $pool->submit(new Work($account));
}

$pool->shutdown();

?> 

To further improve the above program, we use the singleton mode $this->worker->getInstance(); to make only one database connection globally, and the thread uses a shared database connection

<?php
class ExampleWorker extends Worker {

	#public function __construct(Logging $logger) {
	#	$this->logger = $logger;
	#}

	#protected $logger;
	protected  static $dbh;
	public function __construct() {

	}
	public function run(){
		$dbhost = &#39;db.example.com&#39;;			// 数据库服务器
	    $dbuser = &#39;example.com&#39;;        	// 数据库用户名
        $dbpw = &#39;password&#39;;             	// 数据库密码
		$dbname = &#39;example&#39;;				// 数据库名

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
			PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
			PDO::MYSQL_ATTR_COMPRESS => true,
			PDO::ATTR_PERSISTENT => true
			)
		);

	}
	protected function getInstance(){
        return self::$dbh;
    }

}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($data) {
		$this->data = $data;
		#print_r($data);
	}

	public function run() {
		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );

		try {
			$dbh  = $this->worker->getInstance();
			#print_r($dbh);
               		$id = $this->data[&#39;id&#39;];
			$mobile = safenet_decrypt($this->data[&#39;mobile&#39;]);
			#printf("%d, %s \n", $id, $mobile);
			if(strlen($mobile) > 11){
				$mobile = substr($mobile, -11);
			}
			if($mobile == &#39;null&#39;){
			#	$sql = "UPDATE members_digest SET mobile = &#39;".$mobile."&#39; where id = &#39;".$id."&#39;";
			#	printf("%s\n",$sql);
			#	$dbh->query($sql);
				$mobile = &#39;&#39;;
				$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
			}else{
				$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";
			}
			$sth = $dbh->prepare($sql);
			$sth->bindValue(&#39;:mobile&#39;, $mobile);
			$sth->bindValue(&#39;:id&#39;, $id);
			$sth->execute();
			#echo $sth->debugDumpParams();
		}
		catch(PDOException $e) {
			$error = sprintf("%s,%s\n", $mobile, $id );
			file_put_contents("mobile_error.log", $error, FILE_APPEND);
		}

		#$dbh = null;
		printf("runtime: %s, %s, %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->worker->getThreadId() ,$mobile, $id);
		#printf("runtime: %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->number);
	}
}

$pool = new Pool(100, \ExampleWorker::class, []);

#foreach (range(0, 100) as $number) {
#	$pool->submit(new Work($number));
#}

$dbhost = &#39;db.example.com&#39;;                     // 数据库服务器
$dbuser = &#39;example.com&#39;;                 		// 数据库用户名
$dbpw = &#39;password&#39;;                             // 数据库密码
$dbname = &#39;example&#39;;
$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
                        PDO::MYSQL_ATTR_COMPRESS => true
                        )
                );
#print_r($dbh);

#$sql = "select id, mobile from members where id < :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(&#39;:id&#39;,300);
#$sth->execute();
#$result = $sth->fetchAll();
#print_r($result);
#
#$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(&#39;:mobile&#39;, &#39;aa&#39;);
#$sth->bindValue(&#39;:id&#39;,&#39;272&#39;);
#echo $sth->execute();
#echo $sth->queryString;
#echo $sth->debugDumpParams();


$sql = "select id, mobile from members order by id asc"; // limit 1000";
$row = $dbh->query($sql);
while($members = $row->fetch(PDO::FETCH_ASSOC))
{
        #$order =  $account[&#39;order&#39;];
        #printf("%s\n",$order);
        //print_r($members);
        $pool->submit(new Work($members));
		#unset($account[&#39;order&#39;]);
}

$pool->shutdown();

?>	

In multi-threading Summary of operating database

In general, pthreads is still in development and there are still some shortcomings. We can also see that pthreads’ git is constantly improving this project

The database persistent link is very Important, otherwise each thread will open a database connection and then close it, which will cause many link timeouts.

<?php
$dbh = new PDO(&#39;mysql:host=localhost;dbname=test&#39;, $user, $pass, array(
    PDO::ATTR_PERSISTENT => true
));
?>

Recommended learning: "PHP Video Tutorial"

The above is the detailed content of How to use php pthreads. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Related articles

See more