ホームページ >バックエンド開発 >PHPチュートリアル >PHP は SwooleTaskWorker を使用して非同期操作 MySQL を実装する方法 (コード)

PHP は SwooleTaskWorker を使用して非同期操作 MySQL を実装する方法 (コード)

不言
不言転載
2018-10-17 16:02:112757ブラウズ

この記事の内容は、PHP が SwooleTaskWorker を使用して Mysql の非同期操作を実装する方法 (コード) に関するもので、一定の参考値があります。必要な友人は参考にしてください。お役に立てば幸いです。

一般的なサーバー プログラムには、電子メールの送信、チャット サーバーによるブロードキャストの送信など、時間のかかるタスクがいくつかあります。これらのタスクを同期ブロッキング防水を使用して実行すると、間違いなく非常に遅くなります。

Swoole の TaskWorker プロセス プールを使用すると、後続のタスクに影響を与えることなく一部の非同期タスクを実行でき、上記のシナリオの処理に非常に適しています。

それでは、非同期タスクとは何でしょうか?

下の図から簡単に理解できます。 (インターネットからの侵入と削除)

PHP は SwooleTaskWorker を使用して非同期操作 MySQL を実装する方法 (コード)

前回の Swoole 記事では、簡単なサーバーの作成方法といくつかのコアの使い方を紹介しました。コールバック関数。

上記の非同期処理を実装するには、onTask と onFinish の 2 つのイベント コールバックを追加するだけです。これら 2 つのコールバック関数は、それぞれ Task タスクの実行と Task タスクの返された結果の処理に使用されます。また、setメソッドでタスクの処理数を設定する必要があります。

使用例:

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "Get Message From Client {$fd}:{$data}\n";
        // 发送任务到Task进程
        $param = array(
            'fd' => $fd
        );
        $serv->task( json_encode( $param ) );
        echo "继续处理之后的逻辑\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "Data: {$data}\n";
        for($i = 0 ; $i send( $fd , "Data in Task {$task_id}");
        return "Task {$task_id}'s result";
    }
    public function onFinish($serv,$task_id, $data) {
        echo "Task {$task_id} finish\n";
        echo "Result: {$data}\n";
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

上記の例からわかるように、非同期タスクを開始するには、swoole_server のタスク メソッドを呼び出すだけです。送信後、onTask コールバックがトリガーされ、$task_id と $from_id を通じてさまざまなプロセスのさまざまなタスクを処理できます。最後に、文字列を返すことで実行結果をワーカー プロセスに返すことができ、ワーカー プロセスは onFinish コールバックを通じて結果を処理します。

そして、上記のコードに基づいて、mysql の非同期操作を実現できます。非同期操作 mysql は、次のシナリオに適しています。

  • 同時読み取りおよび書き込み操作

  • タイミングに厳密な関係はありません

  • メインスレッドのロジックには影響しません

利点:

  • 同時実行性の向上

  • IO 消費量の削減

データベースに対する圧力は主に、mysql によって維持される接続数にあります。同時実行数が 1,000 の場合、mysql は対応する接続​​を確立する必要があります。接続数。ロング接続方式では、プロセス内で MySQL 接続が維持されるため、接続作成のロスが軽減されます。 swoole を介して複数のタスクプロセスを起動でき、各プロセスで MySQL の長い接続を維持できるため、MySQL 接続プール技術の拡張にも使用できます。また、mysql サーバーは、長期間クエリがないことを検出すると、切断してリソースをリサイクルするため、切断と再接続のメカニズムが必要であることにも注意してください。

以下は、mysql の単純な非同期操作の例です。

上記のコードでも、変更する必要があるのは、onReceive、onTask、および onFinish の 3 つの関数だけです。

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8 // task进程数量 即为维持的MySQL连接的数量
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "收到数据". $data . PHP_EOL;
        // 发送任务到Task进程
        $param = array(
            'sql' => $data, // 接收客户端发送的 sql 
            'fd'  => $fd
        );
        $serv->task( json_encode( $param ) );  // 向 task 投递任务
        echo "继续处理之后的逻辑\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "recv SQL: {$data['sql']}\n";
        static $link = null;
        $sql = $data['sql'];
        $fd  = $data['fd'];
        HELL:
        if ($link == null) {
            $link = @mysqli_connect("127.0.0.1", "root", "root", "test");
        }
        $result = $link->query($sql);
        if (!$result) { //如果查询失败
            if(in_array(mysqli_errno($link), [2013, 2006])){
                //错误码为2013,或者2006,则重连数据库,重新执行sql
                    $link = null;
                    goto HELL;
            }
        }
        if(preg_match("/^select/i", $sql)){//如果是select操作,就返回关联数组
             $data = array();
                while ($fetchResult = mysqli_fetch_assoc($result) ){
                     $data['data'][] = $fetchResult;
                }                
        }else{//否则直接返回结果
            $data['data'] = $result;
        }
        $data['status'] = "OK";
        $data['fd'] = $fd;
        $serv->finish(json_encode($data));
    }
    public function onFinish($serv, $task_id, $data) {
        echo "Task {$task_id} finish\n";
        $result = json_decode($result, true);
        if ($result['status'] == 'OK') {
            $this->serv->send($result['fd'], json_encode($result['data']) . "\n");
        } else {
            $this->serv->send($result['fd'], $result);
        }
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

上記のコードは、onReceive 中に SQL を直接受信し、それを Task タスクに直接送信します。このとき、処理の次のステップがすぐに出力され、ここでも非同期性が反映されます。次に、onTask と onFinish を使用して SQL をデータベースに送信し、タスクの実行結果を処理します。

以上がPHP は SwooleTaskWorker を使用して非同期操作 MySQL を実装する方法 (コード)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はsegmentfault.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。