Maison  >  Article  >  développement back-end  >  Comment PHP utilise SwooleTaskWorker pour implémenter le fonctionnement asynchrone Mysql (code)

Comment PHP utilise SwooleTaskWorker pour implémenter le fonctionnement asynchrone Mysql (code)

不言
不言avant
2018-10-17 16:02:112704parcourir

Le contenu de cet article explique comment PHP utilise SwooleTaskWorker pour implémenter le fonctionnement asynchrone de Mysql (code). Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer.

Dans les programmes serveur généraux, certaines tâches prennent beaucoup de temps, telles que l'envoi d'e-mails, l'envoi de diffusions par les serveurs de chat, etc. Si nous utilisons une imperméabilisation à blocage synchrone pour effectuer ces tâches, cela sera certainement très lent.

Le pool de processus TaskWorker de Swoole peut être utilisé pour effectuer certaines tâches asynchrones sans affecter les tâches suivantes, il est donc très approprié pour gérer les scénarios ci-dessus.

Alors, qu'est-ce qu'une tâche asynchrone ?

Vous pouvez obtenir une brève compréhension à partir du diagramme ci-dessous. (Depuis Internet, violation et suppression)

Comment PHP utilise SwooleTaskWorker pour implémenter le fonctionnement asynchrone Mysql (code)

Notre dernier article Swoole présentait comment créer un serveur simple et connaissait quelques cœurs Comment utiliser le fonction de rappel.

Pour implémenter le traitement asynchrone ci-dessus, il vous suffit d'ajouter deux rappels d'événements : onTask et onFinish. Ces deux fonctions de rappel sont utilisées respectivement pour exécuter des tâches de tâche et traiter les résultats de retour des tâches de tâche. De plus, vous devez définir le nombre de processus de tâches dans la méthode set.

Exemple d'utilisation :

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "Get Message From Client {$fd}:{$data}\n";
        // 发送任务到Task进程
        $param = array(
            'fd' => $fd
        );
        $serv->task( json_encode( $param ) );
        echo "继续处理之后的逻辑\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "Data: {$data}\n";
        for($i = 0 ; $i send( $fd , "Data in Task {$task_id}");
        return "Task {$task_id}'s result";
    }
    public function onFinish($serv,$task_id, $data) {
        echo "Task {$task_id} finish\n";
        echo "Result: {$data}\n";
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

Comme vous pouvez le voir dans l'exemple ci-dessus, pour lancer une tâche asynchrone, il vous suffit d'appeler la méthode de tâche de swoole_server. Après l'envoi, le rappel onTask sera déclenché et différentes tâches de différents processus pourront être traitées via $task_id et $from_id. Enfin, le résultat de l'exécution peut être renvoyé au processus Worker en renvoyant une chaîne, et le processus Worker traite le résultat via le rappel onFinish.

Ensuite, sur la base du code ci-dessus, une opération asynchrone mysql peut être réalisée. L'opération asynchrone mysql est plus adaptée aux scénarios suivants :

  • Opérations de lecture et d'écriture simultanées

  • Aucune relation stricte dans le timing

  • N'affecte pas la logique du thread principal

Avantages :

  • Améliorer la concurrence

  • Réduire la consommation d'IO

La pression sur la base de données réside principalement dans le nombre de connexions maintenues par MySQL. S'il y a 1000 simultanéités, alors MySQL doit établir un nombre correspondant de connexions. relations. Avec la méthode de connexion longue, la connexion MySQL est maintenue dans le processus, réduisant ainsi la perte liée à la création d'une connexion. Plusieurs processus de tâches peuvent être démarrés via swoole, et une longue connexion MySQL est maintenue dans chaque processus. Cela peut également être utilisé pour étendre la technologie du pool de connexions MySQL. Il convient également de noter que si le serveur mysql détecte qu'il n'y a pas eu de requête depuis longtemps, il déconnectera et recyclera les ressources, il doit donc y avoir un mécanisme de déconnexion et de reconnexion.

Ce qui suit est un exemple d'opération asynchrone simple mysql :

Toujours avec le code ci-dessus, il suffit de modifier les trois fonctions onReceive, onTask et onFinish.

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8 // task进程数量 即为维持的MySQL连接的数量
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "收到数据". $data . PHP_EOL;
        // 发送任务到Task进程
        $param = array(
            'sql' => $data, // 接收客户端发送的 sql 
            'fd'  => $fd
        );
        $serv->task( json_encode( $param ) );  // 向 task 投递任务
        echo "继续处理之后的逻辑\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "recv SQL: {$data['sql']}\n";
        static $link = null;
        $sql = $data['sql'];
        $fd  = $data['fd'];
        HELL:
        if ($link == null) {
            $link = @mysqli_connect("127.0.0.1", "root", "root", "test");
        }
        $result = $link->query($sql);
        if (!$result) { //如果查询失败
            if(in_array(mysqli_errno($link), [2013, 2006])){
                //错误码为2013,或者2006,则重连数据库,重新执行sql
                    $link = null;
                    goto HELL;
            }
        }
        if(preg_match("/^select/i", $sql)){//如果是select操作,就返回关联数组
             $data = array();
                while ($fetchResult = mysqli_fetch_assoc($result) ){
                     $data['data'][] = $fetchResult;
                }                
        }else{//否则直接返回结果
            $data['data'] = $result;
        }
        $data['status'] = "OK";
        $data['fd'] = $fd;
        $serv->finish(json_encode($data));
    }
    public function onFinish($serv, $task_id, $data) {
        echo "Task {$task_id} finish\n";
        $result = json_decode($result, true);
        if ($result['status'] == 'OK') {
            $this->serv->send($result['fd'], json_encode($result['data']) . "\n");
        } else {
            $this->serv->send($result['fd'], $result);
        }
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

Le code ci-dessus reçoit directement un sql lors de onReceive, puis l'envoie directement à la tâche Task. À ce stade, l'étape suivante du processus est immédiatement générée et l'asynchronisme se reflète également ici. Ensuite, onTask et onFinish sont utilisés pour envoyer SQL à la base de données et traiter les résultats de l'exécution de la tâche.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer