응용 시나리오
왜 사용해야 하나요? 어떤 이점이 있나요? 이것은 처음부터 말해야 합니다. 어떤 일이 무엇인지, 그것이 무엇에 적합한지 이해해야만 그것을 당신의 프로젝트와 더 잘 통합할 수 있습니다. 그것을 배운 후에 그것은 당신이 그것을 모른다는 것을 의미합니다. 우리는 일반적으로 다음과 같은 질문을 더 고려해야 합니다. xx 기술과 결합할 수 있는 프로젝트 기능은 무엇입니까? 이 비즈니스 시나리오에서 이 xx 기술이 실현 가능합니까? "이 XX 기술을 배우면 어떡하지? 회사가 지금은 안 써주니까 배우면 소용이 없지." 이런 기분으로 XX 기술을 배우는 게 참 괴로울 것 같아요.
우리 모두는 대기열이 시간이 많이 걸리는 작업을 먼저 수행하지 않고 먼저 묻혀 있다가 비동기식으로 처리된다는 것을 알고 있습니다. 이러한 방식으로 사용자는 이메일이나 문자 메시지 보내기와 같은 시간이 많이 걸리는 작업을 느낄 수 없습니다. 종료를 클릭하면 서버에서 소비 대기열이 모두 완료됩니다. 이는 주로 SMS 또는 이메일 알림, 메시지 구독을 위한 제3자 인터페이스 액세스, 대기열과 결합하여 완료할 수 있는 쇼핑몰의 일부 플래시 세일 활동에 사용됩니다.
Beanstalkd 소개
Beanstalkd는 고성능, 경량 분산 메모리 큐, C 코드, 일반적인 Memcached와 유사한 디자인, 프로토콜 및 사용법이 동일하므로 Memcached를 사용해 본 사용자에게는 Beanstalkd가 친숙하게 느껴질 것입니다.
beantalkd의 원래 설계 의도는 시간이 많이 걸리는 요청을 비동기식으로 실행하고 적시에 결과를 반환하여 동시 네트워크 요청이 많은 경우 요청의 응답 지연을 줄이는 것입니다.
Ubuntu 설치
sudo apt-get install beanstalkd
구성 파일
vim /etc/default/beanstalkd
상태 보기
service beanstalkd status # 命令回显 # root@:/www/server/php/72/etc# service beanstalkd status ● beanstalkd.service - Simple, fast work queue Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled) Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago Docs: man:beanstalkd(1) Main PID: 7033 (beanstalkd) Tasks: 1 (limit: 4634) CGroup: /system.slice/beanstalkd.service └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.
연결 + 지속성 구성
ip 0.0.0.0을 사용하여 모든 연결 허용, 보안 그룹 또는 방화벽 제약 조건 구성 연결, -b 매개변수를 해제하면(기본적으로 지속성은 없음) 지속성을 위해 메모리의 대기열 메시지를 하드 디스크 binlog에 삭제할 수 있으며 정전 후 대기열 메시지를 다시 읽을 수 있습니다.
vim /etc/default/beanstalkd BEANSTALKD_LISTEN_ADDR=0.0.0.0 BEANSTALKD_LISTEN_PORT=11300 BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"
beanstalkd 작업 상태
status | note |
---|---|
delayed | delayed status |
ready | read y 상태 좋음 |
reserved | 소비자가 작업을 읽습니다. |
buried | 예약 상태 |
delete | delete 상태 |
管理工具
亲测了很多网上能找到的 beanstalkd 工具,这两款是我最中意的了,一个命令行,一个 web 的。
命令行:https://github.com/src-d/beanstool
web 界面:https://github.com/ptrofimov/beanstalk_console
编程语言客户端
PHP 客户端
https://packagist.org/packages/pda/pheanstalk
composer require pda/pheanstalk
写入 job
<?php //创建队列消息 require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $tubeName = 'email_list'; $jobData = [ 'email' => '123456@163.com', 'message' => 'Hello World !!', 'dtime' => date('Y-m-d H:i:s'), ]; $pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );
消费 job
<?php ini_set('default_socket_timeout', 86400*7); ini_set( 'memory_limit', '256M' ); // 消费队列消息 require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $tubeName = 'email_list'; while ( true ) { // 获取队列信息, reserve 阻塞获取 $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve(); if ( $job !== false ) { $data = $job->getData(); /* TODO 逻辑操作 */ /* 处理完成,删除 job */ $pheanstalk->delete( $job ); } }
default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。
就是这样的代码,供参考:
public function watchJob() { $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve(); if ( $job !== false ) { $job_data = $job->getData(); $this->subscribe( $job_data ); $this->pheanstalk->delete( $job ); /* 继续 Watch 下一个 job */ $this->watchJob(); } else { $this->log->error( 'reserve false', 'reserve false' ); } }
监控 beanstalkd 状态
<?php //监控服务状态 require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $isAlive = $pheanstalk->getConnection()->isServiceListening(); var_dump( $isAlive );
可以配合 email 做一个报警邮件,脚本每分钟去执行,判断状态是 false,就给管理员发送邮件报警。
一些相关命令
查看 beanstalkd 服务内存占用
top -u beanstalkd
后台运行 consumer 脚本
nohup php googlehome_subscribe.php &
查看 consumer 脚本运行时间
ps -A -opid,stime,etime,args | grep consumer.php
手工重启 consumer 脚本
ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9 nohup php googlehome_subscribe.php &
一些总结
php 要把错误日志打开,方便收集 consumer 脚本 crash 的 log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。
一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考。
try { /* TODO: 逻辑操作 */ } catch ( ClientException $e ) { $results['mid'] = $this->mid; $results['code'] = $e->getResponse()->getStatusCode(); $results['reason'] = $e->getResponse()->getReasonPhrase(); $this->log->error( 'properties-changed ClientException', $results ); } catch ( ServerException $e ) { $results['mid'] = $this->mid; $results['code'] = $e->getResponse()->getStatusCode(); $results['reason'] = $e->getResponse()->getReasonPhrase(); $this->log->error( 'properties-changed ServerException', $results ); } catch ( ConnectException $e ) { $results['mid'] = $this->mid; $this->log->error( 'properties-changed ConnectException', $results ); }
job 消费之后一定要删除掉,如果长时间不删除,php 客户端会有 false 返回,是因为有 DEADLINE_SOON 这个超时错误产生,所以处理完任务,一定要记得删除,这一点跟 kafka 不一样,beanstalkd 需要开发者自己去删除 job。
推荐教程:《PHP教程》
위 내용은 PHP7 프로덕션 환경 대기열 Beanstalkd 올바른 사용 자세의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!