搜尋
首頁後端開發php教程php協成實現的詳解(附程式碼)

這篇文章帶給大家的內容是關於php協成實現的詳解(附程式碼),有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。

實作 PHP 協程需要了解的基本內容。

多進程/執行緒

最早的伺服器端程式都是透過多進程、多執行緒來解決並發IO的問題。進程模型出現的最早,從Unix 系統誕生就開始有了進程的概念。最早的伺服器端程式一般都是 Accept 一個客戶端連線就建立一個進程,然後子進程進入循環同步阻塞地與客戶端連線進行交互,收發處理資料。

多執行緒模式出現要晚一些,執行緒與進程相比更輕量,而且執行緒之間共享記憶體堆疊,所以不同的執行緒之間互動非常容易實現。例如實現一個聊天室,客戶端連線之間可以交互,聊天室​​中的玩家可以任意的其他人發送訊息。用多執行緒模式實作非常簡單,在線程中可以直接向某一個客戶端連線發送資料。而多進程模式就要用到管道、訊息佇列、共享記憶體等等統稱進程間通訊(IPC)複雜的技術才能實現。

最簡單的多進程服務端模型

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr)
or die("Create server failed");
while(1) {	
$conn = stream_socket_accept($serv);	
if (pcntl_fork() == 0) {	
	$request = fread($conn);		
	// do something		
	// $response = "hello world";		
	fwrite($response);		
	fclose($conn);		
	exit(0);	
	}
}

#多進程/執行緒模型的流程是:

建立一個socket,綁定伺服器連接埠(bind),監聽埠(listen),在PHP 中用stream_socket_server 一個函數就能完成上面3 個步驟,當然也可以使用更底層的sockets 擴充來分別實現。

進入 while 循環,阻塞在 accept 操作上,等待客戶端連線進入。此時程式會進入睡眠狀態,直到有新的客戶端發起 connect 到伺服器,作業系統會喚醒此進程。 accept 函數傳回客戶端連線的 socket 主進程在多進程模型下透過 fork(php: pcntl_fork)建立子進程,多執行緒模型下使用 pthread_create(php: new Thread)建立子執行緒。

下文如無特殊宣告將使用進程同時表示進程/執行緒。

子程序建立成功後進入 while 循環,阻塞在 recv(php:fread)呼叫上,等待客戶端傳送資料到伺服器。收到資料後伺服器程式進行處理然後使用 send(php: fwrite)向客戶端發送回應。長連線的服務會持續與客戶端交互,而短連線服務一般收到回應就會 close。

當客戶端連線關閉時,子程序退出並銷毀所有資源,主程序會回收此子程序。

php協成實現的詳解(附程式碼)

這種模式最大的問題是,進程建立和銷毀的開銷很大。所以上面的模式沒辦法應用在非常繁忙的伺服器程式。對應的改進版解決了這個問題,這就是經典的 Leader-Follower 模型。

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr)
or die("Create server failed");
for($i = 0; $i < 32; $i++) {
if (pcntl_fork() == 0)
 {
 while(1) {
 $conn = stream_socket_accept($serv);
 if ($conn == false) continue;
 // do something
 $request = fread($conn);
 // $response = "hello world";
 fwrite($response);
 fclose($conn);
 }
 exit(0);
 }
 }

它的特點是程式啟動後就會建立 N 個進程。每個子程序進入 Accept,等待新的連線進入。當客戶端連接到伺服器時,其中一個子程序會被喚醒,開始處理客戶端請求,並且不再接受新的 TCP 連線。當此連線關閉時,子程序會釋放,重新進入 Accept,參與處理新的連線。

這個模型的優點是完全可以重複使用進程,沒有額外消耗,效能非常好。很多常見的伺服器程式都是基於此模型的,例如 Apache、PHP-FPM。

多進程模型也有一些缺點。

這種模型嚴重依賴進程的數量來解決並發問題,一個客戶端連線就需要佔用一個進程,工作進程的數量有多少,並發處理能力就有多少。作業系統可以建立的進程數量是有限的。

啟動大量進程會帶來額外的進程排程消耗。數百個進程時可能進程上下文切換調度消耗佔 CPU 不到 1% 可以忽略不計,如果啟動數千甚至數萬個進程,消耗就會直線上升。調度消耗可能占到 CPU 的百分之幾十甚至 100%。

