>  기사  >  백엔드 개발  >  PHP는 코루틴을 사용하여 다중 작업 예약 방법 예제를 구현합니다.

PHP는 코루틴을 사용하여 다중 작업 예약 방법 예제를 구현합니다.

小云云
小云云원래의
2018-03-30 13:48:141782검색

PHP5.5의 더 좋은 새로운 기능 중 하나는 반복 생성기 및 코루틴에 대한 지원이 추가된 것입니다. 생성기에 대해서는 PHP 문서와 기타 다양한 블로그 게시물에서 자세히 다루고 있습니다. 코루틴은 매우 강력한 기능을 가지고 있음에도 불구하고 상대적으로 복잡하고 이해하기 어렵고 설명하기 어렵기 때문에 상대적으로 덜 주목을 받았습니다.

이 글에서는 코루틴을 사용하여 작업 예약을 구현하는 방법을 소개하여 PHP의 코루틴을 설명하려고 합니다.

처음 세 부분에서는 간략한 배경 소개를 하겠습니다. 이미 기초가 탄탄하다면 "협업 ​​멀티태스킹" 섹션으로 바로 이동할 수 있습니다.

반복 생성기

생성기도 함수입니다. 차이점은 이 함수의 반환 값이 단일 값을 반환하는 것이 아니라 순차적으로 출력된다는 것입니다. 즉, 생성기를 사용하면 반복자 인터페이스를 더 쉽게 구현할 수 있습니다. 다음은 xrange 함수를 구현하여 간단하게 설명합니다.

<?phpfunction xrange($start, $end, $step = 1) {  for ($i = $start; $i <= $end; $i += $step) {
    yield $i;
  }
}foreach (xrange(1, 1000000) as $num) {  echo $num, "\n";
}

위의 xrange() 함수는 PHP 내장 함수 range()와 동일한 기능을 제공합니다. 하지만 차이점은 range() 함수가 100만부터 100만까지의 그룹 값을 포함하는 배열을 반환한다는 것입니다(참고: 매뉴얼을 확인하세요). xrange() 함수는 이러한 값을 순차적으로 출력하는 반복자를 반환하며, 실제로 배열 형태로 반환하지 않습니다. 이 방법의 장점은 분명합니다. 이를 통해 대용량 데이터 컬렉션을 메모리에 한꺼번에 로드하지 않고도 처리할 수 있습니다. 무한히 큰 데이터 스트림을 처리할 수도 있습니다.

물론 이 함수는 생성기를 통하지 않고 Iterator 인터페이스를 상속하여 구현할 수도 있습니다. 하지만 생성기를 사용하여 구현하는 것이 더 편리할 것이며, 반복자 인터페이스에서 5가지 메서드를 구현할 필요가 없습니다.

제너레이터는 인터럽트 가능한 함수입니다.

제너레이터의 코루틴을 이해하려면 코루틴이 내부적으로 어떻게 작동하는지 이해하는 것이 매우 중요합니다. 제너레이터는 인터럽트 가능한 함수이므로 수율이 인터럽트 지점을 구성합니다.

위의 예에 따라 xrange(1,1000000)을 호출하면 xrange() 함수의 코드가 실제로 실행되지 않습니다. 대신, PHP는 반복자 인터페이스를 구현하는 생성기 클래스의 인스턴스를 반환합니다.

0fef3c088baae46583b051349abcd327

객체에서 반복자 메서드를 한 번 호출하면 그 안의 코드가 한 번 실행됩니다. 예를 들어, $range->rewind()를 호출하면 xrange()의 코드는 제어 흐름에서 첫 번째 항복이 발생할 때까지 실행됩니다. 함수 내에서 Yield 문에 전달된 반환 값은 $range->current()를 통해 얻을 수 있습니다.

