背景
由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。
大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。
在我们的系统中,主系统作为生产者,任务系统作为消费者。
具体的工作流程如下:1、主系统将需要需要处理的任务名称+任务参数push到队列中。2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。
具体代码如下:
/** * 启动守护进程 */public function runAction() { Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-'); while (true) { $this->fork_process(); } exit;}/** * 创建子进程 */private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子进程 $pid = posix_getpid(); //echo "* Process {$pid} was created \n\n"; $this->mq_process(); exit; } else {//主进程 $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态 if (pcntl_wifexited($status)) { //echo "\n\n* Sub process: {$pid} exited with {$status}"; //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid ); } else { Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-'); } }}/** * 业务任务队列处理 */private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = '_task_' . $data['worker']; $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel'; $params = $data['params']; $class = new $class_name(); $class->$worker($params); return TRUE;}
这是一个简单的任务处理系统。
通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。
但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。
这样很稳定。
但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!
第一个问题还好,但第二个问题就很严重。
当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。
新的设计
为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。
因为在PHP7之前不支持多线程,所以我们采用多进程。
从网上找了不少资料,大多所谓的多进程都是N个进程同时在后台运行。
显然这是不合适的。
我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。
遇到的问题
1、如何控制最大进程数
这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。
自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?
可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!
所以,这里就需要了解一个知识点:信号。
具体的可以自行Google,这里直接看代码。
// install signal handler for dead kidspcntl_signal(SIGCHLD, array($this, "sig_handler"));
这就安装了一个信号处理器。当然还缺少一点。
declare(ticks = 1);
declare是一个控制结构语句,具体的用法也请去Google。
这句代码的意思就是每执行一条低级语句就调用一次信号处理器。
这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。
2、如何解决进程残留
在多进程开发中,如果处理不当就会导致进程残留。
为了解决进程残留,必须得将子进程回收。
那么如何对子进程进行回收就是一个技术点了。
在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。
但我们是基于Redis的brpop的,而brpop是阻塞的。
这就导致一个问题:当执行N个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。
这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。
进程回收也放到信号处理器中去。
新系统的评估
pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。
所以这里采用Swoole扩展中的Process。
具体代码如下:
declare(ticks = 1);class JobDaemonController extends Yaf_Controller_Abstract{ use Trait_Redis; private $maxProcesses = 800; private $child; private $masterRedis; private $redis_task_wing = 'task:wing'; //待处理队列 public function init(){ // install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, "sig_handler")); set_time_limit(0); ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection } private function redis_client(){ $rds = new Redis(); $rds->connect('redis.master.host',6379); return $rds; } public function process(swoole_process $worker){// 第一个处理 $GLOBALS['worker'] = $worker; swoole_event_add($worker->pipe, function($pipe) { $worker = $GLOBALS['worker']; $recv = $worker->read(); //send data to master sleep(rand(1, 3)); echo "From Master: $recv\n"; $worker->exit(0); }); exit; } public function testAction(){ for ($i = 0; $i < 10000; $i++){ $data = [ 'abc' => $i, 'timestamp' => time().rand(100,999) ]; $this->masterRedis->lpush($this->redis_task_wing, json_encode($data)); } exit; } public function runAction(){ while (1){// echo "\t now we de have $this->child child processes\n"; if ($this->child < $this->maxProcesses){ $rds = $this->redis_client(); $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待 if (!$data_pop){ continue; } echo "\t Starting new child | now we de have $this->child child processes\n"; $this->child++; $process = new swoole_process([$this, 'process']); $process->write(json_encode($data_pop)); $pid = $process->start(); } } } private function sig_handler($signo) {// echo "Recive: $signo \r\n"; switch ($signo) { case SIGCHLD: while($ret = swoole_process::wait(false)) {// echo "PID={$ret['pid']}\n"; $this->child--; } } }}
最终,经过测试,单核1G的服务器执行1到3秒的任务可以做到800的并发。
ps:欢迎各位大神与我交流,不知能否做到更好~
我是闫大伯,一只奋战了两个周末的野生程序猿

phpsessionstrackuserdataacrossmultiplepagerequestsususingauniqueIdStoredInAcookie.here'showtomanagetheMeftically : 1) STARTASESSIONSTART_START () andSTAREDATAIN $ _SESSION.2) RegenerATERATESSESSIDIDAFTERLOGINWITHSESSION_RATERATERATES (True) TopreventSES

PHP에서 세션 데이터를 통한 반복은 다음 단계를 통해 달성 할 수 있습니다. 1. Session_start ()를 사용하여 세션을 시작하십시오. 2. $ _session 배열의 모든 키 값 쌍을 통해 Foreach 루프를 통과합니다. 3. 복잡한 데이터 구조를 처리 할 때 is_array () 또는 is_object () 함수를 사용하고 print_r ()를 사용하여 자세한 정보를 출력하십시오. 4. Traversal을 최적화 할 때 페이징을 사용하여 한 번에 많은 양의 데이터를 처리하지 않도록 할 수 있습니다. 이를 통해 실제 프로젝트에서 PHP 세션 데이터를보다 효율적으로 관리하고 사용하는 데 도움이됩니다.

이 세션은 서버 측 상태 관리 메커니즘을 통해 사용자 인증을 인식합니다. 1) 세션 생성 및 고유 ID의 세션 생성, 2) ID는 쿠키를 통해 전달됩니다. 3) ID를 통해 서버 저장 및 세션 데이터에 액세스합니다. 4) 사용자 인증 및 상태 관리가 실현되어 응용 프로그램 보안 및 사용자 경험이 향상됩니다.

tostoreauser'snameinaphpsession, startSessionstart_start (), wathsignthenameto $ _session [ 'username']. 1) useSentess_start () toinitializethesession.2) assimeuser'snameto $ _session [ 'username']

phpsession 실패 이유에는 구성 오류, 쿠키 문제 및 세션 만료가 포함됩니다. 1. 구성 오류 : 올바른 세션을 확인하고 설정합니다. 2. 쿠키 문제 : 쿠키가 올바르게 설정되어 있는지 확인하십시오. 3. 세션 만료 : 세션 시간을 연장하기 위해 세션을 조정합니다 .GC_MAXLIFETIME 값을 조정하십시오.

PHP에서 세션 문제를 디버그하는 방법 : 1. 세션이 올바르게 시작되었는지 확인하십시오. 2. 세션 ID의 전달을 확인하십시오. 3. 세션 데이터의 저장 및 읽기를 확인하십시오. 4. 서버 구성을 확인하십시오. 세션 ID 및 데이터를 출력, 세션 파일 컨텐츠보기 등을 통해 세션 관련 문제를 효과적으로 진단하고 해결할 수 있습니다.

Session_Start ()로 여러 통화를하면 경고 메시지와 가능한 데이터 덮어 쓰기가 발생합니다. 1) PHP는 세션이 시작되었다는 경고를 발행합니다. 2) 세션 데이터의 예상치 못한 덮어 쓰기를 유발할 수 있습니다. 3) Session_status ()를 사용하여 반복 통화를 피하기 위해 세션 상태를 확인하십시오.

SESSION.GC_MAXLIFETIME 및 SESSION.COOKIE_LIFETIME을 설정하여 PHP에서 세션 수명을 구성 할 수 있습니다. 1) SESSION.GC_MAXLIFETIME 서버 측 세션 데이터의 생존 시간을 제어합니다. 2) 세션 .Cookie_Lifetime 클라이언트 쿠키의 수명주기를 제어합니다. 0으로 설정하면 브라우저가 닫히면 쿠키가 만료됩니다.


핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

Video Face Swap
완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

인기 기사

뜨거운 도구

SecList
SecLists는 최고의 보안 테스터의 동반자입니다. 보안 평가 시 자주 사용되는 다양한 유형의 목록을 한 곳에 모아 놓은 것입니다. SecLists는 보안 테스터에게 필요할 수 있는 모든 목록을 편리하게 제공하여 보안 테스트를 더욱 효율적이고 생산적으로 만드는 데 도움이 됩니다. 목록 유형에는 사용자 이름, 비밀번호, URL, 퍼징 페이로드, 민감한 데이터 패턴, 웹 셸 등이 포함됩니다. 테스터는 이 저장소를 새로운 테스트 시스템으로 간단히 가져올 수 있으며 필요한 모든 유형의 목록에 액세스할 수 있습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

VSCode Windows 64비트 다운로드
Microsoft에서 출시한 강력한 무료 IDE 편집기

맨티스BT
Mantis는 제품 결함 추적을 돕기 위해 설계된 배포하기 쉬운 웹 기반 결함 추적 도구입니다. PHP, MySQL 및 웹 서버가 필요합니다. 데모 및 호스팅 서비스를 확인해 보세요.

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기
