ホームページ  >  記事  >  バックエンド開発  >  PHP はコルーチンを使用してマルチタスク スケジューリング方法の例を実装します

PHP はコルーチンを使用してマルチタスク スケジューリング方法の例を実装します

小云云
小云云オリジナル
2018-03-30 13:48:141824ブラウズ

PHP5.5 の優れた新機能の 1 つは、反復ジェネレーターとコルーチンのサポートの追加です。ジェネレーターについては、PHP のドキュメントやその他のさまざまなブログ投稿で詳しく説明されています。コルーチンは非常に強力な機能を備えているものの、比較的複雑で理解しにくく、説明するのも難しいため、これまであまり注目されていませんでした。

この記事では、コルーチンを使用してタスクのスケジュールを実装する方法を紹介することで、PHP のコルーチンについて説明します。

最初の 3 つのセクションで背景を簡単に説明します。すでに十分な基礎ができている場合は、「共同マルチタスク」セクションに直接ジャンプできます。

反復ジェネレーター

ジェネレーターも関数です。違いは、この関数の戻り値が単一の値を返すのではなく、順次出力されることです。言い換えれば、ジェネレーターを使用すると、反復子インターフェースの実装が容易になります。以下は、xrange 関数を実装して簡単に説明します。

<?phpfunction xrange($start, $end, $step = 1) {  for ($i = $start; $i <= $end; $i += $step) {
    yield $i;
  }
}foreach (xrange(1, 1000000) as $num) {  echo $num, "\n";
}

上記の xrange() 関数は、PHP の組み込み関数 range() と同じ機能を提供します。ただし、異なる点は、 range() 関数が 1 ~ 100 万のグループ値を含む配列を返すことです (注: マニュアルを確認してください)。 xrange() 関数は、これらの値を順番に出力する反復子を返しますが、実際には配列の形式で返しません。このメソッドの利点は明らかです。これにより、大規模なデータのコレクションを一度にメモリにロードせずに処理できます。無限大のデータ ストリームを処理することもできます。

もちろん、この関数はジェネレーターを介さずに、Iteratorインターフェースを継承して実装することもできます。ただし、ジェネレーターを使用して実装する方が便利であり、イテレーター インターフェイスで 5 つのメソッドを実装する必要はありません。

ジェネレーターは割り込み可能な関数です

ジェネレーターからコルーチンを理解するには、コルーチンが内部でどのように動作するかを理解することが非常に重要です。ジェネレーターは割り込み可能な関数であり、yield が割り込みポイントを構成します。

上記の例に従って、xrange(1,1000000) を呼び出した場合、xrange() 関数のコードは実際には実行されません。代わりに、PHP はイテレーター インターフェイスを実装するジェネレーター クラスのインスタンスを返すだけです:

0fef3c088baae46583b051349abcd327

オブジェクトに対してイテレーター メソッドを 1 回呼び出すと、その中のコードが 1 回実行されます。たとえば、$range->rewind() を呼び出すと、xrange() のコードは制御フロー内の最初の yield まで実行されます。関数内の yield ステートメントに渡される戻り値は、$range->current() を通じて取得できます。

ジェネレーターでコードの実行を続けるには、$range->next() メソッドを呼び出す必要があります。これにより、次の yield ステートメントが発生するまでジェネレーターが再び開始されます。したがって、 next() メソッドと current() メソッドを連続して呼び出すと、yield ステートメントが表示されなくなるまで、ジェネレーターからすべての値が取得されます。 xrange() の場合、$i が $end を超えるとこの状況が発生します。この場合、制御フローは関数の最後に到達するため、コードは実行されません。これが発生すると、 void() メソッドは false を返し、反復は終了します。

コルーチン

コルーチンによって上記の関数に追加された主な機能は、データをジェネレーターに送り返す機能です (呼び出し元は呼び出されたジェネレーター関数にデータを送信します)。これにより、生成側から呼び出し側への一方向通信が、両者の間の双方向通信に変わります。

ジェネレーターの send() メソッドを呼び出して、データをコルーチンに渡すことができます。次の logger() コルーチンは、この通信がどのように機能するかを示す例です:

