搜索
首页后端开发php教程基于Swoole和Redis实现的并发队列处理系统

背景

由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。

大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。

在我们的系统中,主系统作为生产者,任务系统作为消费者。

具体的工作流程如下:1、主系统将需要需要处理的任务名称+任务参数push到队列中。2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。

具体代码如下:

/** * 启动守护进程 */public function runAction() {    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');    while (true) {        $this->fork_process();    }    exit;}/** * 创建子进程 */private function fork_process() {    $ppid = getmypid();    $pid = pcntl_fork();    if ($pid == 0) {//子进程        $pid = posix_getpid();        //echo "* Process {$pid} was created \n\n";        $this->mq_process();        exit;    } else {//主进程        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态        if (pcntl_wifexited($status)) {            //echo "\n\n* Sub process: {$pid} exited with {$status}";            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );        } else {            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');        }    }}/** * 业务任务队列处理 */private function mq_process() {    $data_pop = $this->masterRedis->rPop($this->redis_list_key);    $data = json_decode($data_pop, 1);    if (!$data) {        return FALSE;    }    $worker = '_task_' . $data['worker'];    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';    $params = $data['params'];    $class = new $class_name();    $class->$worker($params);    return TRUE;}

这是一个简单的任务处理系统。

通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。

但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。

这样很稳定。

但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!

第一个问题还好,但第二个问题就很严重。

当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。

新的设计

为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。

因为在PHP7之前不支持多线程,所以我们采用多进程。

从网上找了不少资料,大多所谓的多进程都是N个进程同时在后台运行。

显然这是不合适的。

我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。

遇到的问题


1、如何控制最大进程数

这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。

自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?

可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!

所以,这里就需要了解一个知识点:信号。

具体的可以自行Google,这里直接看代码。

// install signal handler for dead kidspcntl_signal(SIGCHLD, array($this, "sig_handler"));

这就安装了一个信号处理器。当然还缺少一点。

declare(ticks = 1);

declare是一个控制结构语句,具体的用法也请去Google。

这句代码的意思就是每执行一条低级语句就调用一次信号处理器。

这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。

2、如何解决进程残留

在多进程开发中,如果处理不当就会导致进程残留。

为了解决进程残留,必须得将子进程回收。

那么如何对子进程进行回收就是一个技术点了。

在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。

但我们是基于Redis的brpop的,而brpop是阻塞的。

这就导致一个问题:当执行N个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。

这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。

进程回收也放到信号处理器中去。

新系统的评估

pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。

所以这里采用Swoole扩展中的Process。

具体代码如下:

declare(ticks = 1);class JobDaemonController extends Yaf_Controller_Abstract{    use Trait_Redis;    private $maxProcesses = 800;    private $child;    private $masterRedis;    private $redis_task_wing = 'task:wing'; //待处理队列    public function init(){        // install signal handler for dead kids        pcntl_signal(SIGCHLD, array($this, "sig_handler"));        set_time_limit(0);        ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection    }    private function redis_client(){        $rds = new Redis();        $rds->connect('redis.master.host',6379);        return $rds;    }    public function process(swoole_process $worker){// 第一个处理        $GLOBALS['worker'] = $worker;        swoole_event_add($worker->pipe, function($pipe) {            $worker = $GLOBALS['worker'];            $recv = $worker->read();            //send data to master            sleep(rand(1, 3));            echo "From Master: $recv\n";            $worker->exit(0);        });        exit;    }    public function testAction(){        for ($i = 0; $i < 10000; $i++){            $data = [                'abc' => $i,                'timestamp' => time().rand(100,999)            ];            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));        }        exit;    }    public function runAction(){        while (1){//            echo "\t now we de have $this->child child processes\n";            if ($this->child < $this->maxProcesses){                $rds = $this->redis_client();                $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待                if (!$data_pop){                    continue;                }                echo "\t Starting new child | now we de have $this->child child processes\n";                $this->child++;                $process = new swoole_process([$this, 'process']);                $process->write(json_encode($data_pop));                $pid = $process->start();            }        }    }    private function sig_handler($signo) {//        echo "Recive: $signo \r\n";        switch ($signo) {            case SIGCHLD:                while($ret = swoole_process::wait(false)) {//                    echo "PID={$ret['pid']}\n";                    $this->child--;                }        }    }}

最终,经过测试,单核1G的服务器执行1到3秒的任务可以做到800的并发。

ps:欢迎各位大神与我交流,不知能否做到更好~


我是闫大伯,一只奋战了两个周末的野生程序猿

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
PHP与Python:了解差异PHP与Python:了解差异Apr 11, 2025 am 12:15 AM

PHP和Python各有优势,选择应基于项目需求。1.PHP适合web开发,语法简单,执行效率高。2.Python适用于数据科学和机器学习,语法简洁,库丰富。

php:死亡还是简单地适应?php:死亡还是简单地适应?Apr 11, 2025 am 12:13 AM

PHP不是在消亡,而是在不断适应和进化。1)PHP从1994年起经历多次版本迭代,适应新技术趋势。2)目前广泛应用于电子商务、内容管理系统等领域。3)PHP8引入JIT编译器等功能,提升性能和现代化。4)使用OPcache和遵循PSR-12标准可优化性能和代码质量。

