search

Home  >  Q&A  >  body text

Concurrent queue processing system based on Swoole and Redis

Since PHP does not support multi-threading, as a complete system, many operations need to be completed asynchronously. In order to complete these asynchronous operations, we built a Redis queue task system.

As we all know, a message queue processing system is mainly divided into two parts: consumers and producers.

In our system, the main system acts as the producer and the task system acts as the consumer.

The specific workflow is as follows: 1. The main system pushes the name of the task that needs to be processed and the task parameters into the queue. 2. The task system pops the task queue in real time. When a task pops out, it forks a sub-process, and the sub-process completes the specific task logic.

/**
 * 启动守护进程
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
    exit;
}
 
/**
 * 创建子进程
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == 0) {//子进程
        $pid = posix_getpid();
        //echo "* Process {$pid} was created \n\n";
        $this->mq_process();
        exit;
    } else {//主进程
        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态
        if (pcntl_wifexited($status)) {
            //echo "\n\n* Sub process: {$pid} exited with {$status}";
            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}
 
/**
 * 业务任务队列处理
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}


PHP中高级教程分享PHP中高级教程分享2256 days ago1547

reply all(2)I'll reply

No reply
  • Cancelreply