PHP コルーチンの初体験
By warezhou 2014.11.24
C 拡張機能を介して PHP にコルーチンを追加する最後の試みが失敗した後、短期的に Zend を使用する可能性はほぼゼロであるため、ネイティブ言語に頼るしかありません能力。その後 Google は、PHP5.5 で Generator と Coroutine の新機能が導入されたことを発見し、この記事が生まれました。
背景資料「C/C++ バックグラウンド開発が Coroutine と出会うとき」
http://km.oa.com/group/906/articles/show/165396
「失敗した PHP 拡張機能開発の旅」
http:/ /km.oa.com/group/906/articles/show/208269
予備知識
function my_range($start, $end, $step = 1) { for ($i = $start; $i <= $end; $i += $step) { yield $i; }}foreach (my_range(1, 1000) as $num) { echo $num, "\n";}/* * 1 * 2 * ... * 1000 */
図 1 ジェネレーターに基づく range() の実装
$range = my_range(1, 1000);var_dump($range);/* * object(Generator)#1 (0) { * } */var_dump($range instanceof Iterator);/* * bool(true) */
図 2 my_range() の実装 推測
私はまだ PHP に慣れておらず、言語実装の詳細については詳しく調べていないため、現象に基づいて推測することしかできませんが、以下は私の個人的な理解の一部です:
注意深い読者は、これまでのところ、Generator がコルーチンの主要な機能である割り込み実行と実装を実行していることに気づいたかもしれません。実行を再開します。 「When C/C++ Backend Development Meets Coroutine」の考え方によれば、情報の受け渡しや非同期サーバーの実装には「グローバル変数」などの言語機能を使えば十分なはずです。
実際、swapcontext ファミリの関数と比較して、Generator は大きな進歩を遂げており、「データを返す」機能も備えています。また、「データを送信する」機能も備えているので、これらのつまらない機能を使用する必要はなくなりました。迂回路。 PHP では、ジェネレーターの send() インターフェイス (注: これは next() インターフェイスではありません) を通じて、「データを送信する」というタスクを完了することができ、真の「双方向通信」を実現します。
function gen() { $ret = (yield 'yield1'); echo "[gen]", $ret, "\n"; $ret = (yield 'yield2'); echo "[gen]", $ret, "\n";}$gen = gen();$ret = $gen->current();echo "[main]", $ret, "\n";$ret = $gen->send("send1");echo "[main]", $ret, "\n";$ret = $gen->send("send2");echo "[main]", $ret, "\n";/* * [main]yield1 * [gen]send1 * [main]yield2 * [gen]send2 * [main] */
図 3 コルーチンの双方向通信の例
C/C++ のプログラマーとして、「リエントランシー」と「双方向通信」機能を発見した後は、これ以上贅沢はできないように思えますが、PHP はまだより寛大な、引き続き追加 例外メカニズムが追加され、「エラー処理」メカニズムがさらに改善されました。
function gen() { $ret = (yield 'yield1'); echo "[gen]", $ret, "\n"; try { $ret = (yield 'yield2'); echo "[gen]", $ret, "\n"; } catch (Exception $ex) { echo "[gen][Exception]", $ex->getMessage(), "\n"; } echo "[gen]finish\n";}$gen = gen();$ret = $gen->current();echo "[main]", $ret, "\n";$ret = $gen->send("send1");echo "[main]", $ret, "\n";$ret = $gen->throw(new Exception("Test"));echo "[main]", $ret, "\n";/* * [main]yield1 * [gen]send1 * [main]yield2 * [gen][Exception]Test * [gen]finish * [main] */
図 4 コルーチンのエラー処理例
実践的な演習関連する言語機能については以前簡単に紹介しましたが、実際のプロジェクトではどのように使用すればよいでしょうか? 「失敗した PHP 拡張機能開発の旅」で説明したシナリオを続けて、上記の機能を使用して、同期的な方法で非同期コードを記述するという美しい願いを実現しましょう。
<?phpclass AsyncServer { protected $handler; protected $socket; protected $tasks = []; public function __construct($handler) { $this->handler = $handler; $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); if(!$this->socket) { die(socket_strerror(socket_last_error())."\n"); } if (!socket_set_nonblock($this->socket)) { die(socket_strerror(socket_last_error())."\n"); } if(!socket_bind($this->socket, "0.0.0.0", 1234)) { die(socket_strerror(socket_last_error())."\n"); } } public function Run() { while (true) { $reads = array($this->socket); foreach ($this->tasks as list($socket)) { $reads[] = $socket; } $writes = NULL; $excepts= NULL; if (!socket_select($reads, $writes, $excepts, 0, 1000)) { continue; } foreach ($reads as $one) { $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port); if (!$len) { //echo "socket_recvfrom fail.\n"; continue; } if ($one == $this->socket) { //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n"; $handler = $this->handler; $coroutine = $handler($one, $data, $len, $ip, $port); $task = $coroutine->current(); //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n"; $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); if(!$socket) { //echo socket_strerror(socket_last_error())."\n"; $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error())); continue; } if (!socket_set_nonblock($socket)) { //echo socket_strerror(socket_last_error())."\n"; $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error())); continue; } socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port); $this->tasks[$socket] = [$socket, $coroutine]; } else { //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n"; if (!isset($this->tasks[$one])) { //echo "no async_task found.\n"; } else { list($socket, $coroutine) = $this->tasks[$one]; unset($this->tasks[$one]); socket_close($socket); $coroutine->send(array($data, $len)); } } } } }}class AsyncTask { public $data; public $len; public $ip; public $port; public $timeout; public function __construct($data, $len, $ip, $port, $timeout) { $this->data = $data; $this->len = $len; $this->ip = $ip; $this->port = $port; $this->timeout = $timeout; }}function RequestHandler($socket, $req_buf, $req_len, $ip, $port) { //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n"; list($rsp_buf, $rsp_len) = (yield new AsyncTask($req_buf, $req_len, "127.0.0.1", 2345, 1000)); //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n"; socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port);}$server = new AsyncServer(RequestHandler);$server->Run();?>
コード解釈:
最初のバージョンの残された問題:
<?phpclass AsyncServer { protected $handler; protected $socket; protected $tasks = []; protected $timers = []; public function __construct(callable $handler) { $this->handler = $handler; $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); if(!$this->socket) { die(socket_strerror(socket_last_error())."\n"); } if (!socket_set_nonblock($this->socket)) { die(socket_strerror(socket_last_error())."\n"); } if(!socket_bind($this->socket, "0.0.0.0", 1234)) { die(socket_strerror(socket_last_error())."\n"); } } public function Run() { while (true) { $now = microtime(true) * 1000; foreach ($this->timers as $time => $sockets) { if ($time > $now) break; foreach ($sockets as $one) { list($socket, $coroutine) = $this->tasks[$one]; unset($this->tasks[$one]); socket_close($socket); $coroutine->throw(new Exception("Timeout")); } unset($this->timers[$time]); } $reads = array($this->socket); foreach ($this->tasks as list($socket)) { $reads[] = $socket; } $writes = NULL; $excepts= NULL; if (!socket_select($reads, $writes, $excepts, 0, 1000)) { continue; } foreach ($reads as $one) { $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port); if (!$len) { //echo "socket_recvfrom fail.\n"; continue; } if ($one == $this->socket) { //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n"; $handler = $this->handler; $coroutine = $handler($one, $data, $len, $ip, $port); if (!$coroutine) { //echo "[Run]everything is done.\n"; continue; } $task = $coroutine->current(); //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n"; $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); if(!$socket) { //echo socket_strerror(socket_last_error())."\n"; $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error())); continue; } if (!socket_set_nonblock($socket)) { //echo socket_strerror(socket_last_error())."\n"; $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error())); continue; } socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port); $deadline = $now + $task->timeout; $this->tasks[$socket] = [$socket, $coroutine, $deadline]; $this->timers[$deadline][$socket] = $socket; } else { //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n"; list($socket, $coroutine, $deadline) = $this->tasks[$one]; unset($this->tasks[$one]); unset($this->timers[$deadline][$one]); socket_close($socket); $coroutine->send(array($data, $len)); } } } }}class AsyncTask { public $data; public $len; public $ip; public $port; public $timeout; public function __construct($data, $len, $ip, $port, $timeout) { $this->data = $data; $this->len = $len; $this->ip = $ip; $this->port = $port; $this->timeout = $timeout; }}function AsyncSendRecv($req_buf, $req_len, $ip, $port, $timeout) { return new AsyncTask($req_buf, $req_len, $ip, $port, $timeout);}function RequestHandler($socket, $req_buf, $req_len, $ip, $port) { //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n"; try { list($rsp_buf, $rsp_len) = (yield AsyncSendRecv($req_buf, $req_len, "127.0.0.1", 2345, 3000)); } catch (Exception $ex) { $rsp_buf = $ex->getMessage(); $rsp_len = strlen($rsp_buf); //echo "[Exception]$rsp_buf\n"; } //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n"; socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port);}$server = new AsyncServer(RequestHandler);$server->Run();?>
コード解釈:
async_svr_v1.php | 16000/秒 | |
aasync_svr_v2.php | 11000/s10000/s | |
私はまだ PHP に慣れていないのですが、多くの使用法は最適化されていません。専門家であれば、ターゲットを絞った最適化を行うことができ、パフォーマンスは向上し続けるはずです。 | 現在、コルーチン バインディングは以下に基づいています。 TCP 通信に基づいている場合、接続/クローズのそれぞれにコストがかかりすぎるため、接続プールの実装を検討する必要があります | Python などの言語にも同様の言語機能がありますので、興味のある読者は自分で調べてください。