Heim  >  Artikel  >  Backend-Entwicklung  >  Installation und Verwendung von PHP-Pthreads-Multithreading

Installation und Verwendung von PHP-Pthreads-Multithreading

高洛峰
高洛峰Original
2016-12-21 11:51:431332Durchsuche

Für die Installation von Pthreads ist grundsätzlich eine Neukompilierung von PHP und das Hinzufügen des Parameters --enable-maintainer-zts erforderlich. Es gibt jedoch nur sehr wenige Dokumente für die Verwendung dieses Programms und es treten viele Fehler und unerwartete Probleme auf, sodass die Produktionsumgebung nur ignoriert werden kann Spielen Sie einfach mit diesem Ding herum. Für echtes Multithreading müssen Sie immer noch Python, C usw. verwenden.

1. Die hier verwendete ist php-7.0.2

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql

make && make install

Pthreads installieren

pecl install pthreads

2. Thread

<?php
#1
$thread = new class extends Thread {
public function run() {
echo "Hello World {$this->getThreadId()}\n";
}
};
$thread->start() && $thread->join();
#2
class workerThread extends Thread {
public function __construct($i){
$this->i=$i;
}
public function run(){
while(true){
echo $this->i."\n";
sleep(1);
}
}
}
for($i=0;$i<50;$i++){
$workers[$i]=new workerThread($i);
$workers[$i]->start();
}
?>

3. Worker und Stackable

Stackables sind Aufgaben, die von Worker-Threads ausgeführt werden Sie können Stackable-Objekte vor, nach und während ihrer Ausführung synchronisieren, lesen und schreiben. Wird eine Mutex-Sperre verwendet? Es kann verwendet werden, wenn Sie mehrere Threads steuern müssen und nur ein Thread gleichzeitig arbeiten kann. Ein einfaches Zählerprogramm zur Veranschaulichung des Unterschieds mit oder ohne Mutex