생성기에서 코드를 계속 실행하려면 $range->next() 메서드를 호출해야 합니다. 그러면 다음 항복 문이 발생할 때까지 생성기가 다시 시작됩니다. 따라서 next()와 current()를 연속적으로 호출하면 더 이상 Yield 문이 나타나지 않을 때까지 생성기에서 모든 값을 가져옵니다. xrange()의 경우 $i가 $end를 초과하면 이 상황이 발생합니다. 이 경우 제어 흐름은 함수의 끝에 도달하므로 코드가 실행되지 않습니다. 이런 일이 발생하면 void() 메서드는 false를 반환하고 반복이 종료됩니다.

코루틴

코루틴이 위 함수에 추가한 주요 기능은 데이터를 생성기로 다시 보내는 기능입니다(호출자는 호출된 생성기 함수로 데이터를 보냅니다). 이는 생성자에서 호출자까지의 단방향 통신을 둘 사이의 양방향 통신으로 전환합니다.

생성기의 send() 메서드를 호출하여 코루틴에 데이터를 전달할 수 있습니다. 다음 logger() 코루틴은 이 통신이 작동하는 방식에 대한 예입니다.

<?phpfunction logger($fileName) {  $fileHandle = fopen($fileName, &#39;a&#39;);  while (true) {    fwrite($fileHandle, yield . "\n");
  }
}$logger = logger(__DIR__ . &#39;/log&#39;);$logger->send(&#39;Foo&#39;);$logger->send(&#39;Bar&#39;)?>

보시다시피 여기서 Yield는 명령문으로 사용되지 않고 표현식으로 사용됩니다. 즉, 값으로 진화할 수 있습니다. 이 값은 호출자가 send() 메서드에 전달한 값입니다. 이 예에서 항복 표현식은 먼저 "Foo" 대신 Log에 기록된 다음 "Bar" 대신 Log에 기록됩니다.

위의 예에서 Yield는 수신자일 뿐입니다. 그러나 실제로는 수신 및 송신이 모두 가능합니다. 통신이 어떻게 수신되고 전송되는지에 대한 예는 다음과 같습니다.

<?phpfunction gen() {  $ret = (yield &#39;yield1&#39;);  var_dump($ret);  $ret = (yield &#39;yield2&#39;);  var_dump($ret);
}$gen = gen();var_dump($gen->current());    // string(6) "yield1"var_dump($gen->send(&#39;ret1&#39;)); // string(4) "ret1"   (the first var_dump in gen)
                // string(6) "yield2" (the var_dump of the ->send() return value)var_dump($gen->send(&#39;ret2&#39;)); // string(4) "ret2"   (again from within gen)
                // NULL               (the return value of ->send())?>

정확한 출력 순서를 바로 이해하기는 다소 어려우므로 왜 이렇게 출력되는지 꼭 알아두세요. 특히 지적하고 싶은 두 가지 사항이 있습니다.

첫 번째 요점은 PHP7 이전에는 항복 표현식 양쪽의 괄호가 선택 사항이 아니었기 때문에 PHP5.5 및 PHP5.6에서는 괄호가 필수라는 의미입니다.

두 번째로, current()를 호출하기 전에 rewind()가 호출되지 않는다는 점을 눈치챘을 것입니다. 반복 객체를 생성할 때 되감기 작업이 암시적으로 수행되기 때문입니다.

다중 작업 협업

위의 logger() 예제를 읽으면 "왜 양방향 통신을 위해 코루틴을 사용해야 하는가? 동일한 기능을 수행하기 위해 프로세스 메서드를 완전히 사용할 수 있지?"라고 궁금해하실 수 있습니다. 예, 맞습니다. 하지만 위의 예는 단지 기본적인 사용법을 보여주기 위한 것입니다. 이 예는 코루틴 사용의 이점을 실제로 보여주지는 않습니다.

위의 소개에서 언급했듯이 코루틴은 매우 강력한 개념이지만 거의 사용되지 않으며 종종 매우 복잡합니다. 몇 가지 간단하고 실제적인 예를 제시하는 것은 어렵습니다.

在这篇文章里,我决定去做的是使用协程实现多任务协作。我们要解决的问题是你想并发地运行多任务(或者“程序”)。不过我们都知道CPU在一个时刻只能运行一个任务(不考虑多核的情况)。因此处理器需要在不同的任务之间进行切换,而且总是让每个任务运行 “一小会儿”。

多任务协作这个术语中的“协作”说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样就可以运行其他任务了。这与“抢占”多任务相反,抢占多任务是这样的:调度器可以中断运行了一段时间的任务,不管它喜欢还是不喜欢。协作多任务在Windows的早期版本(windows95)和Mac OS中有使用,不过它们后来都切换到使用抢先多任务了。理由相当明确:如果你依靠程序自动交出控制的话,那么一些设计有问题的软件将很容易为自身占用整个CPU,不与其他任务共享。

现在你应当明白协程和任务调度之间的联系:yield指令提供了任务中断自身的一种方法,然后把控制交回给任务调度器。因此协程可以运行多个其他任务。更进一步来说,yield可以用来在任务和调度器之间进行通信。

我们的目的是 对 “任务”用更轻量级的包装的协程函数:

<?phpclass Task {  protected $taskId;  protected $coroutine;  protected $sendValue = null;  protected $beforeFirstYield = true;  public function __construct($taskId, Generator $coroutine) {    $this->taskId = $taskId;    $this->coroutine = $coroutine;
  }  public function getTaskId() {    return $this->taskId;
  }  public function setSendValue($sendValue) {    $this->sendValue = $sendValue;
  }  public function run() {    if ($this->beforeFirstYield) {      $this->beforeFirstYield = false;      return $this->coroutine->current();
    } else {      $retval = $this->coroutine->send($this->sendValue);      $this->sendValue = null;      return $retval;
    }
  }  public function isFinished() {    return !$this->coroutine->valid();
  }
}

一个任务是用任务ID标记的一个协程。使用setSendValue()方法,你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个)。 run()函数确实没有做什么,除了调用send()方法的协同程序。要理解为什么添加beforeFirstYieldflag,需要考虑下面的代码片段:

<?phpfunction gen() {
  yield &#39;foo&#39;;
  yield &#39;bar&#39;;
}$gen = gen();var_dump($gen->send(&#39;something&#39;));// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:$gen->rewind();var_dump($gen->send(&#39;something&#39;));// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!

通过添加 beforeFirstYieldcondition 我们可以确定 first yield 的值 被返回。

调度器现在不得不比多任务循环要做稍微多点了,然后才运行多任务:

<?phpclass Scheduler {  protected $maxTaskId = 0;  protected $taskMap = []; // taskId => task
  protected $taskQueue;  public function __construct() {    $this->taskQueue = new SplQueue();
  }  public function newTask(Generator $coroutine) {    $tid = ++$this->maxTaskId;    $task = new Task($tid, $coroutine);    $this->taskMap[$tid] = $task;    $this->schedule($task);    return $tid;
  }  public function schedule(Task $task) {    $this->taskQueue->enqueue($task);
  }  public function run() {    while (!$this->taskQueue->isEmpty()) {      $task = $this->taskQueue->dequeue();      $task->run();      if ($task->isFinished()) {        unset($this->taskMap[$task->getTaskId()]);
      } else {        $this->schedule($task);
      }
    }
  }
}?>

newTask()方法(使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务map数组里。接着它通过把任务放入任务队列里来实现对任务的调度。接着run()方法扫描任务队列,运行任务。如果一个任务结束了,那么它将从队列里删除,否则它将在队列的末尾再次被调度。

让我们看看下面具有两个简单(并且没有什么意义)任务的调度器:

<?phpfunction task1() {  for ($i = 1; $i <= 10; ++$i) {    echo "This is task 1 iteration $i.\n";
    yield;
  }
}function task2() {  for ($i = 1; $i <= 5; ++$i) {    echo "This is task 2 iteration $i.\n";
    yield;
  }
}$scheduler = new Scheduler;$scheduler->newTask(task1());$scheduler->newTask(task2());$scheduler->run();

两个任务都仅仅回显一条信息,然后使用yield把控制回传给调度器。输出结果如下:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

输出确实如我们所期望的:对前五个迭代来说,两个任务是交替运行的,接着第二个任务结束后,只有第一个任务继续运行。

与调度器之间通信

既然调度器已经运行了,那么我们来看下一项:任务和调度器之间的通信。

我们将使用进程用来和操作系统会话的同样的方式来通信:系统调用。我们需要系统调用的理由是操作系统与进程相比它处在不同的权限级别上。因此为了执行特权级别的操作(如杀死另一个进程),就不得不以某种方式把控制传回给内核,这样内核就可以执行所说的操作了。再说一遍,这种行为在内部是通过使用中断指令来实现的。过去使用的是通用的int指令,如今使用的是更特殊并且更快速的syscall/sysenter指令。

我们的任务调度系统将反映这种设计:不是简单地把调度器传递给任务(这样就允许它做它想做的任何事),我们将通过给yield表达式传递信息来与系统调用通信。这儿yield即是中断,也是传递信息给调度器(和从调度器传递出信息)的方法。

为了说明系统调用,我将对可调用的系统调用做一个小小的封装:

<?phpclass SystemCall {  protected $callback;  public function __construct(callable $callback) {    $this->callback = $callback;
  }  public function __invoke(Task $task, Scheduler $scheduler) {    $callback = $this->callback; // Can&#39;t call it directly in PHP :/
    return $callback($task, $scheduler);
  }
}

它将像其他任何可调用那样(使用_invoke)运行,不过它要求调度器把正在调用的任务和自身传递给这个函数。为了解决这个问题我们不得不微微的修改调度器的run方法:

<?phppublic function run() {  while (!$this->taskQueue->isEmpty()) {    $task = $this->taskQueue->dequeue();    $retval = $task->run();    if ($retval instanceof SystemCall) {      $retval($task, $this);      continue;
    }    if ($task->isFinished()) {      unset($this->taskMap[$task->getTaskId()]);
    } else {      $this->schedule($task);
    }
  }
}

第一个系统调用除了返回任务ID外什么都没有做:

<?phpfunction getTaskId() {    return new SystemCall(function(Task $task, Scheduler $scheduler) {        $task->setSendValue($task->getTaskId());        $scheduler->schedule($task);
    });
}

这个函数设置任务id为下一次发送的值,并再次调度了这个任务。由于使用了系统调用,所以调度器不能自动调用任务,我们需要手工调度任务(稍后你将明白为什么这么做)。要使用这个新的系统调用的话,我们要重新编写以前的例子:

<?phpfunction task($max) {  $tid = (yield getTaskId()); // <-- here&#39;s the syscall!
  for ($i = 1; $i <= $max; ++$i) {    echo "This is task $tid iteration $i.\n";
    yield;
  }
}$scheduler = new Scheduler;$scheduler->newTask(task(10));$scheduler->newTask(task(5));$scheduler->run();

这段代码将给出与前一个例子相同的输出。注意系统调用同其他任何调用一样正常地运行,不过预先增加了yield。要创建新的任务,然后再杀死它们的话,需要两个以上的系统调用:

<?phpfunction newTask(Generator $coroutine) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($coroutine) {      $task->setSendValue($scheduler->newTask($coroutine));      $scheduler->schedule($task);
    }
  );
}function killTask($tid) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($tid) {      $task->setSendValue($scheduler->killTask($tid));      $scheduler->schedule($task);
    }
  );
}