並行和並發

談到多進程以及類似同時執行多個任務的模型,就不得不先談談並行和並發。

並發(Concurrency)

是指能處理多個同時活動的能力,並發事件之間不一定要在同一時刻發生。

並行(Parallesim)

是指同時刻發生的兩個並發事件,具有並發的意義,但並發不一定並行。

區別

『並發』指的是程式的結構,『並行』指的是程式運行時的狀態

『並行』一定是並發的,『並行』是『並行』設計的一種

單執行緒永遠無法達到『平行』狀態

正確的並發設計的標準是:

使多個操作可以在重疊的時間段內進行。
two tasks can start, run, and complete in overlapping time periods

迭代器 & 生成器

在了解 PHP 协程前,还有 迭代器 和 生成器 这两个概念需要先认识一下。

迭代器

PHP5 开始内置了 Iterator 即迭代器接口,所以如果你定义了一个类,并实现了Iterator 接口,那么你的这个类对象就是 ZEND_ITER_OBJECT 即可迭代的,否则就是 ZEND_ITER_PLAIN_OBJECT。

对于 ZEND_ITER_PLAIN_OBJECT 的类,foreach 会获取该对象的默认属性数组,然后对该数组进行迭代。

而对于 ZEND_ITER_OBJECT 的类对象,则会通过调用对象实现的 Iterator 接口相关函数来进行迭代。

任何实现了 Iterator 接口的类都是可迭代的,即都可以用 foreach 语句来遍历。

Iterator 接口

interface Iterator extends Traversable
{	
// 获取当前内部标量指向的元素的数据
public mixed current()	
// 获取当前标量
public scalar key()	
// 移动到下一个标量
public void next()	
// 重置标量
public void rewind()	
// 检查当前标量是否有效
public boolean valid()
}

常规实现 range 函数

PHP 自带的 range 函数原型:

range — 根据范围创建数组,包含指定的元素

array range (mixed $start , mixed $end [, number $step = 1 ])

建立一个包含指定范围单元的数组。

在不使用迭代器的情况要实现一个和 PHP 自带的 range 函数类似的功能,可能会这么写:

function range ($start, $end, $step = 1){
$ret = [];
for ($i = $start; $i <= $end; $i += $step) {
$ret[] = $i;
}
return $ret;
}

需要将生成的所有元素放在内存数组中,如果需要生成一个非常大的集合,则会占用巨大的内存。

迭代器实现 xrange 函数

来看看迭代实现的 range,我们叫做 xrange,他实现了 Iterator 接口必须的 5 个方法:

class Xrange implements Iterator
{
protected $start;
protected $limit;
protected $step;
protected $current;
public function __construct($start, $limit, $step = 1){
$this->start = $start;
$this->limit = $limit;
$this->step  = $step;
}
public function rewind(){
$this->current = $this->start;
}
public function next(){
$this->current += $this->step;
}
public function current(){
return $this->current;
}
public function key(){
return $this->current + 1;
}
public function valid(){
return $this->current <= $this->limit;
}
}

使用时代码如下:

foreach (new Xrange(0, 9) as $key => $val)
 {
   echo $key, &#39; &#39;, $val, "\n";
 }

   

输出:

0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9

看上去功能和 range() 函数所做的一致,不同点在于迭代的是一个 对象(Object) 而不是数组:

var_dump(new Xrange(0, 9));

输出:

object(Xrange)#1 (4) {
["start":protected]=>
int(0)
["limit":protected]=>
int(9)
["step":protected]=>
int(1)
["current":protected]=>NULL
}

另外,内存的占用情况也完全不同:

// range
$startMemory = memory_get_usage();
$arr = range(0, 500000);
echo &#39;range(): &#39;, memory_get_usage() - $startMemory, " bytes\n";
unset($arr);
// xrange
$startMemory = memory_get_usage();
$arr = new Xrange(0, 500000);
echo &#39;xrange(): &#39;, memory_get_usage() - $startMemory, " bytes\n";

输出:

xrange(): 624 bytes
range(): 72194784 bytes