<?php
class SQLQuery extends Stackable {
public function __construct($sql) {
$this->sql = $sql;
}
public function run() {
$dbh = $this->worker->getConnection();
$row = $dbh->query($this->sql);
while($member = $row->fetch(PDO::FETCH_ASSOC)){
print_r($member);
}
}
}
class ExampleWorker extends Worker {
public static $dbh;
public function __construct($name) {
}
public function run(){
self::$dbh = new PDO(&#39;mysql:host=10.0.0.30;dbname=testdb&#39;,&#39;root&#39;,&#39;123456&#39;);
}
public function getConnection(){
return self::$dbh;
}
}
$worker = new ExampleWorker("My Worker Thread");
$sql1 = new SQLQuery(&#39;select * from test order by id desc limit 1,5&#39;);
$worker->stack($sql1);
$sql2 = new SQLQuery(&#39;select * from test order by id desc limit 5,5&#39;);
$worker->stack($sql2);
$worker->start();
$worker->shutdown();
?>

Multithreading und Shared Memory

In Im Beispiel des gemeinsam genutzten Speichers wird keine Sperre verwendet und es kann sein, dass der Arbeitsspeichervorgang selbst die Funktion einer Sperre hat

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);
class CounterThread extends Thread {
public function __construct($mutex = null){
$this->mutex = $mutex;
$this->handle = fopen("/tmp/counter.txt", "w+");
}
public function __destruct(){
fclose($this->handle);
}
public function run() {
if($this->mutex)
$locked=Mutex::lock($this->mutex);
$counter = intval(fgets($this->handle));
$counter++;
rewind($this->handle);
fputs($this->handle, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
if($this->mutex)
Mutex::unlock($this->mutex);
}
}
//没有互斥锁
for ($i=0;$i<50;$i++){
$threads[$i] = new CounterThread();
$threads[$i]->start();
}
//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
$threads[$i] = new CounterThread($mutex);
$threads[$i]->start();
}
Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
$threads[$i]->join();
}
Mutex::destroy($mutex);
?>

5 . Thread-Synchronisierung

In einigen Szenarien möchten wir nicht, dass thread->start() das Programm ausführt, sondern dass der Thread auf unseren Befehl wartet. Die Testfunktion von thread->wait(); besteht darin, dass der Thread nicht sofort nach thread->start() ausgeführt wird. Er wird erst ausgeführt, nachdem das Signal von thread->notify();

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);
$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );
class CounterThread extends Thread {
public function __construct($shmid){
$this->shmid = $shmid;
}
public function run() {
$counter = shm_get_var( $this->shmid, 1 );
$counter++;
shm_put_var( $this->shmid, 1, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
}
}
for ($i=0;$i<100;$i++){
$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
$threads[$i]->start();
}
for ($i=0;$i<100;$i++){
$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

6. Thread-Pool

Eine Pool-Klasse


<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);
$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );
class CounterThread extends Thread {
public function __construct($shmid){
$this->shmid = $shmid;
}
public function run() {
$this->synchronized(function($thread){
$thread->wait();
}, $this);
$counter = shm_get_var( $this->shmid, 1 );
$counter++;
shm_put_var( $this->shmid, 1, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
}
}
for ($i=0;$i<100;$i++){
$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
$threads[$i]->start();
}
for ($i=0;$i<100;$i++){
$threads[$i]->synchronized(function($thread){
$thread->notify();
}, $threads[$i]);
}
for ($i=0;$i<100;$i++){
$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>

Dynamischer Warteschlangen-Thread-Pool

Das obige Beispiel dient dazu, den Thread-Pool auszuführen, sobald er voll ist. Das folgende Beispiel dient dazu, einen neuen Thread zu erstellen, sobald freier Speicherplatz im Thread vorhanden ist Pool.


<?php
class Update extends Thread {
public $running = false;
public $row = array();
public function __construct($row) {
$this->row = $row;
$this->sql = null;
}
public function run() {
if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
}else{
$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
file_put_contents("bankno_error.log", $error, FILE_APPEND);
}
if( strlen($bankno) > 7 ){
$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
$this->sql = $sql;
}
printf("%s\n",$this->sql);
}
}
class Pool {
public $pool = array();
public function __construct($count) {
$this->count = $count;
}
public function push($row){
if(count($this->pool) < $this->count){
$this->pool[] = new Update($row);
return true;
}else{
return false;
}
}
public function start(){
foreach ( $this->pool as $id => $worker){
$this->pool[$id]->start();
}
}
public function join(){
foreach ( $this->pool as $id => $worker){
$this->pool[$id]->join();
}
}
public function clean(){
foreach ( $this->pool as $id => $worker){
if(! $worker->isRunning()){
unset($this->pool[$id]);
}
}
}
}
try {
$dbh = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select id,bankno from members order by id desc";
$row = $dbh->query($sql);
$pool = new Pool(5);
while($member = $row->fetch(PDO::FETCH_ASSOC))
{
while(true){
if($pool->push($member)){ //压入任务到池中
break;
}else{ //如果池已经满,就开始启动线程
$pool->start();
$pool->join();
$pool->clean();
}
}
}
$pool->start();
$pool->join();
$dbh = null;
} catch (Exception $e) {
echo &#39;[&#39; , date(&#39;H:i:s&#39;) , &#39;]&#39;, &#39;系统错误&#39;, $e->getMessage(), "\n";
}
?>

pthreads Pool-Klasse


7. Multithreaded Dateien sicher lesen und schreiben

<?php
class Update extends Thread {
public $running = false;
public $row = array();
public function __construct($row) {
$this->row = $row;
$this->sql = null;
//print_r($this->row);
}
public function run() {
if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
}else{
$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
file_put_contents("bankno_error.log", $error, FILE_APPEND);
}
if( strlen($bankno) > 7 ){
$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
$this->sql = $sql;
}
printf("%s\n",$this->sql);
}
}
try {
$dbh = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select id,bankno from members order by id desc limit 50";
$row = $dbh->query($sql);
$pool = array();
while($member = $row->fetch(PDO::FETCH_ASSOC))
{
$id = $member[&#39;id&#39;];
while (true){
if(count($pool) < 5){
$pool[$id] = new Update($member);
$pool[$id]->start();
break;
}else{
foreach ( $pool as $name => $worker){
if(! $worker->isRunning()){
unset($pool[$name]);
}
}
}
}
}
$dbh = null;
} catch (Exception $e) {
echo &#39;【&#39; , date(&#39;H:i:s&#39;) , &#39;】&#39;, &#39;【系统错误】&#39;, $e->getMessage(), "\n";
}
?>
LOCK_SH Gemeinsame Sperre erwerben (Leseprogramm)

LOCK_EX Exklusive Sperre erwerben (Schreibprogramm)

LOCK_UN Sperre freigeben (ob gemeinsam genutzt oder exklusiv). )

