博客列表 >PHP7生产环境队列Beanstalkd用法详解

PHP7生产环境队列Beanstalkd用法详解

P粉823318658
P粉823318658原创
2022年03月24日 12:12:48657浏览
  1. 在本篇文章里小编给大家分享的是关于PHP7生产环境队列Beanstalkd用法内容,需要的朋友们可以参考下。
  2. 应用场景
  3. 为什么要用呢,有什么好处?这应该放在最开头说,一件东西你只有了解它是干什么的,适合干什么,才能更好的与自己的项目相结合,用到哪里学到哪里,学了不用等于不会,我们平时就应该多考虑一些这样的问题:自己做个什么项目功能能跟 xx 技术相结合呢?这个 xx 技术放在这种业务场景下行不行呢?而不是 “学了这个 xx 技术能干嘛呢,公司现在也没有用这个的呀,学了也没用啊”,带着这样心情去学习 xx 技术,肯定很痛苦。
  4. 队列大家都知道是将一些耗时的操作先不去做,先埋点,再异步去处理,这样对一些发邮件发短信之类的耗时操作,用户是感觉不到的,因为埋点结束,操作也就结束了,消费队列都是在服务器上做的。主要应用在短信或邮件通知,访问第三方接口订阅消息,商城的一些秒杀活动,都可以结合队列来完成。
  5. Beanstalkd 介绍
  6. Beanstalkd 是一个高性能,轻量级的分布式内存队列,C 代码,典型的类 Memcached 设计,协议和使用方式都是同样的风格,所以使用过 memcached 的用户会觉得 Beanstalkd 似曾相识。
  7. beanstalkd 的最初设计意图是在高并发的网络请求下,通过异步执行耗时较多的请求,及时返回结果,减少请求的响应延迟。
  8. Ubuntu 安装
  1. sudo apt-get install beanstalkd
  1. 配置文件
  1. vim /etc/default/beanstalkd
  1. 查看状态
  2. ``````php
  3. service beanstalkd status
  4. # 命令回显 #
  5. root@:/www/server/php/72/etc# service beanstalkd status
  6. beanstalkd.service - Simple, fast work queue
  7. Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled)
  8. Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago
  9. Docs: man:beanstalkd(1)
  10. Main PID: 7033 (beanstalkd)
  11. Tasks: 1 (limit: 4634)
  12. CGroup: /system.slice/beanstalkd.service
  13. └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd
  14. Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.
  1. 配置连通性 + 持久化
  2. ip 0.0.0.0 允许所有连接,靠配置安全组或防火墙去约束连接,放开 -b 参数 (默认没有持久化),内存的队列消息可以落地到硬盘 binlog 实现持久化,断电可重新读取队列消息
  1. vim /etc/default/beanstalkd
  2. BEANSTALKD_LISTEN_ADDR=0.0.0.0
  3. BEANSTALKD_LISTEN_PORT=11300
  4. BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"
  1. beanstalkd 任务状态
  2. 状态 注释
  3. delayed 延迟状态
  4. ready 准备好状态
  5. reserved 消费者把任务读出来,处理时
  6. buried 预留状态
  7. delete 删除状态
  8. 管理工具
  9. 亲测了很多网上能找到的 beanstalkd 工具,这两款是我最中意的了,一个命令行,一个 web 的。
  10. 命令行:https://github.com/src-d/beanstool
  11. web 界面:https://github.com/ptrofimov/beanstalk_console
  12. 编程语言客户端
  13. PHP 客户端
  1. composer require pda/pheanstalk
  1. 写入 job
  1. <?php
  2. //创建队列消息
  3. require_once('./vendor/autoload.php');
  4. use Pheanstalk\Pheanstalk;
  5. $pheanstalk = new Pheanstalk('127.0.0.1',11300);
  6. $tubeName = 'email_list';
  7. $jobData = [
  8. 'email' => '123456@163.com',
  9. 'message' => 'Hello World !!',
  10. 'dtime' => date('Y-m-d H:i:s'),
  11. ];
  12. $pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );
  1. 消费 job
  2. ```php
  3. <?php
  4. ini_set('default_socket_timeout', 86400*7);
  5. ini_set( 'memory_limit', '256M' );
  6. // 消费队列消息
  7. require_once('./vendor/autoload.php');
  8. use Pheanstalk\Pheanstalk;
  9. $pheanstalk = new Pheanstalk('127.0.0.1',11300);
  10. $tubeName = 'email_list';
  11. while ( true )
  12. {
  13. // 获取队列信息, reserve 阻塞获取
  14. $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve();
  15. if ( $job !== false )
  16. {
  17. $data = $job->getData();
  18. /* TODO 逻辑操作 */
  19. /* 处理完成,删除 job */
  20. $pheanstalk->delete( $job );
  21. }
  22. }
  1. ```php
  2. default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
  3. 关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。
  4. 就是这样的代码,供参考:
  5. ```php
  6. {
  7. $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve();
  8. if ( $job !== false )
  9. {
  10. $job_data = $job->getData();
  11. $this->subscribe( $job_data );
  12. $this->pheanstalk->delete( $job );
  13. /* 继续 Watch 下一个 job */
  14. $this->watchJob();
  15. }
  16. else
  17. {
  18. $this->log->error( 'reserve false', 'reserve false' );
  19. }
  20. }
  1. ```php
  2. 监控 beanstalkd 状态
  1. <?php
  2. //监控服务状态
  3. require_once('./vendor/autoload.php');
  4. use Pheanstalk\Pheanstalk;
  5. $pheanstalk = new Pheanstalk('127.0.0.1',11300);
  6. $isAlive = $pheanstalk->getConnection()->isServiceListening();
  7. var_dump( $isAlive );
  1. 可以配合 email 做一个报警邮件,脚本每分钟去执行,判断状态是 false,就给管理员发送邮件报警。
  2. 一些相关命令
  3. 查看 beanstalkd 服务内存占用
  4. ``````php
  5. top -u beanstalkd
  6. ``````php
  7. 后台运行 consumer 脚本
  8. ``````php
  9. nohup php googlehome_subscribe.php &
  10. ``````php
  11. 查看 consumer 脚本运行时间
  12. ``````php
  13. ps -A -opid,stime,etime,args | grep consumer.php
  14. ``````php
  15. 手工重启 consumer 脚本
  16. ``````php
  17. ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9
  18. nohup php googlehome_subscribe.php &
  19. ``````php
  20. 一些总结
  21. php 要把错误日志打开,方便收集 consumer 脚本 crash log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。
  22. 一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考。
  23. ``````php
  24. try
  25. {
  26. /* TODO: 逻辑操作 */
  27. }
  28. catch ( ClientException $e )
  29. {
  30. $results['mid'] = $this->mid;
  31. $results['code'] = $e->getResponse()->getStatusCode();
  32. $results['reason'] = $e->getResponse()->getReasonPhrase();
  33. $this->log->error( 'properties-changed ClientException', $results );
  34. }
  35. catch ( ServerException $e )
  36. {
  37. $results['mid'] = $this->mid;
  38. $results['code'] = $e->getResponse()->getStatusCode();
  39. $results['reason'] = $e->getResponse()->getReasonPhrase();
  40. $this->log->error( 'properties-changed ServerException', $results );
  41. }
  42. catch ( ConnectException $e )
  43. {
  44. $results['mid'] = $this->mid;
  45. $this->log->error( 'properties-changed ConnectException', $results );
  46. }
声明:本文内容转载自脚本之家,由网友自发贡献,版权归原作者所有,如您发现涉嫌抄袭侵权,请联系admin@php.cn 核实处理。
全部评论
文明上网理性发言,请遵守新闻评论服务协议