
How PHP uses Swoole TaskWorker to operate MySQL asynchronously (code)

不言
2018-10-17 16:02:11

This article shows how PHP can use Swoole's TaskWorker to operate MySQL asynchronously, with code. I hope it is helpful to you.

Server programs often have time-consuming tasks, such as sending emails or broadcasting messages from a chat server. If these tasks run in a synchronous, blocking way, the server will be very slow.

Swoole's TaskWorker process pool can be used to perform some asynchronous tasks without affecting subsequent tasks, which is very suitable for handling the above scenarios.

So what is an asynchronous task?

The diagram below gives a rough idea. (Image from the Internet; it will be removed on request in case of infringement.)

[Diagram of an asynchronous task; image not available]

Our previous Swoole article showed how to create a simple server and how to use a few core callback functions.

To implement the asynchronous processing described above, you only need to add two event callbacks: onTask and onFinish. onTask executes a task, and onFinish handles the result that the task returns. In addition, you need to set the number of task processes (task_worker_num) in the set method.

Usage example:

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "Get Message From Client {$fd}:{$data}\n";
        // Send the task to a Task process
        $param = array(
            'fd' => $fd
        );
        $serv->task( json_encode( $param ) );
        echo "Continue with the subsequent logic\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "Data: {$data}\n";
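        // The loop body was lost in the source; the bound and sleep() below are
        // assumptions that simulate a time-consuming job.
        for($i = 0 ; $i < 10 ; $i++) {
            sleep(1);
            echo "Task {$task_id} handled {$i} times\n";
        }
        // The payload was JSON-encoded in onReceive, so decode it to recover $fd.
        $fd = json_decode($data, true)['fd'];
        $serv->send( $fd , "Data in Task {$task_id}");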
        return "Task {$task_id}'s result";
    }
    public function onFinish($serv,$task_id, $data) {
        echo "Task {$task_id} finish\n";
        echo "Result: {$data}\n";
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

As the example shows, starting an asynchronous task only takes a call to the task method of swoole_server. Once the task is delivered, the onTask callback fires in a task process; $task_id identifies the task and $from_id identifies the Worker process that submitted it. The task returns its result to the Worker process by returning a string, and the Worker process handles that result in the onFinish callback.
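Because the example passes a JSON string to task(), onTask has to decode that string before it can read any field. The following is a minimal sketch of that round trip in plain PHP. The helper names encodeTaskPayload and decodeTaskPayload and the 'body' field are hypothetical, introduced only for illustration; they are not part of Swoole.

```php
<?php
// Hypothetical helpers, not part of Swoole: they only wrap json_encode/json_decode.

/** Build the string payload that a worker would pass to $serv->task(). */
function encodeTaskPayload(int $fd, string $body): string
{
    return json_encode(['fd' => $fd, 'body' => $body]);
}

/** Decode the payload inside onTask; returns null if it is not valid JSON. */
function decodeTaskPayload(string $payload): ?array
{
    $decoded = json_decode($payload, true);
    return is_array($decoded) ? $decoded : null;
}

$payload = encodeTaskPayload(3, "hello");
$task    = decodeTaskPayload($payload);
echo "fd={$task['fd']} body={$task['body']}\n"; // prints "fd=3 body=hello"
```

Checking for null before using the decoded array keeps a malformed payload from raising warnings inside the task process.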

Building on the code above, we can operate MySQL asynchronously. Asynchronous MySQL operations suit the following scenarios:

  • Concurrent read and write operations

  • Operations with no strict ordering requirements

  • Work that must not hold up the main logic

Benefits:

  • Improve concurrency

  • Reduce I/O overhead

The pressure on the database comes mainly from the number of connections MySQL has to maintain. With 1,000 concurrent requests, MySQL would need to establish a corresponding number of connections. With persistent connections, each process keeps its MySQL connection open, which avoids the cost of creating a connection for every request. Swoole can start multiple task processes, each holding one persistent MySQL connection, and this approach can be extended into a MySQL connection pool. Note that if the MySQL server sees no query on a connection for a long time (longer than its wait_timeout setting), it closes the connection and reclaims the resources, so a disconnect-and-reconnect mechanism is required.
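The reconnect mechanism can be sketched independently of any database driver. The helper below, queryWithReconnect, is hypothetical: it takes two callables (assumptions made for illustration) so that the retry logic can be read and run without a MySQL server. Error codes 2006 (server has gone away) and 2013 (lost connection during query) are the MySQL client errors that signal a dead connection.

```php
<?php
// MySQL client error codes that indicate a dead connection:
// 2006 = server has gone away, 2013 = lost connection during query.
const RECONNECT_ERRNOS = [2006, 2013];

/**
 * Hypothetical helper: run $query on a connection kept in a static variable,
 * reconnecting and retrying if the connection turns out to be dead.
 *
 * $connect():    returns a connection handle, or null on failure.
 * $query($conn): returns [result, errno]; errno 0 means success.
 */
function queryWithReconnect(callable $connect, callable $query, int $maxAttempts = 2)
{
    static $conn = null; // survives between calls, like a persistent connection

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        if ($conn === null) {
            $conn = $connect();
            if ($conn === null) {
                return null; // could not connect at all
            }
        }
        [$result, $errno] = $query($conn);
        if ($errno === 0) {
            return $result;
        }
        if (!in_array($errno, RECONNECT_ERRNOS, true)) {
            return null; // a real SQL error: reconnecting will not help
        }
        $conn = null; // stale connection: drop it and retry
    }
    return null; // attempts exhausted
}
```

With mysqli, $connect could wrap mysqli_connect() and $query could wrap mysqli_query() together with mysqli_errno(). Capping the number of attempts avoids retrying forever when the server is really down.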

The following is a simple example of operating MySQL asynchronously.

Starting from the code above, we only need to modify three functions: onReceive, onTask and onFinish.

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8 // number of task processes, i.e. the number of MySQL connections maintained
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
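        echo "Received data: " . $data . PHP_EOL;
        // Send the task to a Task process
        $param = array(
            'sql' => $data, // the SQL sent by the client (demo only: never run untrusted SQL in production)
            'fd'  => $fd
        );
        $serv->task( json_encode( $param ) );  // deliver the task to a task process
        echo "Continue with the subsequent logic\n";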
    }

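    public function onTask($serv, $task_id, $from_id, $data) {
        // The payload was JSON-encoded in onReceive, so decode it first.
        $data = json_decode($data, true);
        $sql  = $data['sql'];
        $fd   = $data['fd'];
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "recv SQL: {$sql}\n";
        // Return false on errors instead of throwing (PHP 8.1+ throws by default).
        mysqli_report(MYSQLI_REPORT_OFF);
        // One persistent connection per task process, kept across calls.
        static $link = null;
        $result = false;
        $error  = '';
        // Try at most twice: once on the current connection, once after reconnecting.
        for ($attempt = 0; $attempt < 2; $attempt++) {
            if (!$link) {
                $link = @mysqli_connect("127.0.0.1", "root", "root", "test");
                if (!$link) {
                    $error = mysqli_connect_error();
                    break; // connection failed, give up
                }
            }
            $result = mysqli_query($link, $sql);
            if ($result === false) { // the query failed
                $error = mysqli_error($link);
                // 2006 or 2013 means the connection is dead: reconnect and rerun the SQL.
                if (in_array(mysqli_errno($link), [2013, 2006])) {
                    $link = null;
                    continue;
                }
            }
            break;
        }
        $response = array('fd' => $fd);
        if ($result === false) {
            $response['status'] = "ERROR";
            $response['data'] = $error;
        } else {
            $response['status'] = "OK";
            if ($result instanceof mysqli_result) {
                // A result set (e.g. from SELECT): return the rows as associative arrays.
                $response['data'] = array();
                while ($fetchResult = mysqli_fetch_assoc($result)) {
                    $response['data'][] = $fetchResult;
                }
            } else { // otherwise return the result directly
                $response['data'] = $result;
            }
        }
        $serv->finish(json_encode($response));
    }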
    public function onFinish($serv, $task_id, $data) {
        echo "Task {$task_id} finish\n";
        // $data is the JSON string passed to finish() in onTask.
        $result = json_decode($data, true);
        if ($result['status'] == 'OK') {
            $this->serv->send($result['fd'], json_encode($result['data']) . "\n");
        } else {
            // send() expects a string, so encode the whole result.
            $this->serv->send($result['fd'], json_encode($result) . "\n");
        }
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

In the code above, onReceive takes the SQL sent by the client and hands it straight to a task process. The line after task() prints immediately, without waiting for the query, which is where the asynchrony shows. onTask then runs the SQL against the database, and onFinish handles the result and sends it back to the client.
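To try the server, a client only has to write a SQL statement to the socket and read back one line of JSON. The sketch below is one possible client in plain PHP. The helper name sendSql is hypothetical, and the address and query in the usage comment are assumptions based on the example server listening on port 9501.

```php
<?php
/**
 * Hypothetical client helper: send one SQL statement over an open stream
 * and decode the single JSON line that the server writes back.
 * Returns null if the connection closed before a reply arrived.
 */
function sendSql($stream, string $sql)
{
    fwrite($stream, $sql);
    $line = fgets($stream); // the server ends a successful reply with "\n"
    return $line === false ? null : json_decode(trim($line), true);
}

// Usage against the example server (address and query are assumptions):
// $client = stream_socket_client("tcp://127.0.0.1:9501", $errno, $errstr, 3);
// var_dump(sendSql($client, "select * from user limit 1"));
```

The helper takes an already-open stream rather than an address, so the same function works with any stream resource, including a local socket pair for testing.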


Statement:
This article is reproduced from segmentfault.com. If there is any infringement, please contact admin@php.cn to have it removed.