killTask函数需要在调度器里增加一个方法:

<?phppublic function killTask($tid) {  if (!isset($this->taskMap[$tid])) {    return false;
  }  unset($this->taskMap[$tid]);  // This is a bit ugly and could be optimized so it does not have to walk the queue,
  // but assuming that killing tasks is rather rare I won&#39;t bother with it now
  foreach ($this->taskQueue as $i => $task) {    if ($task->getTaskId() === $tid) {      unset($this->taskQueue[$i]);      break;
    }
  }  return true;
}

用来测试新功能的微脚本:

<?phpfunction childTask() {  $tid = (yield getTaskId());  while (true) {    echo "Child task $tid still alive!\n";
    yield;
  }
}function task() {  $tid = (yield getTaskId());  $childTid = (yield newTask(childTask()));  for ($i = 1; $i <= 6; ++$i) {    echo "Parent task $tid iteration $i.\n";
    yield;    if ($i == 3) yield killTask($childTid);
  }
}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();?>

这段代码将打印以下信息:

Parent task 1 iteration 1.
Child task 2 still alive!Parent task 1 iteration 2.
Child task 2 still alive!Parent task 1 iteration 3.
Child task 2 still alive!Parent task 1 iteration 4.Parent task 1 iteration 5.Parent task 1 iteration 6.

