首頁  >  文章  >  後端開發  >  PHP7 生產環境佇列 Beanstalkd 正確使用姿勢

PHP7 生產環境佇列 Beanstalkd 正確使用姿勢

Guanhui
Guanhui轉載
2020-05-18 17:47:303294瀏覽

PHP7 生產環境佇列 Beanstalkd 正確使用姿勢

應用場景

  為什麼要用呢,有什麼好處?這應該放在最開頭說,一件東西你只有了解它是做什麼的,適合幹什麼,才能更好的與自己的項目相結合,用到哪裡學到哪裡,學了不用等於不會,我們平時就應該多考慮一些這樣的問題:自己做個什麼專案功能能跟xx 技術結合呢?這個 xx 技術放在這種業務場景下行不行呢?而不是 “學了這個 xx 技術能幹什麼呢,公司現在也沒有用這個的呀,學了也沒用啊”,帶著這樣心情去學習 xx 技術,肯定很痛苦。

  隊列大家都知道是將一些耗時的操作先不去做,先埋點,再異步去處理,這樣對一些發郵件發短信之類的耗時操作,用戶是感覺不到的,因為埋點結束,操作也就結束了,消費佇列都是在伺服器上做的。主要應用在簡訊或郵件通知,存取第三方介面訂閱訊息,商城的一些秒殺活動,都可以結合隊列來完成。

Beanstalkd 介紹

  Beanstalkd 是一個高效能,輕量級的分散式記憶體佇列,C 程式碼,典型的類別Memcached 設計,協定和使用方式都是同樣的風格,所以使用過memcached 的用戶會覺得Beanstalkd 似曾相識。

  beanstalkd 的最初設計意圖是在高並發的網路請求下,透過非同步執行耗時較多的請求,及時返回結果,減少請求的回應延遲。

Ubuntu 安裝

sudo apt-get install beanstalkd

設定檔

vim /etc/default/beanstalkd

查看狀態

service beanstalkd status
# 命令回显 #
root@:/www/server/php/72/etc# service beanstalkd status
● beanstalkd.service - Simple, fast work queue
   Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled)
   Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago
     Docs: man:beanstalkd(1)
 Main PID: 7033 (beanstalkd)
    Tasks: 1 (limit: 4634)
   CGroup: /system.slice/beanstalkd.service
           └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd
Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.

配置連通性持久化

ip 用0.0.0.0 允許所有連接,靠配置安全群組或防火牆去約束連接,放開-b 參數(預設沒有持久化),內存的佇列訊息可以落地到硬碟binlog 實作持久化,斷電可重新讀取佇列訊息。

vim /etc/default/beanstalkd
BEANSTALKD_LISTEN_ADDR=0.0.0.0
BEANSTALKD_LISTEN_PORT=11300
BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"

beanstalkd 任務狀態

##delayed延遲狀態
#狀態 註解
#########ready######準備好狀態############ reserved######消費者把任務讀出來,處理時############buried######預留狀態############ delete######刪除狀態#############

管理工具

亲测了很多网上能找到的 beanstalkd 工具,这两款是我最中意的了,一个命令行,一个 web 的。

命令行:https://github.com/src-d/beanstool

web 界面:https://github.com/ptrofimov/beanstalk_console

编程语言客户端

PHP 客户端

https://packagist.org/packages/pda/pheanstalk

composer require pda/pheanstalk

写入 job

<?php
//创建队列消息
require_once(&#39;./vendor/autoload.php&#39;);
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk(&#39;127.0.0.1&#39;,11300);
$tubeName = &#39;email_list&#39;;
$jobData = [
    &#39;email&#39; => &#39;123456@163.com&#39;,
    &#39;message&#39; => &#39;Hello World !!&#39;,
    &#39;dtime&#39; => date(&#39;Y-m-d H:i:s&#39;),
];
$pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );

消费 job

<?php
ini_set(&#39;default_socket_timeout&#39;, 86400*7);
ini_set( &#39;memory_limit&#39;, &#39;256M&#39; );
// 消费队列消息
require_once(&#39;./vendor/autoload.php&#39;);
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk(&#39;127.0.0.1&#39;,11300);
$tubeName = &#39;email_list&#39;;
while ( true )
{
    // 获取队列信息, reserve 阻塞获取
    $job = $pheanstalk->watch( $tubeName )->ignore( &#39;default&#39; )->reserve();
    if ( $job !== false )
    {
        $data = $job->getData();
        /* TODO 逻辑操作 */
        /* 处理完成,删除 job */
        $pheanstalk->delete( $job );
    }
}

default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。

  关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。

就是这样的代码,供参考:

    public function watchJob()
    {
        $job = $this->pheanstalk->watch( config( &#39;tube&#39; ) )->ignore( &#39;default&#39; )->reserve();
        if ( $job !== false )
        {
            $job_data = $job->getData();
            $this->subscribe( $job_data );
            $this->pheanstalk->delete( $job );
            /* 继续 Watch 下一个 job */
            $this->watchJob();
        }
        else
        {
            $this->log->error( &#39;reserve false&#39;, &#39;reserve false&#39; );
        }
    }

监控 beanstalkd 状态

<?php
//监控服务状态
require_once(&#39;./vendor/autoload.php&#39;);
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk(&#39;127.0.0.1&#39;,11300);
$isAlive = $pheanstalk->getConnection()->isServiceListening();
var_dump( $isAlive );

可以配合 email 做一个报警邮件,脚本每分钟去执行,判断状态是 false,就给管理员发送邮件报警。

一些相关命令

查看 beanstalkd 服务内存占用

top -u beanstalkd

后台运行 consumer 脚本

nohup php googlehome_subscribe.php &

查看 consumer 脚本运行时间

ps -A -opid,stime,etime,args | grep consumer.php

手工重启 consumer 脚本

ps auxf|grep &#39;googlehome_subscribe.php&#39;|grep -v grep|awk &#39;{print $2}&#39;|xargs kill -9 
nohup php googlehome_subscribe.php &amp;

一些总结

  php 要把错误日志打开,方便收集 consumer 脚本 crash 的 log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。

  一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考。

try
{
    /* TODO: 逻辑操作 */
}
catch ( ClientException $e )
{
    $results[&#39;mid&#39;]    = $this->mid;
    $results[&#39;code&#39;]   = $e->getResponse()->getStatusCode();
    $results[&#39;reason&#39;] = $e->getResponse()->getReasonPhrase();
    $this->log->error( &#39;properties-changed ClientException&#39;, $results );
}
catch ( ServerException $e )
{
    $results[&#39;mid&#39;]    = $this->mid;
    $results[&#39;code&#39;]   = $e->getResponse()->getStatusCode();
    $results[&#39;reason&#39;] = $e->getResponse()->getReasonPhrase();
    $this->log->error( &#39;properties-changed ServerException&#39;, $results );
}
catch ( ConnectException $e )
{
    $results[&#39;mid&#39;] = $this->mid;
    $this->log->error( &#39;properties-changed ConnectException&#39;, $results );
}

  job 消费之后一定要删除掉,如果长时间不删除,php 客户端会有 false 返回,是因为有 DEADLINE_SOON 这个超时错误产生,所以处理完任务,一定要记得删除,这一点跟 kafka 不一样,beanstalkd 需要开发者自己去删除 job。

推荐教程:《PHP教程

以上是PHP7 生產環境佇列 Beanstalkd 正確使用姿勢的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:learnku.com。如有侵權,請聯絡admin@php.cn刪除