range() 函数在执行后占用了 50W 个元素内存空间,而 xrange 对象在整个迭代过程中只占用一个对象的内存。

Yii2 Query

在喜闻乐见的各种 PHP 框架里有不少生成器的实例,比如 Yii2 中用来构建 SQL 语句的 \yii\db\Query 类:

$query = (new \yii\db\Query)->from(&#39;user&#39;);
// yii\db\BatchQueryResult
foreach ($query->batch() as $users) {
// 每次循环得到多条 user 记录
}

来看一下 batch() 做了什么:

/**
* Starts a batch query.
*
* A batch query supports fetching data in batches, which can keep the memory usage under a limit.
* This method will return a [[BatchQueryResult]] object which implements the [[\Iterator]] interface
* and can be traversed to retrieve the data in batches.
*
* For example,
*
*
* $query = (new Query)->from(&#39;user&#39;);
* foreach ($query->batch() as $rows) {
*     // $rows is an array of 10 or fewer rows from user table
* }
*
*
* @param integer $batchSize the number of records to be fetched in each batch.
* @param Connection $db the database connection. If not set, the "db" application component will be used.
* @return BatchQueryResult the batch query result. It implements the [[\Iterator]] interface
* and can be traversed to retrieve the data in batches.
*/
public function batch($batchSize = 100, $db = null){
return Yii::createObject([
&#39;class&#39; => BatchQueryResult::className(),
&#39;query&#39; => $this,
&#39;batchSize&#39; => $batchSize,
&#39;db&#39; => $db,
&#39;each&#39; => false,
]);
}

实际上返回了一个 BatchQueryResult 类,类的源码实现了 Iterator 接口 5  个关键方法:

class BatchQueryResult extends Object implements \Iterator
{
public $db;
public $query;
public $batchSize = 100;
public $each = false;
private $_dataReader;
private $_batch;
private $_value;
private $_key;
/**
* Destructor.
*/
public function __destruct()
{
// make sure cursor is closed
$this->reset();
}
/**
* Resets the batch query.
* This method will clean up the existing batch query so that a new batch query can be performed.
*/
public function reset(){
if ($this->_dataReader !== null)
{
$this->_dataReader->close();
}
$this->_dataReader = null;
$this->_batch = null;
$this->_value = null;
$this->_key = null;
}
/*
*
* Resets the iterator to the initial state.
* This method is required by the interface [[\Iterator]].
*/
public function rewind()
{
$this->reset();
$this->next();
}
/**
* Moves the internal pointer to the next dataset.
* This method is required by the interface [[\Iterator]].
*/
public function next()
{
if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) 
{
$this->_batch = $this->fetchData();
reset($this->_batch);
}
if ($this->each) 
{
$this->_value = current($this->_batch);
if ($this->query->indexBy !== null) 
{
$this->_key = key($this->_batch);
} 
elseif (key($this->_batch) !== null) 
{
$this->_key++;
} else {
$this->_key = null;
}
} else {
$this->_value = $this->_batch;
$this->_key = $this->_key === null ? 0 : $this->_key + 1;
}
}
/**
* Fetches the next batch of data.
* @return array the data fetched
*/
protected function fetchData()
{
// ...
}
/**
* Returns the index of the current dataset.
* This method is required by the interface [[\Iterator]].
* @return integer the index of the current row.
*/
public function key()
{
return $this->_key;
}
/**
* Returns the current dataset.
* This method is required by the interface [[\Iterator]].
* @return mixed the current dataset.
*/
public function current()
{
return $this->_value;
}
/**
* Returns whether there is a valid dataset at the current position.
* This method is required by the interface [[\Iterator]].
* @return boolean whether there is a valid dataset at the current position.
*/
public function valid()
{
return !empty($this->_batch);
}
}

以迭代器的方式实现了类似分页取的效果,同时避免了一次性取出所有数据占用太多的内存空间。

迭代器使用场景

使用返回迭代器的包或库时(如 PHP5 中的 SPL 迭代器)

无法在一次调用获取所需的所有元素时

要处理数量巨大的元素时(数据库中要处理的结果集内容超过内存)

生成器

需要 PHP 5 >= 5.5.0 或 PHP 7

虽然迭代器仅需继承接口即可实现,但毕竟需要定义一整个类然后实现接口的所有方法,实在是不怎么方便。

生成器则提供了一种更简单的方式来实现简单的对象迭代,相比定义类来实现 Iterator 接口的方式,性能开销和复杂度大大降低。

PHP Manual

生成器允许在 foreach 代码块中迭代一组数据而不需要创建任何数组。一个生成器函数,就像一个普通的有返回值的自定义函数类似,但普通函数只返回一次, 而生成器可以根据需要通过 yield 关键字返回多次,以便连续生成需要迭代返回的值。

一个最简单的例子就是使用生成器来重新实现 xrange() 函数。效果和上面我们用迭代器实现的差不多,但实现起来要简单的多。

生成器实现 xrange 函数

function xrange($start, $limit, $step = 1) 
{
for ($i = 0; $i < $limit; $i += $step) {
yield $i + 1 => $i;
}
}
foreach (xrange(0, 9) as $key => $val) {
printf("%d %d \n", $key, $val);
}
// 输出
// 1 0
// 2 1
// 3 2
// 4 3
// 5 4
// 6 5
// 7 6
// 8 7
// 9 8

实际上生成器生成的正是一个迭代器对象实例,该迭代器对象继承了 Iterator 接口,同时也包含了生成器对象自有的接口,具体可以参考 Generator 类的定义以及语法参考。

同时需要注意的是:

一个生成器不可以返回值,这样做会产生一个编译错误。然而 return 空是一个有效的语法并且它将会终止生成器继续执行。

yield 关键字

需要注意的是 yield 关键字,这是生成器的关键。通过上面的例子可以看出,yield 会将当前产生的值传递给 foreach,换句话说,foreach 每一次迭代过程都会从 yield 处取一个值,直到整个遍历过程不再能执行到 yield 时遍历结束,此时生成器函数简单的退出,而调用生成器的上层代码还可以继续执行,就像一个数组已经被遍历完了。

yield 最简单的调用形式看起来像一个 return 申明,不同的是 yield 暂停当前过程的执行并返回值,而 return 是中断当前过程并返回值。暂停当前过程,意味着将处理权转交由上一级继续进行,直到上一级再次调用被暂停的过程,该过程又会从上一次暂停的位置继续执行。这像是什么呢?如果之前已经在鸟哥的文章中粗略看过,应该知道这很像操作系统的进程调度,多个进程在一个 CPU 核心上执行,在系统调度下每一个进程执行一段指令就被暂停,切换到下一个进程,这样外部用户看起来就像是同时在执行多个任务。

但仅仅如此还不够,yield 除了可以返回值以外,还能接收值,也就是可以在两个层级间实现双向通信。

来看看如何传递一个值给 yield:

function printer()
{
while (true) 
{
printf("receive: %s\n", yield);
}
}
$printer = printer();
$printer->send(&#39;hello&#39;);
$printer->send(&#39;world&#39;);
// 输出
receive: hello
receive: world

根据 PHP 官方文档的描述可以知道 Generator 对象除了实现 Iterator 接口中的必要方法以外,还有一个 send 方法,这个方法就是向 yield 语句处传递一个值,同时从 yield 语句处继续执行,直至再次遇到 yield 后控制权回到外部。

既然 yield 可以在其位置中断并返回或者接收一个值,那能不能同时进行接收和返回呢?当然,这也是实现协程的根本。对上述代码做出修改:

function printer()
{
$i = 0;
while (true) {
printf("receive: %s\n", (yield ++$i));
}
}
$printer = printer();
printf("%d\n", $printer->current());
$printer->send(&#39;hello&#39;);
printf("%d\n", $printer->current());
$printer->send(&#39;world&#39;);
printf("%d\n", $printer->current());
// 输出
1
receive: hello
2
receive: world
3

这是另一个例子:

function gen() 
{
$ret = (yield &#39;yield1&#39;);
var_dump($ret);
$ret = (yield &#39;yield2&#39;);
var_dump($ret);
}
$gen = gen();
var_dump($gen->current());   // string(6) "yield1"
var_dump($gen->send(&#39;ret1&#39;)); // string(4) "ret1"   (第一个 var_dump)
                              // string(6) "yield2" (继续执行到第二个 yield,吐出了返回值)
var_dump($gen->send(&#39;ret2&#39;)); // string(4) "ret2"   (第二个 var_dump)
                              // NULL (var_dump 之后没有其他语句,所以这次 ->send() 的返回值为 null)

current 方法是迭代器 Iterator 接口必要的方法,foreach 语句每一次迭代都会通过其获取当前值,而后调用迭代器的 next 方法。在上述例子里则是手动调用了 current 方法获取值。

上述例子已经足以表示 yield 能够作为实现双向通信的工具,也就是具备了后续实现协程的基本条件。

上面的例子如果第一次接触并稍加思考,不免会疑惑为什么一个 yield 既是语句又是表达式,而且这两种情况还同时存在:

对于所有在生成器函数中出现的 yield,首先它都是语句,而跟在 yield 后面的任何表达式的值将作为调用生成器函数的返回值,如果 yield 后面没有任何表达式(变量、常量都是表达式),那么它会返回 NULL,这一点和 return 语句一致。

yield 也是表达式,它的值就是 send 函数传过来的值(相当于一个特殊变量,只不过赋值是通过 send 函数进行的)。只要调用send方法,并且生成器对象的迭代并未终结,那么当前位置的 yield 就会得到 send 方法传递过来的值,这和生成器函数有没有把这个值赋值给某个变量没有任何关系。

这个地方可能需要仔细品味上面两个 send() 方法的例子才能理解。但可以简单的记住:

任何时候 yield 关键词即是语句:可以为生成器函数返回值;也是表达式:可以接收生成器对象发过来的值。

除了 send() 方法,还有一种控制生成器执行的方法是 next() 函数:

Next(),恢复生成器函数的执行直到下一个 yield

Send(),向生成器传入一个值,恢复执行直到下一个 yield

协程

对于单核处理器,多进程实现多任务的原理是让操作系统给一个任务每次分配一定的 CPU 时间片,然后中断、让下一个任务执行一定的时间片接着再中断并继续执行下一个,如此反复。由于切换执行任务的速度非常快,给外部用户的感受就是多个任务的执行是同时进行的。

多进程的调度是由操作系统来实现的,进程自身不能控制自己何时被调度,也就是说:

进程的调度是由外层调度器抢占式实现的

而协程要求当前正在运行的任务自动把控制权回传给调度器,这样就可以继续运行其他任务。这与『抢占式』的多任务正好相反, 抢占多任务的调度器可以强制中断正在运行的任务, 不管它自己有没有意愿。『协作式多任务』在 Windows 的早期版本 (windows95) 和 Mac OS 中有使用, 不过它们后来都切换到『抢占式多任务』了。理由相当明确:如果仅依靠程序自动交出控制的话,那么一些恶意程序将会很容易占用全部 CPU 时间而不与其他任务共享。

协程的调度是由协程自身主动让出控制权到外层调度器实现的

回到刚才生成器实现 xrange 函数的例子,整个执行过程的交替可以用下图来表示:

php協成實現的詳解(附程式碼)

协程可以理解为纯用户态的线程,通过协作而不是抢占来进行任务切换。相对于进程或者线程,协程所有的操作都可以在用户态而非操作系统内核态完成,创建和切换的消耗非常低。

简单的说 Coroutine(协程) 就是提供一种方法来中断当前任务的执行,保存当前的局部变量,下次再过来又可以恢复当前局部变量继续执行。

我们可以把大任务拆分成多个小任务轮流执行,如果有某个小任务在等待系统 IO,就跳过它,执行下一个小任务,这样往复调度,实现了 IO 操作和 CPU 计算的并行执行,总体上就提升了任务的执行效率,这也便是协程的意义。

PHP 协程和 yield

PHP 从 5.5 开始支持生成器及 yield 关键字,而 PHP 协程则由 yield 来实现。

要理解协程,首先要理解:代码是代码,函数是函数。函数包裹的代码赋予了这段代码附加的意义:不管是否显式的指明返回值,当函数内的代码块执行完后都会返回到调用层。而当调用层调用某个函数的时候,必须等这个函数返回,当前函数才能继续执行,这就构成了后进先出,也就是 Stack。

而协程包裹的代码,不是函数,不完全遵守函数的附加意义,协程执行到某个点,协会协程会 yield 返回一个值然后挂起,而不是 return 一个值然后结束,当再次调用协程的时候,会在上次 yield 的点继续执行。

所以协程违背了通常操作系统和 x86 的 CPU 认定的代码执行方式,也就是 Stack 的这种执行方式,需要运行环境(比如 php,python 的 yield 和 golang 的 goroutine)自己调度,来实现任务的中断和恢复,具体到 PHP,就是靠 yield 来实现。

堆栈式调用 和 协程调用的对比:

php協成實現的詳解(附程式碼)

结合之前的例子,可以总结一下 yield 能做的就是:

实现不同任务间的主动让位、让行,把控制权交回给任务调度器。

通过 send() 实现不同任务间的双向通信,也就可以实现任务和调度器之间的通信。

yield 就是 PHP 实现协程的方式。

协程多任务调度

下面是雄文 Cooperative multitasking using coroutines (in PHP!) 里一个简单但完整的例子,来展示如何具体的在 PHP 里实现协程任务的调度。

首先是一个任务类:

Task

class Task
{
// 任务 IDprotected 
$taskId;
// 协程对象protected 
$coroutine;
// send() 值protected 
$sendVal = null;
// 是否首次 yieldprotected 
$beforeFirstYield = true;
public function __construct($taskId, Generator $coroutine) {
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}
public function getTaskId() {
return $this->taskId;
}
public function setSendValue($sendVal) {
$this->sendVal = $sendVal;
}
public function run() {
// 如之前提到的在send之前, 当迭代器被创建后第一次 yield 之前,一个 renwind() 方法会被隐式调用
// 所以实际上发生的应该类似:
// $this->coroutine->rewind();
// $this->coroutine->send();
// 这样 renwind 的执行将会导致第一个 yield 被执行, 并且忽略了他的返回值.
// 真正当我们调用 yield 的时候, 我们得到的是第二个yield的值,导致第一个yield的值被忽略。
// 所以这个加上一个是否第一次 yield 的判断来避免这个问题
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendVal);
$this->sendVal = null;
return $retval;
}
}
public function isFinished() {
return !$this->coroutine->valid();
}
}

接下来是调度器,比 foreach 是要复杂一点,但好歹也能算个正儿八经的 Scheduler :)

Scheduler

class Scheduler
{
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this->taskQueue = new SplQueue();
}
// (使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务map数组里. 接着它通过把任务放入任务队列里来实现对任务的调度. 接着run()方法扫描任务队列, 运行任务.如果一个任务结束了, 那么它将从队列里删除, 否则它将在队列的末尾再次被调度。
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {    
// 任务入队
$this->queue->enqueue($task);
}
public function run() {
while (!$this->queue->isEmpty()) {        
// 任务出队
$task = $this->queue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}

队列可以使每个任务获得同等的 CPU 使用时间,

Demo

function task1() {
for ($i = 1; $i <= 10; ++$i) {
echo "This is task 1 iteration $i.\n";
yield;
}
}
function task2() {
for ($i = 1; $i <= 5; ++$i){
echo "This is task 2 iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();

输出:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

结果正是我们期待的,最初的 5 次迭代,两个任务是交替进行的,而在第二个任务结束后,只有第一个任务继续执行到结束。

协程非阻塞 IO

若想真正的发挥出协程的作用,那一定是在一些涉及到阻塞 IO 的场景,我们都知道 Web 服务器最耗时的部分通常都是 socket 读取数据等操作上,如果进程对每个请求都挂起的等待 IO 操作,那处理效率就太低了,接下来我们看个支持非阻塞 IO 的 Scheduler:

<?php
class Scheduler
{
protected $maxTaskId = 0;
protected $tasks = []; // taskId => task
protected $queue;
// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];
public function __construct() {
// SPL 队列
$this->queue = new SplQueue();
}
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->tasks[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {    
// 任务入队
$this->queue->enqueue($task);
}
public function run() {
while (!$this->queue->isEmpty()) {        
// 任务出队
$task = $this->queue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->tasks[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
public function waitForRead($socket, Task $task)
{
if (isset($this->waitingForRead[(int)$socket])) {
$this->waitingForRead[(int)$socket][1][] = $task;
} else {
$this->waitingForRead[(int)$socket] = [$socket, [$task]];
}
}
public function waitForWrite($socket, Task $task)
{
if (isset($this->waitingForWrite[(int)$socket])) {
$this->waitingForWrite[(int)$socket][1][] = $task;
} else {
$this->waitingForWrite[(int)$socket] = [$socket, [$task]];
}
}
/**
* @param $timeout 0 represent
*/
protected function ioPoll($timeout)
{
$rSocks = [];
foreach ($this->waitingForRead as list($socket)) {
$rSocks[] = $socket;
}
$wSocks = [];
foreach ($this->waitingForWrite as list($socket)) {
$wSocks[] = $socket;
}
$eSocks = [];
// $timeout 为 0 时, stream_select 为立即返回,为 null 时则会阻塞的等,见 http://php.net/manual/zh/function.stream-select.php
if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
return;
}
foreach ($rSocks as $socket) {
list(, $tasks) = $this->waitingForRead[(int)$socket];
unset($this->waitingForRead[(int)$socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
foreach ($wSocks as $socket) {
list(, $tasks) = $this->waitingForWrite[(int)$socket];
unset($this->waitingForWrite[(int)$socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
}
/**
* 检查队列是否为空,若为空则挂起的执行 stream_select,否则检查完 IO 状态立即返回,详见 ioPoll()
* 作为任务加入队列后,由于 while true,会被一直重复的加入任务队列,实现每次任务前检查 IO 状态
* @return Generator object for newTask
*
*/
protected function ioPollTask()
{
while (true) {
if ($this->taskQueue->isEmpty()) {
$this->ioPoll(null);
} else {
$this->ioPoll(0);
}
yield;
}
}
/**
* $scheduler = new Scheduler;
* $scheduler->newTask(Web Server Generator);
* $scheduler->withIoPoll()->run();
*
* 新建 Web Server 任务后先执行 withIoPoll() 将 ioPollTask() 作为任务入队
*
* @return $this
*/
public function withIoPoll()
{
$this->newTask($this->ioPollTask());
return $this;
}
}

这个版本的 Scheduler 里加入一个永不退出的任务,并且通过 stream_select 支持的特性来实现快速的来回检查各个任务的 IO 状态,只有 IO 完成的任务才会继续执行,而 IO 还未完成的任务则会跳过,完整的代码和例子可以戳这里。

也就是说任务交替执行的过程中,一旦遇到需要 IO 的部分,调度器就会把 CPU 时间分配给不需要 IO 的任务,等到当前任务遇到 IO 或者之前的任务 IO 结束才再次调度 CPU 时间,以此实现 CPU 和 IO 并行来提升执行效率,类似下图:

php協成實現的詳解(附程式碼)

单任务改造

如果想将一个单进程任务改造成并发执行,我们可以选择改造成多进程或者协程:

多进程,不改变任务执行的整体过程,在一个时间段内同时执行多个相同的代码段,调度权在 CPU,如果一个任务能独占一个 CPU 则可以实现并行。

协程,把原有任务拆分成多个小任务,原有任务的执行流程被改变,调度权在进程自己,如果有 IO 并且可以实现异步,则可以实现并行。

多进程改造

php協成實現的詳解(附程式碼)

协程改造

php協成實現的詳解(附程式碼)

協程(Coroutines)和Go 協程(Goroutines)

PHP 的協程或在其他語言中,例如Python、Lua 等都有協程的概念,和Go 協程有些相似,不過有兩點不同:

Go 協程意味著並行(或者可以以並行的方式部署,可以用runtime .GOMAXPROCS() 指定可同時使用的CPU 個數),協程一般來說只是並發。

Go 協程透過通道 channel 來通訊;協程透過 yield 讓出和恢復操作來通訊。

Go 協程比普通協程更強大,也很容易從協程的邏輯復用到Go 協程,而且在Go 的開發中也使用的極為普遍,有興趣的話可以了解一下作為對比。

結束

個人感覺PHP 的協程在實際使用中想要徒手實現和應用並不方便而且場景有限,但了解其概念及實現原理對更好的理解並發不無裨益。

如果想更多的了解協程的實際應用場景不妨試試已經大名鼎鼎的Swoole,其對多種協議的client 做了底層的協程封裝,幾乎可以做到以同步編程的寫法實現協程異步IO 的效果。

以上是php協成實現的詳解(附程式碼)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:segmentfault思否。如有侵權,請聯絡admin@php.cn刪除
繼續使用PHP:耐力的原因繼續使用PHP:耐力的原因Apr 19, 2025 am 12:23 AM

PHP仍然流行的原因是其易用性、靈活性和強大的生態系統。 1)易用性和簡單語法使其成為初學者的首選。 2)與web開發緊密結合,處理HTTP請求和數據庫交互出色。 3)龐大的生態系統提供了豐富的工具和庫。 4)活躍的社區和開源性質使其適應新需求和技術趨勢。

PHP和Python:探索他們的相似性和差異PHP和Python:探索他們的相似性和差異Apr 19, 2025 am 12:21 AM

PHP和Python都是高層次的編程語言,廣泛應用於Web開發、數據處理和自動化任務。 1.PHP常用於構建動態網站和內容管理系統,而Python常用於構建Web框架和數據科學。 2.PHP使用echo輸出內容,Python使用print。 3.兩者都支持面向對象編程,但語法和關鍵字不同。 4.PHP支持弱類型轉換,Python則更嚴格。 5.PHP性能優化包括使用OPcache和異步編程,Python則使用cProfile和異步編程。

PHP和Python:解釋了不同的範例PHP和Python:解釋了不同的範例Apr 18, 2025 am 12:26 AM

PHP主要是過程式編程,但也支持面向對象編程(OOP);Python支持多種範式,包括OOP、函數式和過程式編程。 PHP適合web開發,Python適用於多種應用,如數據分析和機器學習。

PHP和Python:深入了解他們的歷史PHP和Python:深入了解他們的歷史Apr 18, 2025 am 12:25 AM

PHP起源於1994年,由RasmusLerdorf開發,最初用於跟踪網站訪問者,逐漸演變為服務器端腳本語言,廣泛應用於網頁開發。 Python由GuidovanRossum於1980年代末開發,1991年首次發布,強調代碼可讀性和簡潔性,適用於科學計算、數據分析等領域。

在PHP和Python之間進行選擇:指南在PHP和Python之間進行選擇:指南Apr 18, 2025 am 12:24 AM

PHP適合網頁開發和快速原型開發,Python適用於數據科學和機器學習。 1.PHP用於動態網頁開發,語法簡單,適合快速開發。 2.Python語法簡潔,適用於多領域,庫生態系統強大。

PHP和框架:現代化語言PHP和框架:現代化語言Apr 18, 2025 am 12:14 AM

PHP在現代化進程中仍然重要,因為它支持大量網站和應用,並通過框架適應開發需求。 1.PHP7提升了性能並引入了新功能。 2.現代框架如Laravel、Symfony和CodeIgniter簡化開發,提高代碼質量。 3.性能優化和最佳實踐進一步提升應用效率。

PHP的影響:網絡開發及以後PHP的影響:網絡開發及以後Apr 18, 2025 am 12:10 AM

PHPhassignificantlyimpactedwebdevelopmentandextendsbeyondit.1)ItpowersmajorplatformslikeWordPressandexcelsindatabaseinteractions.2)PHP'sadaptabilityallowsittoscaleforlargeapplicationsusingframeworkslikeLaravel.3)Beyondweb,PHPisusedincommand-linescrip

PHP類型提示如何起作用,包括標量類型,返回類型,聯合類型和無效類型?PHP類型提示如何起作用,包括標量類型,返回類型,聯合類型和無效類型?Apr 17, 2025 am 12:25 AM

PHP類型提示提升代碼質量和可讀性。 1)標量類型提示:自PHP7.0起,允許在函數參數中指定基本數據類型,如int、float等。 2)返回類型提示:確保函數返回值類型的一致性。 3)聯合類型提示:自PHP8.0起,允許在函數參數或返回值中指定多個類型。 4)可空類型提示:允許包含null值,處理可能返回空值的函數。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱工具

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

MinGW - Minimalist GNU for Windows

MinGW - Minimalist GNU for Windows

這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

mPDF

mPDF

mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境