PHP的未来:改编和创新PHP的未来:改编和创新Apr 11, 2025 am 12:01 AM

PHP的未来将通过适应新技术趋势和引入创新特性来实现:1)适应云计算、容器化和微服务架构,支持Docker和Kubernetes;2)引入JIT编译器和枚举类型,提升性能和数据处理效率;3)持续优化性能和推广最佳实践。

您什么时候使用特质与PHP中的抽象类或接口?您什么时候使用特质与PHP中的抽象类或接口?Apr 10, 2025 am 09:39 AM

在PHP中,trait适用于需要方法复用但不适合使用继承的情况。1)trait允许在类中复用方法,避免多重继承复杂性。2)使用trait时需注意方法冲突,可通过insteadof和as关键字解决。3)应避免过度使用trait,保持其单一职责,以优化性能和提高代码可维护性。

什么是依赖性注入容器(DIC),为什么在PHP中使用一个?什么是依赖性注入容器(DIC),为什么在PHP中使用一个?Apr 10, 2025 am 09:38 AM

依赖注入容器(DIC)是一种管理和提供对象依赖关系的工具,用于PHP项目中。DIC的主要好处包括:1.解耦,使组件独立,代码易维护和测试;2.灵活性,易替换或修改依赖关系;3.可测试性,方便注入mock对象进行单元测试。

与常规PHP阵列相比,解释SPL SplfixedArray及其性能特征。与常规PHP阵列相比,解释SPL SplfixedArray及其性能特征。Apr 10, 2025 am 09:37 AM

SplFixedArray在PHP中是一种固定大小的数组,适用于需要高性能和低内存使用量的场景。1)它在创建时需指定大小,避免动态调整带来的开销。2)基于C语言数组,直接操作内存,访问速度快。3)适合大规模数据处理和内存敏感环境,但需谨慎使用,因其大小固定。

PHP如何安全地上载文件?PHP如何安全地上载文件?Apr 10, 2025 am 09:37 AM

PHP通过$\_FILES变量处理文件上传,确保安全性的方法包括:1.检查上传错误,2.验证文件类型和大小,3.防止文件覆盖,4.移动文件到永久存储位置。

什么是无效的合并操作员(??)和无效分配运算符(?? =)?什么是无效的合并操作员(??)和无效分配运算符(?? =)?Apr 10, 2025 am 09:33 AM

JavaScript中处理空值可以使用NullCoalescingOperator(??)和NullCoalescingAssignmentOperator(??=)。1.??返回第一个非null或非undefined的操作数。2.??=将变量赋值为右操作数的值,但前提是该变量为null或undefined。这些操作符简化了代码逻辑,提高了可读性和性能。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
3 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

螳螂BT

螳螂BT

Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )专业的PHP集成开发工具

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。