首頁  >  問答  >  主體

基於Swoole和Redis實現的並發佇列處理系統

由於PHP不支援多線程,但是作為一個完善的系統,有很多操作都是需要非同步完成的。為了完成這些非同步操作,我們做了一個基於Redis隊列任務系統。

大家知道,一個訊息佇列處理系統主要分成兩大部分:消費者和製造者。

在我們的系統中,主系統作為製造者,任務系統作為消費者。

具體的工作流程如下: 1、主系統將需要需要處理的任務名稱 任務參數push到佇列中。 2.任務系統即時的對任務隊列進行pop,pop出來一個任務就fork一個子進程,由子進程完成具體的任務邏輯。

/**
 * 启动守护进程
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
    exit;
}
 
/**
 * 创建子进程
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == 0) {//子进程
        $pid = posix_getpid();
        //echo "* Process {$pid} was created \n\n";
        $this->mq_process();
        exit;
    } else {//主进程
        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态
        if (pcntl_wifexited($status)) {
            //echo "\n\n* Sub process: {$pid} exited with {$status}";
            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}
 
/**
 * 业务任务队列处理
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}


#
PHP中高级教程分享PHP中高级教程分享2187 天前1492

全部回覆(2)我來回復

無回覆
  • 取消回覆