
Installation and use of PHP multi-threading with pthreads

WBOY (original)
2016-07-12 09:00:35


Installing pthreads basically requires recompiling PHP with the --enable-maintainer-zts option. The documentation for the extension is very sparse, and you should expect many bugs and unexpected problems, so it is not something to trust in a production environment. Just play around with it; for real multi-threading, use Python, C, etc.

Most of the following code comes from the Internet.

1. Installation

The version used here is php-7.0.2.

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql


make && make install

Install pthreads

pecl install pthreads
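
Before running any of the examples, it can help to confirm that the build is really thread-safe and that the extension is loaded. The following is only a minimal sketch; the function name pthreads_environment() is made up for this illustration. Keep in mind that pecl only builds the extension; it still has to be enabled, typically with an extension=pthreads.so line in php.ini.

```php
<?php
// Report whether this PHP build can run the pthreads examples.
// pthreads_environment() is a hypothetical helper name, not part of pthreads.
function pthreads_environment() {
	return array(
		'php_version' => PHP_VERSION,
		// PHP_ZTS is 1 only when PHP was built with thread safety (ZTS).
		'zts'         => (bool) PHP_ZTS,
		// True once the extension is compiled and enabled in php.ini.
		'pthreads'    => extension_loaded('pthreads'),
	);
}

foreach (pthreads_environment() as $name => $value) {
	printf("%-12s %s\n", $name, is_bool($value) ? ($value ? 'yes' : 'no') : $value);
}
```

If either zts or pthreads shows "no", the Thread, Worker and Pool classes used below will not be available.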

2. Thread

<?php
#1
$thread = new class extends Thread {
	public function run() {
		echo "Hello World {$this->getThreadId()}\n";                                                                                  
	}   
};

$thread->start() && $thread->join();



#2: start 50 threads; each one prints its number once per second until the script is stopped

class workerThread extends Thread { 
	public function __construct($i){
		$this->i=$i;
	}

	public function run(){
		while(true){
			echo $this->i."\n";
			sleep(1);
		} 
	} 
}

$workers = array();
for($i=0;$i<50;$i++){
	$workers[$i]=new workerThread($i);
	$workers[$i]->start();
}

?>

3. Worker and Stackable

Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.

<?php
class SQLQuery extends Stackable {

	public function __construct($sql) {
		$this->sql = $sql;
	}

	public function run() {
		$dbh  = $this->worker->getConnection();
		$row = $dbh->query($this->sql);
		while($member = $row->fetch(PDO::FETCH_ASSOC)){
			print_r($member);
		}
	}

}

class ExampleWorker extends Worker {
	public static $dbh;
	public function __construct($name) {
	}

	public function run(){
		self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb','root','123456');
	}
	public function getConnection(){
		return self::$dbh;
	}
}

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');
$worker->stack($sql1);

$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>

4. Mutex lock

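When is a mutex used? Use one when several threads share a resource and only one of them may work on it at a time. The simple counter program below illustrates the difference between running with and without a mutex. Note that the Mutex class belongs to pthreads v2 (PHP 5); pthreads v3 for PHP 7 removed it in favor of Threaded::synchronized().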

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
		$this->handle = fopen("/tmp/counter.txt", "r+"); // "w+" would truncate the file and reset the counter
	}
	public function __destruct(){
		fclose($this->handle);
	}
	public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
	}
}

// Without a mutex
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

// With a mutex
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>
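
To see why the unlocked version can print wrong values, here is a small single-process sketch that uses no threads at all: it replays one possible interleaving of two workers by hand. The function names are invented for the illustration, and the interleaving is chosen deliberately; a real scheduler may or may not produce it.

```php
<?php
// Each "worker" increments a shared counter in two steps: read, then write.
// Without a lock, both workers may read before either one writes.
function increment_unlocked_interleaved($counter) {
	$a = $counter;      // worker A reads
	$b = $counter;      // worker B reads the same old value
	$counter = $a + 1;  // worker A writes
	$counter = $b + 1;  // worker B overwrites A's update
	return $counter;
}

// With a mutex, each read-modify-write finishes before the next one starts.
function increment_locked($counter) {
	foreach (array('A', 'B') as $worker) {
		// the lock would be taken here
		$value = $counter;      // read
		$counter = $value + 1;  // write
		// the lock would be released here
	}
	return $counter;
}

printf("without mutex: %d\n", increment_unlocked_interleaved(0)); // one update is lost
printf("with mutex:    %d\n", increment_locked(0));
```

Starting from 0, the unlocked interleaving ends at 1 although two increments ran, while the locked sequence ends at 2.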

Multiple threads and shared memory

The shared memory example below uses no lock and may still appear to work normally. Possibly the shared memory functions do some locking internally, but the read, increment and write steps together are not atomic, so the result should not be relied on.

<?php
$tmp = tempnam(sys_get_temp_dir(), 'PHP'); // tempnam() expects a directory, not __FILE__
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

5. Thread synchronization

In some scenarios we do not want the thread's work to begin as soon as $thread->start() is called; we want the thread to wait for our command. Calling $thread->wait() inside run() makes the thread pause after start(), and it continues only after it receives the signal from $thread->notify().

<?php
$tmp = tempnam(sys_get_temp_dir(), 'PHP'); // tempnam() expects a directory, not __FILE__
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public $ready = false; // set by the main thread when this thread may proceed

	public function __construct($shmid){
		$this->shmid = $shmid;
	}
	public function run() {

		// Wait for the signal. Checking the flag first avoids blocking
		// forever if notify() was sent before this thread reached wait().
		$this->synchronized(function($thread){
				while (!$thread->ready) {
					$thread->wait();
				}
				}, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
	}
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
			$thread->ready = true;
			$thread->notify();
			}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

6. Thread pool

A Pool class

<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
    }

    public function run() {

	$bankno = ''; // stays empty when the stored value cannot be decrypted
	// safenet_decrypt() is an application-specific function that is not shown here
	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}

// Named UpdatePool because pthreads already defines a class called Pool
class UpdatePool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	}
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
		}else{
			return false;
		}
	}
	public function start(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->start();
		}
	}
	public function join(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->join();
		}
	}
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){
				unset($this->pool[$id]);
			}
		}
	}
}

try {
	// $dbhost, $dbname, $dbuser and $dbpw are assumed to be defined elsewhere
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new UpdatePool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while(true){
			if($pool->push($member)){ // push the task into the pool
				break;
			}else{ // the pool is full, so start the threads
				$pool->start();
				$pool->join();
				$pool->clean();
			}
		}
	}
	// Run whatever is left in the last, partially filled batch
	$pool->start();
	$pool->join();

	$dbh = null;

} catch (Exception $e) {
	echo '[' , date('H:i:s') , '] ', 'System error: ', $e->getMessage(), "\n";
}
?>

Dynamic queue thread pool

The example above calls start() only once the thread pool is full. The following example starts a new thread as soon as there is a free slot in the thread pool.

<?php
class Update extends Thread {

	public $running = false;
	public $row = array();
	public function __construct($row) {

		$this->row = $row;
		$this->sql = null;
		//print_r($this->row);
	}

	public function run() {

		$bankno = ''; // stays empty when the stored value cannot be decrypted
		// safenet_decrypt() is an application-specific function that is not shown here
		if(strlen($this->row['bankno']) > 100 ){
			$bankno = safenet_decrypt($this->row['bankno']);
		}else{
			$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
			file_put_contents("bankno_error.log", $error, FILE_APPEND);
		}

		if( strlen($bankno) > 7 ){
			$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

			$this->sql = $sql;
		}

		printf("%s\n",$this->sql);
	}

}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
				PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
				PDO::MYSQL_ATTR_COMPRESS => true
				)
			);

	$sql     = "select id,bankno from members order by id desc limit 50";

	$row = $dbh->query($sql);
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{
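		$id 	= $member['id'];
		while (true){
			if(count($pool) < 5){
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				// The pool is full: drop finished threads to free a slot
				foreach ( $pool as $name => $worker){
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
				usleep(1000); // avoid spinning at full CPU while waiting
			}
		}

	}

	// Wait for the threads that are still running
	foreach ( $pool as $worker){
		$worker->join();
	}

	$dbh = null;

} catch (Exception $e) {
	echo '[' , date('H:i:s') , '] ', 'System error: ', $e->getMessage(), "\n";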
}
?> 
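
The difference between the two pool strategies can be estimated without starting any threads. The sketch below assumes that each task has a known duration and compares the total elapsed time of both approaches; the function names and the sample durations are made up for illustration.

```php
<?php
// Batch pool: fill the pool, start everything, wait for the slowest task, repeat.
function batch_makespan(array $durations, $size) {
	$total = 0;
	foreach (array_chunk($durations, $size) as $batch) {
		$total += max($batch); // a batch ends when its slowest task ends
	}
	return $total;
}

// Dynamic pool: as soon as one slot is free, the next task starts in it.
function dynamic_makespan(array $durations, $size) {
	$slots = array_fill(0, $size, 0); // time at which each slot becomes free
	foreach ($durations as $d) {
		$i = array_search(min($slots), $slots); // earliest free slot
		$slots[$i] += $d;
	}
	return max($slots);
}

$durations = array(5, 1, 1, 1, 1, 1); // hypothetical task times in seconds
printf("batch:   %d\n", batch_makespan($durations, 2));
printf("dynamic: %d\n", dynamic_makespan($durations, 2));
```

With these sample numbers the batch strategy needs 7 seconds because the short task in the first batch has to wait for the 5-second one, while the dynamic strategy finishes in 5 seconds.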

pthreads Pool class

<?php

class WebWorker extends Worker {

	public function __construct(SafeLog $logger) {
		$this->logger = $logger;
	}

	protected $logger;
}

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;
	}

	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
					__CLASS__, $this->worker->getThreadId());
		$this->complete = true;
	}

	protected $complete;
}

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
					"{$message}\n", $args);
		}
	}
}


$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();

$pool->collect(function($work){
		return $work->isComplete();
		});

var_dump($pool);
?>

7. Multi-threaded file safe reading and writing

  • LOCK_SH acquires a shared lock (for readers)

  • LOCK_EX acquires an exclusive lock (for writers)

  • LOCK_UN releases a lock (whether shared or exclusive)

  • LOCK_NB keeps flock() from blocking while waiting for the lock; combine it with LOCK_SH or LOCK_EX

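<?php

// "c+" opens for reading and writing, creates the file if needed, and does not truncate it
$fp = fopen("/tmp/lock.txt", "c+");
if (flock($fp, LOCK_EX)) {  // acquire an exclusive lock (blocks until granted)
	ftruncate($fp, 0);      // truncate file
	fwrite($fp, "Write something here\n");
	fflush($fp);            // flush output before releasing the lock
	flock($fp, LOCK_UN);    // release the lock
} else {
	echo "Couldn't get the lock!";
}
fclose($fp);



// Non-blocking variant: give up at once if the lock is held elsewhere
$fp = fopen('/tmp/lock.txt', 'c+');
if(!flock($fp, LOCK_EX | LOCK_NB)) {
	echo 'Unable to obtain lock';
	exit(-1);
}
flock($fp, LOCK_UN);
fclose($fp);
?>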
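
The lock flags listed above can be combined into a safe read-modify-write on a file. The following is a minimal sketch rather than a complete solution: locked_increment() is a hypothetical helper name, the temporary file exists only for the demonstration, and flock() is advisory, so it only protects against code that also calls flock().

```php
<?php
// Increment a counter stored in a file while holding an exclusive lock.
// locked_increment() is a hypothetical helper name used only in this sketch.
function locked_increment($path) {
	$fp = fopen($path, 'c+');          // create if missing, do not truncate
	if ($fp === false) {
		return false;
	}
	if (!flock($fp, LOCK_EX)) {        // block until the exclusive lock is granted
		fclose($fp);
		return false;
	}
	$counter = (int) stream_get_contents($fp) + 1; // read the current value
	ftruncate($fp, 0);                 // replace the old contents
	rewind($fp);
	fwrite($fp, (string) $counter);
	fflush($fp);                       // flush before releasing the lock
	flock($fp, LOCK_UN);
	fclose($fp);
	return $counter;
}

$path = tempnam(sys_get_temp_dir(), 'cnt'); // temporary file for the demonstration
for ($i = 0; $i < 3; $i++) {
	printf("counter is now %d\n", locked_increment($path));
}
unlink($path);
```

Because the read and the write both happen while the exclusive lock is held, two callers cannot read the same old value, which is exactly the problem of the unlocked counter in section 4.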

8. Multi-threading and database connections

When using pthreads and PDO together, note that the connection has to be declared statically (public static $dbh;) and accessed through a singleton-style accessor method.

Worker and PDO

<?php
class Work extends Stackable {

        public function __construct() {
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>
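
The idea of a statically held connection that is created once and then reused can be sketched without pthreads. In the snippet below, ConnectionHolder and its factory callback are invented names, and a plain object stands in for the PDO handle so that the example runs without a database.

```php
<?php
// Holds one connection per PHP context and creates it on first use.
// ConnectionHolder is a hypothetical class, not part of pthreads or PDO.
final class ConnectionHolder {
	private static $dbh = null;     // the shared handle, created lazily
	private static $factory = null; // callable that opens the connection

	public static function configure(callable $factory) {
		self::$factory = $factory;
		self::$dbh = null;
	}

	public static function getConnection() {
		if (self::$dbh === null) {
			self::$dbh = call_user_func(self::$factory);
		}
		return self::$dbh;
	}
}

$opened = 0;
ConnectionHolder::configure(function () use (&$opened) {
	$opened++;
	// In real code this would be: return new PDO($dsn, $user, $password);
	return new stdClass();
});

$first  = ConnectionHolder::getConnection();
$second = ConnectionHolder::getConnection();
printf("connections opened: %d\n", $opened);
```

Both calls return the same object and the factory runs only once, which is the behavior the worker's getConnection() method relies on.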

Pool and PDO

Connecting to the database from a thread pool

# cat pool.php
<?php
class ExampleWorker extends Worker {

	public function __construct(Logging $logger) {
		$this->logger = $logger;
	}

	protected $logger;
}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($number) {
		$this->number = $number;
	}
	public function run() {
		$dbhost = 'db.example.com';	// database server
		$dbuser = 'example.com';	// database user name
		$dbpw = 'password';		// database password
		$dbname = 'example_real';	// database name
		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
			PDO::MYSQL_ATTR_COMPRESS => true,
			PDO::ATTR_PERSISTENT => true
			)
		);
		// Bind the values instead of concatenating them into the SQL text
		$sth = $dbh->prepare("select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN = :login and CMD = '6' and `COMMENT` = :comment");
		$sth->execute(array(
			':login'   => $this->number['name'],
			':comment' => $this->number['order'].':DEPOSIT'
		));
		$mt4_trades  = $sth->fetch(PDO::FETCH_ASSOC);
		if($mt4_trades){

			$sth = null;

			// '成功' is the stored status value and means "success"
			$sth = $dbh->prepare("UPDATE db_example.accounts SET paystatus='成功', deposit_time = :deposit_time where `order` = :order");
			$sth->execute(array(
				':deposit_time' => $mt4_trades['OPEN_TIME'],
				':order'        => $this->number['order']
			));
		}
		$dbh = null;
		printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);

	}
}

class Logging extends Stackable {
	protected  static $dbh;
	public function __construct() {
		$dbhost = 'db.example.com';	// database server
		$dbuser = 'example.com';	// database user name
		$dbpw = 'password';		// database password
		$dbname = 'example_real';	// database name

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
			PDO::MYSQL_ATTR_COMPRESS => true
			)
		);

	}
	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);
		}
	}

	protected function getConnection(){
                return self::$dbh;
        }
}

$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = 'db.example.com';	// database server
$dbuser = 'example.com';	// database user name
$dbpw = 'password';		// database password
$dbname = 'db_example';
$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                        PDO::MYSQL_ATTR_COMPRESS => true
                        )
                );
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";

$row = $dbh->query($sql);
while($account = $row->fetch(PDO::FETCH_ASSOC))
{
        $pool->submit(new Work($account));
}

$pool->shutdown();

?> 

To improve the above program further, we use the singleton pattern: the connection is opened once in the worker's run() method, and every task obtains it through $this->worker->getInstance() instead of opening and closing its own connection.

<?php
class ExampleWorker extends Worker {

	#public function __construct(Logging $logger) {
	#	$this->logger = $logger;
	#}

	#protected $logger;
	protected  static $dbh;
	public function __construct() {

	}
	public function run(){
		$dbhost = 'db.example.com';	// database server
		$dbuser = 'example.com';	// database user name
		$dbpw = 'password';		// database password
		$dbname = 'example';		// database name

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
			PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
			PDO::MYSQL_ATTR_COMPRESS => true,
			PDO::ATTR_PERSISTENT => true
			)
		);

	}
	protected function getInstance(){
        return self::$dbh;
    }

}

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($data) {
		$this->data = $data;
		#print_r($data);
	}

	public function run() {
		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );

		try {
			$dbh  = $this->worker->getInstance();
			#print_r($dbh);
               		$id = $this->data['id'];
			$mobile = safenet_decrypt($this->data['mobile']);
			#printf("%d, %s \n", $id, $mobile);
			if(strlen($mobile) > 11){
				$mobile = substr($mobile, -11);
			}
			if($mobile == 'null'){
			#	$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";
			#	printf("%s\n",$sql);
			#	$dbh->query($sql);
				$mobile = '';
				$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
			}else{
				$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";
			}
			$sth = $dbh->prepare($sql);
			$sth->bindValue(':mobile', $mobile);
			$sth->bindValue(':id', $id);
			$sth->execute();
			#echo $sth->debugDumpParams();
		}
		catch(PDOException $e) {
			$error = sprintf("%s,%s\n", $mobile, $id );
			file_put_contents("mobile_error.log", $error, FILE_APPEND);
		}

		#$dbh = null;
		printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);
		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);
	}
}

$pool = new Pool(100, \ExampleWorker::class, []);

#foreach (range(0, 100) as $number) {
#	$pool->submit(new Work($number));
#}

$dbhost = 'db.example.com';	// database server
$dbuser = 'example.com';	// database user name
$dbpw = 'password';		// database password
$dbname = 'example';
$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                        PDO::MYSQL_ATTR_COMPRESS => true
                        )
                );
#print_r($dbh);

#$sql = "select id, mobile from members where id < :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(':id',300);
#$sth->execute();
#$result = $sth->fetchAll();
#print_r($result);
#
#$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(':mobile', 'aa');
#$sth->bindValue(':id','272');
#echo $sth->execute();
#echo $sth->queryString;
#echo $sth->debugDumpParams();


$sql = "select id, mobile from members order by id asc"; // limit 1000";
$row = $dbh->query($sql);
while($members = $row->fetch(PDO::FETCH_ASSOC))
{
        #$order =  $account['order'];
        #printf("%s\n",$order);
        //print_r($members);
        $pool->submit(new Work($members));
		#unset($account['order']);
}

$pool->shutdown();

?>	

Summary of operating databases in multi-threads

In general, pthreads is still under development and has some shortcomings. Its Git repository shows that the project is being improved continuously.

Persistent database connections are important; otherwise each thread opens a database connection and then closes it again, which results in many connection timeouts.

<?php
$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(
    PDO::ATTR_PERSISTENT => true
));
?>

 