经过三次迭代以后子任务将被杀死,因此这就是”Child is still alive”消息结束的时候。可能应当指出的是这不是真正的父子关系。 因为甚至在父任务结束后子任务仍然可以运行。或者子任务可以杀死父任务。可以修改调度器使它具有更层级化的任务结构,不过 在这篇文章里我没有这么做。

你可以实现许多进程管理调用。例如 wait(它一直等待到任务结束运行时),exec(它替代当前任务)和fork(它创建一个 当前任务的克隆)。fork非常酷,而且你可以使用PHP的协程真正地实现它,因为它们都支持克隆。

然而让我们把这些留给有兴趣的读者吧,我们来看下一个议题。

非阻塞IO

很明显,我们的任务管理系统的真正很酷的应用是web服务器。它有一个任务是在套接字上侦听是否有新连接,当有新连接要建立的时候 ,它创建一个新任务来处理新连接。

web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的。例如PHP将等待到客户端完成发送为止。对一个WEB服务器来说,这 根本不行;这就意味着服务器在一个时间点上只能处理一个连接。

解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”。为了查找哪个套接字已经准备好读或者写了,可以使用 流选择函数。

首先,让我们添加两个新的 syscall,它们将等待直到指定 socket 准备好:

<?phpfunction waitForRead($socket) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($socket) {      $scheduler->waitForRead($socket, $task);
    }
  );
}function waitForWrite($socket) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($socket) {      $scheduler->waitForWrite($socket, $task);
    }
  );
}