<?phpfunction logger($fileName) {  $fileHandle = fopen($fileName, &#39;a&#39;);  while (true) {    fwrite($fileHandle, yield . "\n");
  }
}$logger = logger(__DIR__ . &#39;/log&#39;);$logger->send(&#39;Foo&#39;);$logger->send(&#39;Bar&#39;)?>

ご覧のとおり、ここでは yield がステートメントとしてではなく、式として使用されています。つまり、 value に展開できます。この値は、呼び出し元によって send() メソッドに渡される値です。 この例では、yield 式は最初に「Foo」の代わりに Log に書き込まれ、次に「Bar」の代わりに Log に書き込まれます。

上記の例では、yield は受信者のみです。しかし、実際には受信と送信の両方が可能です。通信の送受信の仕組みの例は次のとおりです:

<?phpfunction gen() {  $ret = (yield &#39;yield1&#39;);  var_dump($ret);  $ret = (yield &#39;yield2&#39;);  var_dump($ret);
}$gen = gen();var_dump($gen->current());    // string(6) "yield1"var_dump($gen->send(&#39;ret1&#39;)); // string(4) "ret1"   (the first var_dump in gen)
                // string(6) "yield2" (the var_dump of the ->send() return value)var_dump($gen->send(&#39;ret2&#39;)); // string(4) "ret2"   (again from within gen)
                // NULL               (the return value of ->send())?>

出力の正確なシーケンスをすぐに理解するのは少し難しいため、なぜそのように出力されるのかを必ず理解してください。特に指摘したい点が 2 つあります:

1 つ目は、yield 式の両側の括弧は、PHP7 より前ではオプションではありませんでした。つまり、PHP5.5 と PHP5.6 では括弧が必要になります。

2 番目のポイントは、current() を呼び出す前に rewind() が呼び出されないことに気づいたかもしれません。これは、反復オブジェクトの生成時に巻き戻し操作が暗黙的に実行されるためです。

マルチタスクコラボレーション

上記の logger() の例を読んだ方は、「なぜ双方向通信にコルーチンを使用する必要があるのですか? 同じ機能を実現するために process メソッドを完全に使用できるのですか?」と疑問に思うかもしれません。はい、その通りですが、上記の例は基本的な使用法を示すだけであり、コルーチンを使用する利点を実際には示していません。

上記の紹介で述べたように、コルーチンは非常に強力な概念ですが、使用されることはほとんどなく、多くの場合非常に複雑です。簡単で実際的な例をいくつか挙げるのは困難です。

在这篇文章里,我决定去做的是使用协程实现多任务协作。我们要解决的问题是你想并发地运行多任务(或者“程序”)。不过我们都知道CPU在一个时刻只能运行一个任务(不考虑多核的情况)。因此处理器需要在不同的任务之间进行切换,而且总是让每个任务运行 “一小会儿”。

多任务协作这个术语中的“协作”说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样就可以运行其他任务了。这与“抢占”多任务相反,抢占多任务是这样的:调度器可以中断运行了一段时间的任务,不管它喜欢还是不喜欢。协作多任务在Windows的早期版本(windows95)和Mac OS中有使用,不过它们后来都切换到使用抢先多任务了。理由相当明确:如果你依靠程序自动交出控制的话,那么一些设计有问题的软件将很容易为自身占用整个CPU,不与其他任务共享。

现在你应当明白协程和任务调度之间的联系:yield指令提供了任务中断自身的一种方法,然后把控制交回给任务调度器。因此协程可以运行多个其他任务。更进一步来说,yield可以用来在任务和调度器之间进行通信。

我们的目的是 对 “任务”用更轻量级的包装的协程函数:

<?phpclass Task {  protected $taskId;  protected $coroutine;  protected $sendValue = null;  protected $beforeFirstYield = true;  public function __construct($taskId, Generator $coroutine) {    $this->taskId = $taskId;    $this->coroutine = $coroutine;
  }  public function getTaskId() {    return $this->taskId;
  }  public function setSendValue($sendValue) {    $this->sendValue = $sendValue;
  }  public function run() {    if ($this->beforeFirstYield) {      $this->beforeFirstYield = false;      return $this->coroutine->current();
    } else {      $retval = $this->coroutine->send($this->sendValue);      $this->sendValue = null;      return $retval;
    }
  }  public function isFinished() {    return !$this->coroutine->valid();
  }
}

一个任务是用任务ID标记的一个协程。使用setSendValue()方法,你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个)。 run()函数确实没有做什么,除了调用send()方法的协同程序。要理解为什么添加beforeFirstYieldflag,需要考虑下面的代码片段:

<?phpfunction gen() {
  yield &#39;foo&#39;;
  yield &#39;bar&#39;;
}$gen = gen();var_dump($gen->send(&#39;something&#39;));// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:$gen->rewind();var_dump($gen->send(&#39;something&#39;));// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!

通过添加 beforeFirstYieldcondition 我们可以确定 first yield 的值 被返回。

调度器现在不得不比多任务循环要做稍微多点了,然后才运行多任务:

<?phpclass Scheduler {  protected $maxTaskId = 0;  protected $taskMap = []; // taskId => task
  protected $taskQueue;  public function __construct() {    $this->taskQueue = new SplQueue();
  }  public function newTask(Generator $coroutine) {    $tid = ++$this->maxTaskId;    $task = new Task($tid, $coroutine);    $this->taskMap[$tid] = $task;    $this->schedule($task);    return $tid;
  }  public function schedule(Task $task) {    $this->taskQueue->enqueue($task);
  }  public function run() {    while (!$this->taskQueue->isEmpty()) {      $task = $this->taskQueue->dequeue();      $task->run();      if ($task->isFinished()) {        unset($this->taskMap[$task->getTaskId()]);
      } else {        $this->schedule($task);
      }
    }
  }
}?>

newTask()方法(使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务map数组里。接着它通过把任务放入任务队列里来实现对任务的调度。接着run()方法扫描任务队列,运行任务。如果一个任务结束了,那么它将从队列里删除,否则它将在队列的末尾再次被调度。

让我们看看下面具有两个简单(并且没有什么意义)任务的调度器:

<?phpfunction task1() {  for ($i = 1; $i <= 10; ++$i) {    echo "This is task 1 iteration $i.\n";
    yield;
  }
}function task2() {  for ($i = 1; $i <= 5; ++$i) {    echo "This is task 2 iteration $i.\n";
    yield;
  }
}$scheduler = new Scheduler;$scheduler->newTask(task1());$scheduler->newTask(task2());$scheduler->run();

两个任务都仅仅回显一条信息,然后使用yield把控制回传给调度器。输出结果如下:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

输出确实如我们所期望的:对前五个迭代来说,两个任务是交替运行的,接着第二个任务结束后,只有第一个任务继续运行。

与调度器之间通信

既然调度器已经运行了,那么我们来看下一项:任务和调度器之间的通信。

我们将使用进程用来和操作系统会话的同样的方式来通信:系统调用。我们需要系统调用的理由是操作系统与进程相比它处在不同的权限级别上。因此为了执行特权级别的操作(如杀死另一个进程),就不得不以某种方式把控制传回给内核,这样内核就可以执行所说的操作了。再说一遍,这种行为在内部是通过使用中断指令来实现的。过去使用的是通用的int指令,如今使用的是更特殊并且更快速的syscall/sysenter指令。

我们的任务调度系统将反映这种设计:不是简单地把调度器传递给任务(这样就允许它做它想做的任何事),我们将通过给yield表达式传递信息来与系统调用通信。这儿yield即是中断,也是传递信息给调度器(和从调度器传递出信息)的方法。

为了说明系统调用,我将对可调用的系统调用做一个小小的封装:

<?phpclass SystemCall {  protected $callback;  public function __construct(callable $callback) {    $this->callback = $callback;
  }  public function __invoke(Task $task, Scheduler $scheduler) {    $callback = $this->callback; // Can&#39;t call it directly in PHP :/
    return $callback($task, $scheduler);
  }
}

它将像其他任何可调用那样(使用_invoke)运行,不过它要求调度器把正在调用的任务和自身传递给这个函数。为了解决这个问题我们不得不微微的修改调度器的run方法:

<?phppublic function run() {  while (!$this->taskQueue->isEmpty()) {    $task = $this->taskQueue->dequeue();    $retval = $task->run();    if ($retval instanceof SystemCall) {      $retval($task, $this);      continue;
    }    if ($task->isFinished()) {      unset($this->taskMap[$task->getTaskId()]);
    } else {      $this->schedule($task);
    }
  }
}

第一个系统调用除了返回任务ID外什么都没有做:

<?phpfunction getTaskId() {    return new SystemCall(function(Task $task, Scheduler $scheduler) {        $task->setSendValue($task->getTaskId());        $scheduler->schedule($task);
    });
}

这个函数设置任务id为下一次发送的值,并再次调度了这个任务。由于使用了系统调用,所以调度器不能自动调用任务,我们需要手工调度任务(稍后你将明白为什么这么做)。要使用这个新的系统调用的话,我们要重新编写以前的例子:

<?phpfunction task($max) {  $tid = (yield getTaskId()); // <-- here&#39;s the syscall!
  for ($i = 1; $i <= $max; ++$i) {    echo "This is task $tid iteration $i.\n";
    yield;
  }
}$scheduler = new Scheduler;$scheduler->newTask(task(10));$scheduler->newTask(task(5));$scheduler->run();

这段代码将给出与前一个例子相同的输出。注意系统调用同其他任何调用一样正常地运行,不过预先增加了yield。要创建新的任务,然后再杀死它们的话,需要两个以上的系统调用:

<?phpfunction newTask(Generator $coroutine) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($coroutine) {      $task->setSendValue($scheduler->newTask($coroutine));      $scheduler->schedule($task);
    }
  );
}function killTask($tid) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($tid) {      $task->setSendValue($scheduler->killTask($tid));      $scheduler->schedule($task);
    }
  );
}

