>  기사  >  백엔드 개발  >  PHP는 SwooleTaskWorker를 사용하여 비동기 작업 Mysql을 구현하는 방법(코드)

PHP는 SwooleTaskWorker를 사용하여 비동기 작업 Mysql을 구현하는 방법(코드)

不言
不言앞으로
2018-10-17 16:02:112712검색

이 글의 내용은 PHP가 SwooleTaskWorker를 사용하여 Mysql의 비동기 작업(코드)을 구현하는 방법에 대한 내용입니다. 필요한 참고 자료가 있으면 도움이 될 것입니다.

일반 서버 프로그램에는 이메일 보내기, 채팅 서버 방송 보내기 등 시간이 많이 걸리는 작업이 있습니다. 이러한 작업을 수행하기 위해 동기식 차단 방수를 사용하면 확실히 속도가 매우 느려질 것입니다.

Swoole의 TaskWorker 프로세스 풀을 사용하면 후속 작업에 영향을 주지 않고 일부 비동기 작업을 수행할 수 있으며 이는 위의 시나리오를 처리하는 데 매우 적합합니다.

그럼 비동기 작업이란 무엇일까요?

아래 도표를 통해 간략하게 이해하실 수 있습니다. (인터넷에서 삭제됨)

PHP는 SwooleTaskWorker를 사용하여 비동기 작업 Mysql을 구현하는 방법(코드)

저번 Swoole 기사에서는 간단한 서버를 만드는 방법과 여러 핵심 콜백 기능을 사용하는 방법을 소개했습니다.

위의 비동기 처리를 구현하려면 onTask와 onFinish라는 두 개의 이벤트 콜백만 추가하면 됩니다. 이 두 콜백 함수는 각각 Task 작업을 실행하고 Task 작업의 반환 결과를 처리하는 데 사용됩니다. 또한 set 메소드에서 작업 프로세스 수를 설정해야 합니다.

사용 예시:

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "Get Message From Client {$fd}:{$data}\n";
        // 发送任务到Task进程
        $param = array(
            'fd' => $fd
        );
        $serv->task( json_encode( $param ) );
        echo "继续处理之后的逻辑\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "Data: {$data}\n";
        for($i = 0 ; $i send( $fd , "Data in Task {$task_id}");
        return "Task {$task_id}'s result";
    }
    public function onFinish($serv,$task_id, $data) {
        echo "Task {$task_id} finish\n";
        echo "Result: {$data}\n";
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

위 예시에서 볼 수 있듯이 비동기 작업을 시작하려면 swoole_server의 작업 메서드만 호출하면 됩니다. 전송 후 onTask 콜백이 트리거되고 $task_id 및 $from_id를 통해 다양한 프로세스의 다양한 작업을 처리할 수 있습니다. 마지막으로 실행 결과는 문자열을 반환하여 Worker 프로세스에 반환할 수 있으며, Worker 프로세스는 onFinish 콜백을 통해 결과를 처리합니다.

그러면 위의 코드를 기반으로 비동기 작업 mysql을 구현할 수 있습니다. 비동기 작업 mysql은 다음 시나리오에 더 적합합니다.

  • 동시 읽기 및 쓰기 작업

  • 타이밍에 엄격한 관계 없음

  • 메인 스레드 로직에 영향을 주지 않음

이점:

  • 동시성 향상

  • IO 소비 감소

데이터베이스에 대한 부담은 주로 mysql이 유지하는 연결 수에 있습니다. 동시성이 1000개인 경우 mysql은 해당 수의 연결을 설정해야 합니다. 긴 연결 방식을 사용하면 프로세스에서 MySQL 연결이 항상 유지되므로 연결 생성 손실이 줄어듭니다. Swoole을 통해 여러 작업 프로세스를 시작할 수 있으며 각 프로세스에서 MySQL 긴 연결이 유지됩니다. 이는 MySQL 연결 풀 기술을 확장하는 데에도 사용할 수 있습니다. 또한 mysql 서버가 오랫동안 쿼리가 없는 것을 감지하면 리소스 연결을 끊고 재활용하므로 연결 끊기 및 다시 연결 메커니즘이 있어야 합니다.

다음은 mysql의 간단한 비동기 작업의 예입니다.

여전히 위 코드를 사용하여 onReceive, onTask 및 onFinish 세 가지 함수만 수정하면 됩니다.

class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 4,
            'daemonize' => false,
            'task_worker_num' => 8 // task进程数量 即为维持的MySQL连接的数量
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "收到数据". $data . PHP_EOL;
        // 发送任务到Task进程
        $param = array(
            'sql' => $data, // 接收客户端发送的 sql 
            'fd'  => $fd
        );
        $serv->task( json_encode( $param ) );  // 向 task 投递任务
        echo "继续处理之后的逻辑\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "recv SQL: {$data['sql']}\n";
        static $link = null;
        $sql = $data['sql'];
        $fd  = $data['fd'];
        HELL:
        if ($link == null) {
            $link = @mysqli_connect("127.0.0.1", "root", "root", "test");
        }
        $result = $link->query($sql);
        if (!$result) { //如果查询失败
            if(in_array(mysqli_errno($link), [2013, 2006])){
                //错误码为2013,或者2006,则重连数据库,重新执行sql
                    $link = null;
                    goto HELL;
            }
        }
        if(preg_match("/^select/i", $sql)){//如果是select操作,就返回关联数组
             $data = array();
                while ($fetchResult = mysqli_fetch_assoc($result) ){
                     $data['data'][] = $fetchResult;
                }                
        }else{//否则直接返回结果
            $data['data'] = $result;
        }
        $data['status'] = "OK";
        $data['fd'] = $fd;
        $serv->finish(json_encode($data));
    }
    public function onFinish($serv, $task_id, $data) {
        echo "Task {$task_id} finish\n";
        $result = json_decode($result, true);
        if ($result['status'] == 'OK') {
            $this->serv->send($result['fd'], json_encode($result['data']) . "\n");
        } else {
            $this->serv->send($result['fd'], $result);
        }
    }
    public function onStart( $serv ) {
        echo "Server Start\n";
    }
    public function onConnect( $serv, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
    }
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
$server = new Server();

위 코드는 onReceive 중에 sql을 직접 받은 후 Task 태스크로 바로 보내는 코드입니다. 이때 프로세스의 다음 단계가 즉시 출력되며 여기에도 비동기성이 반영됩니다. 그런 다음 onTask 및 onFinish를 사용하여 sql을 데이터베이스로 보내고 작업 실행 결과를 처리합니다.

위 내용은 PHP는 SwooleTaskWorker를 사용하여 비동기 작업 Mysql을 구현하는 방법(코드)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제