Heim  >  Artikel  >  Backend-Entwicklung  >  Wie verwendet PHP SwooleTaskWorker, um den asynchronen Vorgang Mysql (Code) zu implementieren?

Wie verwendet PHP SwooleTaskWorker, um den asynchronen Vorgang Mysql (Code) zu implementieren?

不言
不言nach vorne
2018-10-17 16:02:112668Durchsuche

Der Inhalt dieses Artikels handelt davon, wie PHP SwooleTaskWorker verwendet, um den asynchronen Betrieb von MySQL (Code) zu implementieren. Ich hoffe, dass er für Sie hilfreich ist.

In allgemeinen Serverprogrammen gibt es einige zeitaufwändige Aufgaben, wie z. B. das Versenden von E-Mails, Chat-Server, die Broadcasts senden usw. Wenn wir zur Ausführung dieser Aufgaben eine synchron blockierende Abdichtung verwenden, wird dies auf jeden Fall sehr langsam sein.

Der TaskWorker-Prozesspool von Swoole kann zum Ausführen einiger asynchroner Aufgaben verwendet werden, ohne dass sich dies auf nachfolgende Aufgaben auswirkt. Daher eignet er sich sehr gut für die Bewältigung der oben genannten Szenarien.

Was ist also eine asynchrone Aufgabe?

Sie können sich anhand des folgenden Diagramms einen kurzen Überblick verschaffen. (Aus dem Internet, Verletzung und Löschung)

Wie verwendet PHP SwooleTaskWorker, um den asynchronen Vorgang Mysql (Code) zu implementieren?

In unserem letzten Swoole-Artikel wurde vorgestellt, wie man einen einfachen Server erstellt, und wir wussten, wie man ein paar Kerne verwendet Rückruffunktion.

Um die oben genannte asynchrone Verarbeitung zu implementieren, müssen Sie nur zwei Ereignisrückrufe hinzufügen: onTask und onFinish. Diese beiden Rückruffunktionen werden zum Ausführen von Task-Aufgaben bzw. zum Verarbeiten der Rückgabeergebnisse von Task-Aufgaben verwendet. Darüber hinaus müssen Sie die Anzahl der Aufgabenprozesse in der Set-Methode festlegen.

Verwendungsbeispiel:

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "Get Message From Client {$fd}:{$data}\n";
        // 发送任务到Task进程
        $param = array(
            'fd' => $fd
        );
        $serv->task( json_encode( $param ) );
        echo "继续处理之后的逻辑\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "Data: {$data}\n";
        for($i = 0 ; $i send( $fd , "Data in Task {$task_id}");
        return "Task {$task_id}'s result";
    }
    public function onFinish($serv,$task_id, $data) {
        echo "Task {$task_id} finish\n";
        echo "Result: {$data}\n";
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

Wie Sie dem obigen Beispiel entnehmen können, müssen Sie zum Initiieren einer asynchronen Aufgabe nur die Task-Methode von swoole_server aufrufen. Nach dem Senden wird der onTask-Rückruf ausgelöst und verschiedene Aufgaben verschiedener Prozesse können über $task_id und $from_id verarbeitet werden. Schließlich kann das Ausführungsergebnis durch die Rückgabe einer Zeichenfolge an den Worker-Prozess zurückgegeben werden, und der Worker-Prozess verarbeitet das Ergebnis über den onFinish-Rückruf.

Dann kann basierend auf dem obigen Code der asynchrone Betrieb von MySQL realisiert werden. Der asynchrone Betrieb von MySQL eignet sich besser für die folgenden Szenarien:

  • Gleichzeitige Lese- und Schreibvorgänge

  • Keine strikte zeitliche Beziehung

  • Beeinflusst nicht die Haupt-Thread-Logik

Vorteile:

  • Verbesserung der Parallelität

  • IO-Verbrauch reduzieren

Der Druck auf die Datenbank liegt hauptsächlich in der Anzahl der von MySQL verwalteten Verbindungen. Wenn es 1000 Parallelen gibt, muss MySQL eine entsprechende Anzahl herstellen Verbindungen. Bei der langen Verbindungsmethode bleibt die MySQL-Verbindung dabei erhalten, wodurch der Verlust beim Erstellen einer Verbindung verringert wird. Über swoole können mehrere Task-Prozesse gestartet werden, und in jedem Prozess wird eine MySQL-Long-Verbindung aufrechterhalten. Dies kann auch zur Erweiterung der MySQL-Verbindungspool-Technologie verwendet werden. Es ist auch zu beachten, dass der MySQL-Server, wenn er feststellt, dass über einen längeren Zeitraum keine Abfrage erfolgt ist, die Ressourcen trennt und wiederverwendet. Daher muss ein Mechanismus zum Trennen und erneuten Verbinden vorhanden sein.

Das Folgende ist ein Beispiel für eine einfache asynchrone Operation auf MySQL:

Mit dem obigen Code müssen wir nur die drei Funktionen onReceive, onTask und onFinish ändern.

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8 // task进程数量 即为维持的MySQL连接的数量
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "收到数据". $data . PHP_EOL;
        // 发送任务到Task进程
        $param = array(
            'sql' => $data, // 接收客户端发送的 sql 
            'fd'  => $fd
        );
        $serv->task( json_encode( $param ) );  // 向 task 投递任务
        echo "继续处理之后的逻辑\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "recv SQL: {$data['sql']}\n";
        static $link = null;
        $sql = $data['sql'];
        $fd  = $data['fd'];
        HELL:
        if ($link == null) {
            $link = @mysqli_connect("127.0.0.1", "root", "root", "test");
        }
        $result = $link->query($sql);
        if (!$result) { //如果查询失败
            if(in_array(mysqli_errno($link), [2013, 2006])){
                //错误码为2013,或者2006,则重连数据库,重新执行sql
                    $link = null;
                    goto HELL;
            }
        }
        if(preg_match("/^select/i", $sql)){//如果是select操作,就返回关联数组
             $data = array();
                while ($fetchResult = mysqli_fetch_assoc($result) ){
                     $data['data'][] = $fetchResult;
                }                
        }else{//否则直接返回结果
            $data['data'] = $result;
        }
        $data['status'] = "OK";
        $data['fd'] = $fd;
        $serv->finish(json_encode($data));
    }
    public function onFinish($serv, $task_id, $data) {
        echo "Task {$task_id} finish\n";
        $result = json_decode($result, true);
        if ($result['status'] == 'OK') {
            $this->serv->send($result['fd'], json_encode($result['data']) . "\n");
        } else {
            $this->serv->send($result['fd'], $result);
        }
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

Der obige Code empfängt während onReceive direkt eine SQL und sendet sie dann direkt an die Task-Aufgabe. Zu diesem Zeitpunkt wird der nächste Schritt des Prozesses sofort ausgegeben, und auch die Asynchronität spiegelt sich hier wider. Dann werden onTask und onFinish verwendet, um SQL an die Datenbank zu senden und die Ergebnisse der Aufgabenausführung zu verarbeiten.

Das obige ist der detaillierte Inhalt vonWie verwendet PHP SwooleTaskWorker, um den asynchronen Vorgang Mysql (Code) zu implementieren?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:segmentfault.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen