Maison  >  Questions et réponses  >  le corps du texte

Système de traitement de file d'attente simultané basé sur Swoole et Redis

Étant donné que PHP ne prend pas en charge le multi-threading, en tant que système complet, de nombreuses opérations doivent être effectuées de manière asynchrone. Afin de réaliser ces opérations asynchrones, nous avons construit un système de tâches de file d'attente Redis.

Comme nous le savons tous, un système de traitement de file d'attente de messages est principalement divisé en deux parties : les consommateurs et les producteurs.

Dans notre système, le système principal agit en tant que producteur et le système de tâches agit en tant que consommateur.

Le flux de travail spécifique est le suivant : 1. Le système principal pousse le nom de la tâche + les paramètres de la tâche qui doivent être traités dans la file d'attente. 2. Le système de tâches affiche la file d'attente des tâches en temps réel. Lorsqu'une tâche apparaît, il lance un sous-processus et le sous-processus termine la logique de tâche spécifique.

/**
 * 启动守护进程
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
    exit;
}
 
/**
 * 创建子进程
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == 0) {//子进程
        $pid = posix_getpid();
        //echo "* Process {$pid} was created \n\n";
        $this->mq_process();
        exit;
    } else {//主进程
        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态
        if (pcntl_wifexited($status)) {
            //echo "\n\n* Sub process: {$pid} exited with {$status}";
            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}
 
/**
 * 业务任务队列处理
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}


PHP中高级教程分享PHP中高级教程分享2136 Il y a quelques jours1438

répondre à tous(2)je répondrai

Pas de réponse
  • Annulerrépondre