killTask函数需要在调度器里增加一个方法:

<?phppublic function killTask($tid) {  if (!isset($this->taskMap[$tid])) {    return false;
  }  unset($this->taskMap[$tid]);  // This is a bit ugly and could be optimized so it does not have to walk the queue,
  // but assuming that killing tasks is rather rare I won&#39;t bother with it now
  foreach ($this->taskQueue as $i => $task) {    if ($task->getTaskId() === $tid) {      unset($this->taskQueue[$i]);      break;
    }
  }  return true;
}

用来测试新功能的微脚本:

<?phpfunction childTask() {  $tid = (yield getTaskId());  while (true) {    echo "Child task $tid still alive!\n";
    yield;
  }
}function task() {  $tid = (yield getTaskId());  $childTid = (yield newTask(childTask()));  for ($i = 1; $i <= 6; ++$i) {    echo "Parent task $tid iteration $i.\n";
    yield;    if ($i == 3) yield killTask($childTid);
  }
}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();?>

这段代码将打印以下信息:

Parent task 1 iteration 1.
Child task 2 still alive!Parent task 1 iteration 2.
Child task 2 still alive!Parent task 1 iteration 3.
Child task 2 still alive!Parent task 1 iteration 4.Parent task 1 iteration 5.Parent task 1 iteration 6.