<?php
class WebWorker extends Worker {
public function __construct(SafeLog $logger) {
$this->logger = $logger;
}
protected $loger;
}
class WebWork extends Stackable {
public function isComplete() {
return $this->complete;
}
public function run() {
$this->worker
->logger
->log("%s executing in Thread #%lu",
__CLASS__, $this->worker->getThreadId());
$this->complete = true;
}
protected $complete;
}
class SafeLog extends Stackable {
protected function log($message, $args = []) {
$args = func_get_args();
if (($message = array_shift($args))) {
echo vsprintf(
"{$message}\n", $args);
}
}
}
$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);
$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();
$pool->collect(function($work){
return $work->isComplete();
});
var_dump($pool);

LOCK_NB Wenn Sie nicht möchten, dass flock() blockiert, wenn es gesperrt ist


8. Multithreading und Datenverbindung

Bei gleichzeitiger Verwendung von pthreads und pdo müssen Sie statisch public static $dbh deklarieren und über den Singleton-Modus auf die Datenbankverbindung zugreifen Worker und PDO<.>


<?php
$fp = fopen("/tmp/lock.txt", "r+");
if (flock($fp, LOCK_EX)) { // 进行排它型锁定
ftruncate($fp, 0); // truncate file
fwrite($fp, "Write something here\n");
fflush($fp); // flush output before releasing the lock
flock($fp, LOCK_UN); // 释放锁定
} else {
echo "Couldn&#39;t get the lock!";
}
fclose($fp);
$fp = fopen(&#39;/tmp/lock.txt&#39;, &#39;r+&#39;);
if(!flock($fp, LOCK_EX | LOCK_NB)) {
echo &#39;Unable to obtain lock&#39;;
exit(-1);
}
fclose($fp);
?>
Pool und PDO

Verknüpfen Sie die Datenbank im Thread-Pool



Um das obige Programm weiter zu verbessern, verwenden wir den Singleton-Modus $this->worker->getInstance(); um nur global eine Datenbankverbindung herzustellen, und die Thread verwendet eine gemeinsame Datenbankverbindung

<?php
class Work extends Stackable {
public function __construct() {
}
public function run() {
$dbh = $this->worker->getConnection();
$sql = "select id,name from members order by id desc limit ";
$row = $dbh->query($sql);
while($member = $row->fetch(PDO::FETCH_ASSOC)){
print_r($member);
}
}
}
class ExampleWorker extends Worker {
public static $dbh;
public function __construct($name) {
}
/*
* The run method should just prepare the environment for the work that is coming ...
*/
public function run(){
self::$dbh = new PDO(&#39;mysql:host=...;dbname=example&#39;,&#39;www&#39;,&#39;&#39;);
}
public function getConnection(){
return self::$dbh;
}
}
$worker = new ExampleWorker("My Worker Thread");
$work=new Work();
$worker->stack($work);
$worker->start();
$worker->shutdown();
?>

Zusammenfassung der Datenbankoperationen in Multithreads


Im Allgemeinen ist pthreads immer noch in der Entwicklung und es gibt noch einige Mängel. Sie können auch sehen, dass der Git von pthreads dieses Projekt ständig verbessert.

# cat pool.php
<?php
class ExampleWorker extends Worker {
public function __construct(Logging $logger) {
$this->logger = $logger;
}
protected $logger;
}
/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
public function __construct($number) {
$this->number = $number;
}
public function run() {
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example_real&#39;;
$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true,
PDO::ATTR_PERSISTENT => true
)
);
$sql = "select OPEN_TIME, `COMMENT` from MT_TRADES where LOGIN=&#39;".$this->number[&#39;name&#39;]."&#39; and CMD=&#39;&#39; and `COMMENT` = &#39;".$this->number[&#39;order&#39;].":DEPOSIT&#39;";
#echo $sql;
$row = $dbh->query($sql);
$mt_trades = $row->fetch(PDO::FETCH_ASSOC);
if($mt_trades){
$row = null;
$sql = "UPDATE db_example.accounts SET paystatus=&#39;成功&#39;, deposit_time=&#39;".$mt_trades[&#39;OPEN_TIME&#39;]."&#39; where `order` = &#39;".$this->number[&#39;order&#39;]."&#39;;";
$dbh->query($sql);
#printf("%s\n",$sql);
}
$dbh = null;
printf("runtime: %s, %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->worker->getThreadId() ,$this->number[&#39;order&#39;]);
}
}
class Logging extends Stackable {
protected static $dbh;
public function __construct() {
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example_real&#39;; // 数据库名
self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
}
protected function log($message, $args = []) {
$args = func_get_args();
if (($message = array_shift($args))) {
echo vsprintf("{$message}\n", $args);
}
}
protected function getConnection(){
return self::$dbh;
}
}
$pool = new Pool(, \ExampleWorker::class, [new Logging()]);
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;db_example&#39;;
$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";
$row = $dbh->query($sql);
while($account = $row->fetch(PDO::FETCH_ASSOC))
{
$pool->submit(new Work($account));
}
$pool->shutdown();
?> 
Persistente Datenbankverbindungen sind sehr wichtig, da sonst jeder Thread eine Datenbankverbindung öffnet und dann schließt es, was viele Link-Timeouts verursachen wird

<?php
class ExampleWorker extends Worker {
#public function __construct(Logging $logger) {
# $this->logger = $logger;
#}
#protected $logger;
protected static $dbh;
public function __construct() {
}
public function run(){
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example&#39;; // 数据库名
self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true,
PDO::ATTR_PERSISTENT => true
)
);
}
protected function getInstance(){
return self::$dbh;
}
}
/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
public function __construct($data) {
$this->data = $data;
#print_r($data);
}
public function run() {
#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );
try {
$dbh = $this->worker->getInstance();
#print_r($dbh);
$id = $this->data[&#39;id&#39;];
$mobile = safenet_decrypt($this->data[&#39;mobile&#39;]);
#printf("%d, %s \n", $id, $mobile);
if(strlen($mobile) > ){
$mobile = substr($mobile, -);
}
if($mobile == &#39;null&#39;){
# $sql = "UPDATE members_digest SET mobile = &#39;".$mobile."&#39; where id = &#39;".$id."&#39;";
# printf("%s\n",$sql);
# $dbh->query($sql);
$mobile = &#39;&#39;;
$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
}else{
$sql = "UPDATE members_digest SET mobile = md(:mobile) where id = :id";
}
$sth = $dbh->prepare($sql);
$sth->bindValue(&#39;:mobile&#39;, $mobile);
$sth->bindValue(&#39;:id&#39;, $id);
$sth->execute();
#echo $sth->debugDumpParams();
}
catch(PDOException $e) {
$error = sprintf("%s,%s\n", $mobile, $id );
file_put_contents("mobile_error.log", $error, FILE_APPEND);
}
#$dbh = null;
printf("runtime: %s, %s, %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->worker->getThreadId() ,$mobile, $id);
#printf("runtime: %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->number);
}
}
$pool = new Pool(, \ExampleWorker::class, []);
#foreach (range(, ) as $number) {
# $pool->submit(new Work($number));
#}
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example&#39;;
$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
#print_r($dbh);
#$sql = "select id, mobile from members where id < :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(&#39;:id&#39;,);
#$sth->execute();
#$result = $sth->fetchAll();
#print_r($result);
#
#$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(&#39;:mobile&#39;, &#39;aa&#39;);
#$sth->bindValue(&#39;:id&#39;,&#39;&#39;);
#echo $sth->execute();
#echo $sth->queryString;
#echo $sth->debugDumpParams();
$sql = "select id, mobile from members order by id asc"; // limit ";
$row = $dbh->query($sql);
while($members = $row->fetch(PDO::FETCH_ASSOC))
{
#$order = $account[&#39;order&#39;];
#printf("%s\n",$order);
//print_r($members);
$pool->submit(new Work($members));
#unset($account[&#39;order&#39;]);
}
$pool->shutdown();
?>
Ich werde Ihnen hier das relevante Wissen über die Installation und Verwendung von PHP-Pthreads-Multithreading vorstellen Weitere Artikel zur Installation und Verwendung von PHP-Pthreads-Multithreading finden Sie auf der chinesischen PHP-Website

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn