PHP コルーチンの初体験

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBオリジナル
2016-06-23 13:44:371156ブラウズ

PHP コルーチンの初体験

By warezhou 2014.11.24

C 拡張機能を介して PHP にコルーチンを追加する最後の試みが失敗した後、短期的に Zend を使用する可能性はほぼゼロであるため、ネイティブ言語に頼るしかありません能力。その後 Google は、PHP5.5 で Generator と Coroutine の新機能が導入されたことを発見し、この記事が生まれました。

背景資料

「C/C++ バックグラウンド開発が Coroutine と出会うとき」

http://km.oa.com/group/906/articles/show/165396

「失敗した PHP 拡張機能開発の旅」

http:/ /km.oa.com/group/906/articles/show/208269

予備知識

Generator

function my_range($start, $end, $step = 1) {    for ($i = $start; $i <= $end; $i += $step) {        yield $i;    }}foreach (my_range(1, 1000) as $num) {    echo $num, "\n";}/* * 1 * 2 * ... * 1000 */

図 1 ジェネレーターに基づく range() の実装

$range = my_range(1, 1000);var_dump($range);/* * object(Generator)#1 (0) { * } */var_dump($range instanceof Iterator);/* * bool(true) */

図 2 my_range() の実装 推測

私はまだ PHP に慣れておらず、言語実装の詳細については詳しく調べていないため、現象に基づいて推測することしかできませんが、以下は私の個人的な理解の一部です:

  • yield キーワードを含む関数は非常に特殊です。戻り値は Generator オブジェクトですが、この時点では関数内のステートメントは実際には実行されていません
  • Generator オブジェクトは Iterator インターフェイスのインスタンスであり、rewind()、current()、next( を通じて操作できます) )、および valid() シリーズのインターフェイス
  • Generator は「中断できる」機能と見なすことができ、yield は一連の「中断ポイント」を構成します
  • Generator は工場生産の組立ラインに似ています。が必要な場合、そこから 1 つが取得され、アセンブリ ラインはそこで停止して次のテイクを待ちます。
  • コルーチン

    注意深い読者は、これまでのところ、Generator がコルーチンの主要な機能である割り込み実行と実装を実行していることに気づいたかもしれません。実行を再開します。 「When C/C++ Backend Development Meets Coroutine」の考え方によれば、情報の受け渡しや非同期サーバーの実装には「グローバル変数」などの言語機能を使えば十分なはずです。

    実際、swapcontext ファミリの関数と比較して、Generator は大きな進歩を遂げており、「データを返す」機能も備えています。また、「データを送信する」機能も備えているので、これらのつまらない機能を使用する必要はなくなりました。迂回路。 PHP では、ジェネレーターの send() インターフェイス (注: これは next() インターフェイスではありません) を通じて、「データを送信する」というタスクを完了することができ、真の「双方向通信」を実現します。

    function gen() {    $ret = (yield 'yield1');    echo "[gen]", $ret, "\n";    $ret = (yield 'yield2');    echo "[gen]", $ret, "\n";}$gen = gen();$ret = $gen->current();echo "[main]", $ret, "\n";$ret = $gen->send("send1");echo "[main]", $ret, "\n";$ret = $gen->send("send2");echo "[main]", $ret, "\n";/* * [main]yield1 * [gen]send1 * [main]yield2 * [gen]send2 * [main] */

    図 3 コルーチンの双方向通信の例

    C/C++ のプログラマーとして、「リエントランシー」と「双方向通信」機能を発見した後は、これ以上贅沢はできないように思えますが、PHP はまだより寛大な、引き続き追加 例外メカニズムが追加され、「エラー処理」メカニズムがさらに改善されました。

    function gen() {    $ret = (yield 'yield1');    echo "[gen]", $ret, "\n";    try {        $ret = (yield 'yield2');        echo "[gen]", $ret, "\n";    } catch (Exception $ex) {        echo "[gen][Exception]", $ex->getMessage(), "\n";    }       echo "[gen]finish\n";}$gen = gen();$ret = $gen->current();echo "[main]", $ret, "\n";$ret = $gen->send("send1");echo "[main]", $ret, "\n";$ret = $gen->throw(new Exception("Test"));echo "[main]", $ret, "\n";/* * [main]yield1 * [gen]send1 * [main]yield2 * [gen][Exception]Test * [gen]finish * [main] */

    図 4 コルーチンのエラー処理例

    実践的な演習

    関連する言語機能については以前簡単に紹介しましたが、実際のプロジェクトではどのように使用すればよいでしょうか? 「失敗した PHP 拡張機能開発の旅」で説明したシナリオを続けて、上記の機能を使用して、同期的な方法で非同期コードを記述するという美しい願いを実現しましょう。

    初稿

    <?phpclass AsyncServer {    protected $handler;    protected $socket;    protected $tasks = [];    public function __construct($handler) {        $this->handler = $handler;        $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);        if(!$this->socket) {            die(socket_strerror(socket_last_error())."\n");        }        if (!socket_set_nonblock($this->socket)) {            die(socket_strerror(socket_last_error())."\n");        }        if(!socket_bind($this->socket, "0.0.0.0", 1234)) {            die(socket_strerror(socket_last_error())."\n");        }    }    public function Run() {        while (true) {            $reads = array($this->socket);            foreach ($this->tasks as list($socket)) {                $reads[] = $socket;            }            $writes = NULL;            $excepts= NULL;            if (!socket_select($reads, $writes, $excepts, 0, 1000)) {                continue;            }            foreach ($reads as $one) {                $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port);                if (!$len) {                    //echo "socket_recvfrom fail.\n";                    continue;                }                if ($one == $this->socket) {                    //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n";                    $handler = $this->handler;                    $coroutine = $handler($one, $data, $len, $ip, $port);                    $task = $coroutine->current();                    //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n";                    $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);                    if(!$socket) {                        //echo socket_strerror(socket_last_error())."\n";                        $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));                        continue;                    }                    if (!socket_set_nonblock($socket)) {                        //echo socket_strerror(socket_last_error())."\n";                        $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));                        continue;                    }                    socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port);                    $this->tasks[$socket] = [$socket, $coroutine];                } else {                    //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n";                    if (!isset($this->tasks[$one])) {                        //echo "no async_task found.\n";                    } else {                        list($socket, $coroutine) = $this->tasks[$one];                        unset($this->tasks[$one]);                        socket_close($socket);                        $coroutine->send(array($data, $len));                    }                }            }        }    }}class AsyncTask {    public $data;    public $len;    public $ip;    public $port;    public $timeout;    public function __construct($data, $len, $ip, $port, $timeout) {        $this->data = $data;        $this->len = $len;        $this->ip = $ip;        $this->port = $port;        $this->timeout = $timeout;    }}function RequestHandler($socket, $req_buf, $req_len, $ip, $port) {    //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n";    list($rsp_buf, $rsp_len) = (yield new AsyncTask($req_buf, $req_len, "127.0.0.1", 2345, 1000));    //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n";    socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port);}$server = new AsyncServer(RequestHandler);$server->Run();?>

    コード解釈:

  • 問題の説明を容易にするために、ここでの基礎となる通信はすべて UDP に基づいており、TCP 接続などの面倒な詳細は省略しています
  • AsyncServer は基礎となるフレームワーク クラスであり、カプセル化されていますネットワーク通信の詳細とコルーチンの切り替え詳細、ソケットを介したコルーチンのバインディング
  • RequestHandler は業務処理関数であり、非同期ネットワーク インタラクションは yield new AsyncTask() によって実現されます
  • 2 番目のバージョンは改善されています

    最初のバージョンの残された問題:

  • 非同期ネットワークインタラクションのタイムアウト 未実装、インターフェイスパラメータのみが予約されています
  • yield new AsyncTask() 呼び出しメソッドは不自然で少しぎこちなく感じます
  • <?phpclass AsyncServer {    protected $handler;    protected $socket;    protected $tasks = [];    protected $timers = [];    public function __construct(callable $handler) {        $this->handler = $handler;        $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);        if(!$this->socket) {            die(socket_strerror(socket_last_error())."\n");        }        if (!socket_set_nonblock($this->socket)) {            die(socket_strerror(socket_last_error())."\n");        }        if(!socket_bind($this->socket, "0.0.0.0", 1234)) {            die(socket_strerror(socket_last_error())."\n");        }    }    public function Run() {        while (true) {            $now = microtime(true) * 1000;            foreach ($this->timers as $time => $sockets) {                if ($time > $now) break;                foreach ($sockets as $one) {                    list($socket, $coroutine) = $this->tasks[$one];                    unset($this->tasks[$one]);                    socket_close($socket);                    $coroutine->throw(new Exception("Timeout"));                }                unset($this->timers[$time]);            }            $reads = array($this->socket);            foreach ($this->tasks as list($socket)) {                $reads[] = $socket;            }            $writes = NULL;            $excepts= NULL;            if (!socket_select($reads, $writes, $excepts, 0, 1000)) {                continue;            }            foreach ($reads as $one) {                $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port);                if (!$len) {                    //echo "socket_recvfrom fail.\n";                    continue;                }                if ($one == $this->socket) {                    //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n";                    $handler = $this->handler;                    $coroutine = $handler($one, $data, $len, $ip, $port);                    if (!$coroutine) {                        //echo "[Run]everything is done.\n";                        continue;                    }                    $task = $coroutine->current();                    //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n";                    $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);                    if(!$socket) {                        //echo socket_strerror(socket_last_error())."\n";                        $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));                        continue;                    }                    if (!socket_set_nonblock($socket)) {                        //echo socket_strerror(socket_last_error())."\n";                        $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));                        continue;                    }                    socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port);                    $deadline = $now + $task->timeout;                    $this->tasks[$socket] = [$socket, $coroutine, $deadline];                    $this->timers[$deadline][$socket] = $socket;                } else {                    //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n";                    list($socket, $coroutine, $deadline) = $this->tasks[$one];                    unset($this->tasks[$one]);                    unset($this->timers[$deadline][$one]);                    socket_close($socket);                    $coroutine->send(array($data, $len));                }            }        }    }}class AsyncTask {    public $data;    public $len;    public $ip;    public $port;    public $timeout;    public function __construct($data, $len, $ip, $port, $timeout) {        $this->data = $data;        $this->len = $len;        $this->ip = $ip;        $this->port = $port;        $this->timeout = $timeout;    }}function AsyncSendRecv($req_buf, $req_len, $ip, $port, $timeout) {    return new AsyncTask($req_buf, $req_len, $ip, $port, $timeout);}function RequestHandler($socket, $req_buf, $req_len, $ip, $port) {    //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n";    try {        list($rsp_buf, $rsp_len) = (yield AsyncSendRecv($req_buf, $req_len, "127.0.0.1", 2345, 3000));    } catch (Exception $ex) {        $rsp_buf = $ex->getMessage();        $rsp_len = strlen($rsp_buf);        //echo "[Exception]$rsp_buf\n";    }    //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n";    socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port);}$server = new AsyncServer(RequestHandler);$server->Run();?>

    コード解釈:

  • PHP の組み込み配列を使用しますミリ秒単位で簡単な「タイムアウト管理」を実現する機能 精度を高めるためにタイム スライスを使用する
  • AsyncSendRecv インターフェイスをカプセル化し、より自然な yield AsyncSendRecv() の形式で呼び出します
  • エラー処理メカニズムとして例外を追加すると、次のことが可能になりますret_code も追加します。これは表示専用です
  • パフォーマンス テスト

    テスト環境


    100Byte/REQ1000Byte/REQasync_svr_v1.php16000/秒 15000/s11000/s 、send、recv、sleep... Python などの言語にも同様の言語機能がありますので、興味のある読者は自分で調べてください。
    aasync_svr_v2.php10000/s
    私はまだ PHP に慣れていないのですが、多くの使用法は最適化されていません。専門家であれば、ターゲットを絞った最適化を行うことができ、パフォーマンスは向上し続けるはずです。 現在、コルーチン バインディングは以下に基づいています。 TCP 通信に基づいている場合、接続/クローズのそれぞれにコストがかかりすぎるため、接続プールの実装を検討する必要があります
    声明:
    この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
    前の記事:str_replace の問題次の記事:str_replace の問題