经过三次迭代以后子任务将被杀死,因此这就是”Child is still alive”消息结束的时候。可能应当指出的是这不是真正的父子关系。 因为甚至在父任务结束后子任务仍然可以运行。或者子任务可以杀死父任务。可以修改调度器使它具有更层级化的任务结构,不过 在这篇文章里我没有这么做。

你可以实现许多进程管理调用。例如 wait(它一直等待到任务结束运行时),exec(它替代当前任务)和fork(它创建一个 当前任务的克隆)。fork非常酷,而且你可以使用PHP的协程真正地实现它,因为它们都支持克隆。

然而让我们把这些留给有兴趣的读者吧,我们来看下一个议题。

非阻塞IO

很明显,我们的任务管理系统的真正很酷的应用是web服务器。它有一个任务是在套接字上侦听是否有新连接,当有新连接要建立的时候 ,它创建一个新任务来处理新连接。

web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的。例如PHP将等待到客户端完成发送为止。对一个WEB服务器来说,这 根本不行;这就意味着服务器在一个时间点上只能处理一个连接。

解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”。为了查找哪个套接字已经准备好读或者写了,可以使用 流选择函数。

首先,让我们添加两个新的 syscall,它们将等待直到指定 socket 准备好:

<?phpfunction waitForRead($socket) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($socket) {      $scheduler->waitForRead($socket, $task);
    }
  );
}function waitForWrite($socket) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($socket) {      $scheduler->waitForWrite($socket, $task);
    }
  );
}

这些 syscall 只是在调度器中代理其各自的方法:

<?php// resourceID => [socket, tasks]protected $waitingForRead = [];protected $waitingForWrite = [];public function waitForRead($socket, Task $task) {  if (isset($this->waitingForRead[(int) $socket])) {    $this->waitingForRead[(int) $socket][1][] = $task;
  } else {    $this->waitingForRead[(int) $socket] = [$socket, [$task]];
  }
}public function waitForWrite($socket, Task $task) {  if (isset($this->waitingForWrite[(int) $socket])) {    $this->waitingForWrite[(int) $socket][1][] = $task;
  } else {    $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
  }
}

waitingForRead 及 waitingForWrite 属性是两个承载等待的socket 及等待它们的任务的数组。有趣的部分在于下面的方法,它将检查 socket 是否可用,并重新安排各自任务:

<?phpprotected function ioPoll($timeout) {  $rSocks = [];  foreach ($this->waitingForRead as list($socket)) {    $rSocks[] = $socket;
  }  $wSocks = [];  foreach ($this->waitingForWrite as list($socket)) {    $wSocks[] = $socket;
  }  $eSocks = []; // dummy
  if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {    return;
  }  foreach ($rSocks as $socket) {    list(, $tasks) = $this->waitingForRead[(int) $socket];    unset($this->waitingForRead[(int) $socket]);    foreach ($tasks as $task) {      $this->schedule($task);
    }
  }  foreach ($wSocks as $socket) {    list(, $tasks) = $this->waitingForWrite[(int) $socket];    unset($this->waitingForWrite[(int) $socket]);    foreach ($tasks as $task) {      $this->schedule($task);
    }
  }
}

stream_select 函数接受承载读取、写入以及待检查的socket的数组(我们无需考虑最后一类)。数组将按引用传递,函数只会保留那些状态改变了的数组元素。我们可以遍历这些数组,并重新安排与之相关的任务。

为了正常地执行上面的轮询动作,我们将在调度器里增加一个特殊的任务:

<?phpprotected function ioPollTask() {  while (true) {    if ($this->taskQueue->isEmpty()) {      $this->ioPoll(null);
    } else {      $this->ioPoll(0);
    }
    yield;
  }
}

需要在某个地方注册这个任务,例如,你可以在run()方法的开始增加$this->newTask($this->ioPollTask())。然后就像其他 任务一样每执行完整任务循环一次就执行轮询操作一次(这么做一定不是最好的方法)。ioPollTask将使用0秒的超时来调用ioPoll, 这意味着stream_select将立即返回(而不是等待)。

只有任务队列为空时,我们才使用null超时,这意味着它一直等到某个套接口准备就绪。如果我们没有这么做,那么轮询任务将一而再, 再而三的循环运行,直到有新的连接建立。这将导致100%的CPU利用率。相反,让操作系统做这种等待会更有效。

现在编写服务器就相对容易了:

<?phpfunction server($port) {  echo "Starting server at port $port...\n";  $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);  if (!$socket) throw new Exception($errStr, $errNo);  stream_set_blocking($socket, 0);  while (true) {
    yield waitForRead($socket);    $clientSocket = stream_socket_accept($socket, 0);
    yield newTask(handleClient($clientSocket));
  }
}function handleClient($socket) {
  yield waitForRead($socket);  $data = fread($socket, 8192);  $msg = "Received following request:\n\n$data";  $msgLength = strlen($msg);  $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r$msgRES;
  yield waitForWrite($socket);  fwrite($socket, $response);  fclose($socket);
}$scheduler = new Scheduler;$scheduler->newTask(server(8000));$scheduler->run();

这段代码将接收到localhost:8000上的连接,然后仅仅返回发送来的内容作为HTTP响应。要做“实际”的事情的话就爱哪个非常复杂(处理 HTTP请求可能已经超出了这篇文章的范围)。上面的代码片段只是演示了一般性的概念。

你可以使用类似于ab -n 10000 -c 100 localhost:8000/这样命令来测试服务器。这条命令将向服务器发送10000个请求,并且其中100个请求将同时到达。使用这样的数目,我得到了处于中间的10毫秒的响应时间。不过还有一个问题:有少数几个请求真正处理的很慢(如5秒), 这就是为什么总吞吐量只有2000请求/秒(如果是10毫秒的响应时间的话,总的吞吐量应该更像是10000请求/秒

协程堆栈

如果你试图用我们的调度系统建立更大的系统的话,你将很快遇到问题:我们习惯了把代码分解为更小的函数,然后调用它们。然而, 如果使用了协程的话,就不能这么做了。例如,看下面代码:

<?phpfunction echoTimes($msg, $max) {  for ($i = 1; $i <= $max; ++$i) {    echo "$msg iteration $i\n";
    yield;
  }
}function task() {
  echoTimes(&#39;foo&#39;, 10); // print foo ten times
  echo "---\n";
  echoTimes(&#39;bar&#39;, 5); // print bar five times
  yield; // force it to be a coroutine}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();

这段代码试图把重复循环“输出n次“的代码嵌入到一个独立的协程里,然后从主任务里调用它。然而它无法运行。正如在这篇文章的开始 所提到的,调用生成器(或者协程)将没有真正地做任何事情,它仅仅返回一个对象。这也出现在上面的例子里。echoTimes调用除了放回一个(无用的)协程对象外不做任何事情。

为了仍然允许这么做,我们需要在这个裸协程上写一个小小的封装。我们将调用它:“协程堆栈”。因为它将管理嵌套的协程调用堆栈。 这将是通过生成协程来调用子协程成为可能:

$retval = (yield someCoroutine($foo, $bar));

使用yield,子协程也能再次返回值:

yield retval("I'm a return value!");

retval函数除了返回一个值的封装外没有做任何其他事情。这个封装将表示它是一个返回值。

<?phpclass CoroutineReturnValue {  protected $value;  public function __construct($value) {    $this->value = $value;
  }  public function getValue() {    return $this->value;
  }
}function retval($value) {  return new CoroutineReturnValue($value);
}

为了把协程转变为协程堆栈(它支持子调用),我们将不得不编写另外一个函数(很明显,它是另一个协程):

<?phpfunction stackedCoroutine(Generator $gen) {  $stack = new SplStack;  for (;;) {    $value = $gen->current();    if ($value instanceof Generator) {      $stack->push($gen);      $gen = $value;      continue;
    }    $isReturnValue = $value instanceof CoroutineReturnValue;    if (!$gen->valid() || $isReturnValue) {      if ($stack->isEmpty()) {        return;
      }      $gen = $stack->pop();      $gen->send($isReturnValue ? $value->getValue() : NULL);      continue;
    }    $gen->send(yield $gen->key() => $value);
  }
}

这个函数在调用者和当前正在运行的子协程之间扮演着简单代理的角色。在$gen->send(yield $gen->key()=>$value);这行完成了代理功能。另外它检查返回值是否是生成器,万一是生成器的话,它将开始运行这个生成器,并把前一个协程压入堆栈里。一旦它获得了CoroutineReturnValue的话,它将再次请求堆栈弹出,然后继续执行前一个协程。

为了使协程堆栈在任务里可用,任务构造器里的$this-coroutine =$coroutine;这行需要替代为$this->coroutine = StackedCoroutine($coroutine);。

现在我们可以稍微改进上面web服务器例子:把wait+read(和wait+write和warit+accept)这样的动作分组为函数。为了分组相关的 功能,我将使用下面类:

<?phpclass CoSocket {  protected $socket;  public function __construct($socket) {    $this->socket = $socket;
  }  public function accept() {
    yield waitForRead($this->socket);
    yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
  }  public function read($size) {
    yield waitForRead($this->socket);
    yield retval(fread($this->socket, $size));
  }  public function write($string) {
    yield waitForWrite($this->socket);    fwrite($this->socket, $string);
  }  public function close() {
    @fclose($this->socket);
  }
}

现在服务器可以编写的稍微简洁点了:

<?phpfunction server($port) {  echo "Starting server at port $port...\n";  $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);  if (!$socket) throw new Exception($errStr, $errNo);  stream_set_blocking($socket, 0);  $socket = new CoSocket($socket);  while (true) {
    yield newTask(
      handleClient(yield $socket->accept())
    );
  }
}function handleClient($socket) {  $data = (yield $socket->read(8192));  $msg = "Received following request:\n\n$data";  $msgLength = strlen($msg);  $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r$msgRES;
  yield $socket->write($response);
  yield $socket->close();
}

错误处理

作为一个优秀的程序员,相信你已经察觉到上面的例子缺少错误处理。几乎所有的 socket 都是易出错的。我这样做的原因一方面固然是因为错误处理的乏味(特别是 socket!),另一方面也在于它很容易使代码体积膨胀。

不过,我仍然了一讲一下常见的协程错误处理:协程允许使用 throw() 方法在其内部抛出一个错误。尽管此方法还未在 PHP 中实现,但我很快就会提交它,就在今天。

throw() 方法接受一个 Exception,并将其抛出到协程的当前悬挂点,看看下面代码:

<?phpfunction gen() {  echo "Foo\n";  try {
    yield;
  } catch (Exception $e) {    echo "Exception: {$e->getMessage()}\n";
  }  echo "Bar\n";
}$gen = gen();$gen->rewind();                     // echos "Foo"$gen->throw(new Exception(&#39;Test&#39;)); // echos "Exception: Test"
                  // and "Bar"

这非常棒,因为我们可以使用系统调用以及子协程调用异常抛出。对与系统调用,Scheduler::run() 方法需要一些小调整:

<?phpif ($retval instanceof SystemCall) {  try {    $retval($task, $this);
  } catch (Exception $e) {    $task->setException($e);    $this->schedule($task);
  }  continue;
}

Task 类也许要添加 throw 调用处理:

<?phpclass Task {  // ...
  protected $exception = null;  public function setException($exception) {    $this->exception = $exception;
  }  public function run() {    if ($this->beforeFirstYield) {      $this->beforeFirstYield = false;      return $this->coroutine->current();
    } elseif ($this->exception) {      $retval = $this->coroutine->throw($this->exception);      $this->exception = null;      return $retval;
    } else {      $retval = $this->coroutine->send($this->sendValue);      $this->sendValue = null;      return $retval;
    }
  }  // ...}

现在,我们已经可以在系统调用中使用异常抛出了!例如,要调用 killTask,让我们在传递 ID 不可用时抛出一个异常:

<?phpfunction killTask($tid) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($tid) {      if ($scheduler->killTask($tid)) {        $scheduler->schedule($task);
      } else {        throw new InvalidArgumentException(&#39;Invalid task ID!&#39;);
      }
    }
  );
}

试试看:

<?phpfunction task() {    try {
        yield killTask(500);
    } catch (Exception $e) {        echo &#39;Tried to kill task 500 but failed: &#39;, $e->getMessage(), "\n";
    }
}

这些代码现在尚不能正常运作,因为 stackedCoroutine 函数无法正确处理异常。要修复需要做些调整:

<?phpfunction stackedCoroutine(Generator $gen) {  $stack = new SplStack;  $exception = null;  for (;;) {    try {      if ($exception) {        $gen->throw($exception);        $exception = null;        continue;
      }      $value = $gen->current();      if ($value instanceof Generator) {        $stack->push($gen);        $gen = $value;        continue;
      }      $isReturnValue = $value instanceof CoroutineReturnValue;      if (!$gen->valid() || $isReturnValue) {        if ($stack->isEmpty()) {          return;
        }        $gen = $stack->pop();        $gen->send($isReturnValue ? $value->getValue() : NULL);        continue;
      }      try {        $sendValue = (yield $gen->key() => $value);
      } catch (Exception $e) {        $gen->throw($e);        continue;
      }      $gen->send($sendValue);
    } catch (Exception $e) {      if ($stack->isEmpty()) {        throw $e;
      }      $gen = $stack->pop();      $exception = $e;
    }
  }
}

结束语

在这篇文章里,我使用多任务协作构建了一个任务调度器,其中包括执行“系统调用”,做非阻塞操作和处理错误。所有这些里真正很酷的事情是任务的结果代码看起来完全同步,甚至任务正在执行大量的异步操作的时候也是这样。如果你打算从套接口读取数据的话,你将不需要传递某个回调函数或者注册一个事件侦听器。相反,你只要书写yield $socket->read()。这儿大部分都是你常常也要编写的,只在它的前面增加yield。

当我第一次听到所有这一切的时候,我发现这个概念完全令人折服,而且正是这个激励我在PHP中实现了它。同时我发现协程真正令人心慌。在令人敬畏的代码和很大一堆代码之间只有单薄的一行,我认为协程正好处在这一行上。讲讲使用上面所述的方法书写异步代码是否真的有益对我来说很难。

以上がPHP はコルーチンを使用してマルチタスク スケジューリング方法の例を実装しますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。