这些 syscall 只是在调度器中代理其各自的方法:

<?php// resourceID => [socket, tasks]protected $waitingForRead = [];protected $waitingForWrite = [];public function waitForRead($socket, Task $task) {  if (isset($this->waitingForRead[(int) $socket])) {    $this->waitingForRead[(int) $socket][1][] = $task;
  } else {    $this->waitingForRead[(int) $socket] = [$socket, [$task]];
  }
}public function waitForWrite($socket, Task $task) {  if (isset($this->waitingForWrite[(int) $socket])) {    $this->waitingForWrite[(int) $socket][1][] = $task;
  } else {    $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
  }
}

waitingForRead 及 waitingForWrite 属性是两个承载等待的socket 及等待它们的任务的数组。有趣的部分在于下面的方法,它将检查 socket 是否可用,并重新安排各自任务:

<?phpprotected function ioPoll($timeout) {  $rSocks = [];  foreach ($this->waitingForRead as list($socket)) {    $rSocks[] = $socket;
  }  $wSocks = [];  foreach ($this->waitingForWrite as list($socket)) {    $wSocks[] = $socket;
  }  $eSocks = []; // dummy
  if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {    return;
  }  foreach ($rSocks as $socket) {    list(, $tasks) = $this->waitingForRead[(int) $socket];    unset($this->waitingForRead[(int) $socket]);    foreach ($tasks as $task) {      $this->schedule($task);
    }
  }  foreach ($wSocks as $socket) {    list(, $tasks) = $this->waitingForWrite[(int) $socket];    unset($this->waitingForWrite[(int) $socket]);    foreach ($tasks as $task) {      $this->schedule($task);
    }
  }
}

stream_select 函数接受承载读取、写入以及待检查的socket的数组(我们无需考虑最后一类)。数组将按引用传递,函数只会保留那些状态改变了的数组元素。我们可以遍历这些数组,并重新安排与之相关的任务。

为了正常地执行上面的轮询动作,我们将在调度器里增加一个特殊的任务:

<?phpprotected function ioPollTask() {  while (true) {    if ($this->taskQueue->isEmpty()) {      $this->ioPoll(null);
    } else {      $this->ioPoll(0);
    }
    yield;
  }
}

需要在某个地方注册这个任务,例如,你可以在run()方法的开始增加$this->newTask($this->ioPollTask())。然后就像其他 任务一样每执行完整任务循环一次就执行轮询操作一次(这么做一定不是最好的方法)。ioPollTask将使用0秒的超时来调用ioPoll, 这意味着stream_select将立即返回(而不是等待)。

只有任务队列为空时,我们才使用null超时,这意味着它一直等到某个套接口准备就绪。如果我们没有这么做,那么轮询任务将一而再, 再而三的循环运行,直到有新的连接建立。这将导致100%的CPU利用率。相反,让操作系统做这种等待会更有效。

现在编写服务器就相对容易了:

<?phpfunction server($port) {  echo "Starting server at port $port...\n";  $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);  if (!$socket) throw new Exception($errStr, $errNo);  stream_set_blocking($socket, 0);  while (true) {
    yield waitForRead($socket);    $clientSocket = stream_socket_accept($socket, 0);
    yield newTask(handleClient($clientSocket));
  }
}function handleClient($socket) {
  yield waitForRead($socket);  $data = fread($socket, 8192);  $msg = "Received following request:\n\n$data";  $msgLength = strlen($msg);  $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r$msgRES;
  yield waitForWrite($socket);  fwrite($socket, $response);  fclose($socket);
}$scheduler = new Scheduler;$scheduler->newTask(server(8000));$scheduler->run();

这段代码将接收到localhost:8000上的连接,然后仅仅返回发送来的内容作为HTTP响应。要做“实际”的事情的话就爱哪个非常复杂(处理 HTTP请求可能已经超出了这篇文章的范围)。上面的代码片段只是演示了一般性的概念。

你可以使用类似于ab -n 10000 -c 100 localhost:8000/这样命令来测试服务器。这条命令将向服务器发送10000个请求,并且其中100个请求将同时到达。使用这样的数目,我得到了处于中间的10毫秒的响应时间。不过还有一个问题:有少数几个请求真正处理的很慢(如5秒), 这就是为什么总吞吐量只有2000请求/秒(如果是10毫秒的响应时间的话,总的吞吐量应该更像是10000请求/秒

协程堆栈

如果你试图用我们的调度系统建立更大的系统的话,你将很快遇到问题:我们习惯了把代码分解为更小的函数,然后调用它们。然而, 如果使用了协程的话,就不能这么做了。例如,看下面代码:

<?phpfunction echoTimes($msg, $max) {  for ($i = 1; $i <= $max; ++$i) {    echo "$msg iteration $i\n";
    yield;
  }
}function task() {
  echoTimes(&#39;foo&#39;, 10); // print foo ten times
  echo "---\n";
  echoTimes(&#39;bar&#39;, 5); // print bar five times
  yield; // force it to be a coroutine}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();

这段代码试图把重复循环“输出n次“的代码嵌入到一个独立的协程里,然后从主任务里调用它。然而它无法运行。正如在这篇文章的开始 所提到的,调用生成器(或者协程)将没有真正地做任何事情,它仅仅返回一个对象。这也出现在上面的例子里。echoTimes调用除了放回一个(无用的)协程对象外不做任何事情。

为了仍然允许这么做,我们需要在这个裸协程上写一个小小的封装。我们将调用它:“协程堆栈”。因为它将管理嵌套的协程调用堆栈。 这将是通过生成协程来调用子协程成为可能:

$retval = (yield someCoroutine($foo, $bar));

使用yield,子协程也能再次返回值:

yield retval("I'm a return value!");

retval函数除了返回一个值的封装外没有做任何其他事情。这个封装将表示它是一个返回值。

<?phpclass CoroutineReturnValue {  protected $value;  public function __construct($value) {    $this->value = $value;
  }  public function getValue() {    return $this->value;
  }
}function retval($value) {  return new CoroutineReturnValue($value);
}

为了把协程转变为协程堆栈(它支持子调用),我们将不得不编写另外一个函数(很明显,它是另一个协程):

<?phpfunction stackedCoroutine(Generator $gen) {  $stack = new SplStack;  for (;;) {    $value = $gen->current();    if ($value instanceof Generator) {      $stack->push($gen);      $gen = $value;      continue;
    }    $isReturnValue = $value instanceof CoroutineReturnValue;    if (!$gen->valid() || $isReturnValue) {      if ($stack->isEmpty()) {        return;
      }      $gen = $stack->pop();      $gen->send($isReturnValue ? $value->getValue() : NULL);      continue;
    }    $gen->send(yield $gen->key() => $value);
  }
}

这个函数在调用者和当前正在运行的子协程之间扮演着简单代理的角色。在$gen->send(yield $gen->key()=>$value);这行完成了代理功能。另外它检查返回值是否是生成器,万一是生成器的话,它将开始运行这个生成器,并把前一个协程压入堆栈里。一旦它获得了CoroutineReturnValue的话,它将再次请求堆栈弹出,然后继续执行前一个协程。

为了使协程堆栈在任务里可用,任务构造器里的$this-coroutine =$coroutine;这行需要替代为$this->coroutine = StackedCoroutine($coroutine);。

现在我们可以稍微改进上面web服务器例子:把wait+read(和wait+write和warit+accept)这样的动作分组为函数。为了分组相关的 功能,我将使用下面类:

<?phpclass CoSocket {  protected $socket;  public function __construct($socket) {    $this->socket = $socket;
  }  public function accept() {
    yield waitForRead($this->socket);
    yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
  }  public function read($size) {
    yield waitForRead($this->socket);
    yield retval(fread($this->socket, $size));
  }  public function write($string) {
    yield waitForWrite($this->socket);    fwrite($this->socket, $string);
  }  public function close() {
    @fclose($this->socket);
  }
}

现在服务器可以编写的稍微简洁点了:

<?phpfunction server($port) {  echo "Starting server at port $port...\n";  $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);  if (!$socket) throw new Exception($errStr, $errNo);  stream_set_blocking($socket, 0);  $socket = new CoSocket($socket);  while (true) {
    yield newTask(
      handleClient(yield $socket->accept())
    );
  }
}function handleClient($socket) {  $data = (yield $socket->read(8192));  $msg = "Received following request:\n\n$data";  $msgLength = strlen($msg);  $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r$msgRES;
  yield $socket->write($response);
  yield $socket->close();
}

错误处理

作为一个优秀的程序员,相信你已经察觉到上面的例子缺少错误处理。几乎所有的 socket 都是易出错的。我这样做的原因一方面固然是因为错误处理的乏味(特别是 socket!),另一方面也在于它很容易使代码体积膨胀。

不过,我仍然了一讲一下常见的协程错误处理:协程允许使用 throw() 方法在其内部抛出一个错误。尽管此方法还未在 PHP 中实现,但我很快就会提交它,就在今天。

throw() 方法接受一个 Exception,并将其抛出到协程的当前悬挂点,看看下面代码:

<?phpfunction gen() {  echo "Foo\n";  try {
    yield;
  } catch (Exception $e) {    echo "Exception: {$e->getMessage()}\n";
  }  echo "Bar\n";
}$gen = gen();$gen->rewind();                     // echos "Foo"$gen->throw(new Exception(&#39;Test&#39;)); // echos "Exception: Test"
                  // and "Bar"

这非常棒,因为我们可以使用系统调用以及子协程调用异常抛出。对与系统调用,Scheduler::run() 方法需要一些小调整:

<?phpif ($retval instanceof SystemCall) {  try {    $retval($task, $this);
  } catch (Exception $e) {    $task->setException($e);    $this->schedule($task);
  }  continue;
}

Task 类也许要添加 throw 调用处理:

<?phpclass Task {  // ...
  protected $exception = null;  public function setException($exception) {    $this->exception = $exception;
  }  public function run() {    if ($this->beforeFirstYield) {      $this->beforeFirstYield = false;      return $this->coroutine->current();
    } elseif ($this->exception) {      $retval = $this->coroutine->throw($this->exception);      $this->exception = null;      return $retval;
    } else {      $retval = $this->coroutine->send($this->sendValue);      $this->sendValue = null;      return $retval;
    }
  }  // ...}

现在,我们已经可以在系统调用中使用异常抛出了!例如,要调用 killTask,让我们在传递 ID 不可用时抛出一个异常:

<?phpfunction killTask($tid) {  return new SystemCall(    function(Task $task, Scheduler $scheduler) use ($tid) {      if ($scheduler->killTask($tid)) {        $scheduler->schedule($task);
      } else {        throw new InvalidArgumentException(&#39;Invalid task ID!&#39;);
      }
    }
  );
}

试试看:

<?phpfunction task() {    try {
        yield killTask(500);
    } catch (Exception $e) {        echo &#39;Tried to kill task 500 but failed: &#39;, $e->getMessage(), "\n";
    }
}

这些代码现在尚不能正常运作,因为 stackedCoroutine 函数无法正确处理异常。要修复需要做些调整:

<?phpfunction stackedCoroutine(Generator $gen) {  $stack = new SplStack;  $exception = null;  for (;;) {    try {      if ($exception) {        $gen->throw($exception);        $exception = null;        continue;
      }      $value = $gen->current();      if ($value instanceof Generator) {        $stack->push($gen);        $gen = $value;        continue;
      }      $isReturnValue = $value instanceof CoroutineReturnValue;      if (!$gen->valid() || $isReturnValue) {        if ($stack->isEmpty()) {          return;
        }        $gen = $stack->pop();        $gen->send($isReturnValue ? $value->getValue() : NULL);        continue;
      }      try {        $sendValue = (yield $gen->key() => $value);
      } catch (Exception $e) {        $gen->throw($e);        continue;
      }      $gen->send($sendValue);
    } catch (Exception $e) {      if ($stack->isEmpty()) {        throw $e;
      }      $gen = $stack->pop();      $exception = $e;
    }
  }
}

结束语

在这篇文章里,我使用多任务协作构建了一个任务调度器,其中包括执行“系统调用”,做非阻塞操作和处理错误。所有这些里真正很酷的事情是任务的结果代码看起来完全同步,甚至任务正在执行大量的异步操作的时候也是这样。如果你打算从套接口读取数据的话,你将不需要传递某个回调函数或者注册一个事件侦听器。相反,你只要书写yield $socket->read()。这儿大部分都是你常常也要编写的,只在它的前面增加yield。

当我第一次听到所有这一切的时候,我发现这个概念完全令人折服,而且正是这个激励我在PHP中实现了它。同时我发现协程真正令人心慌。在令人敬畏的代码和很大一堆代码之间只有单薄的一行,我认为协程正好处在这一行上。讲讲使用上面所述的方法书写异步代码是否真的有益对我来说很难。

위 내용은 PHP는 코루틴을 사용하여 다중 작업 예약 방법 예제를 구현합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.