佇列
佇列
#忽略缺少的模型
{tip} Laravel 現在為你的Redis 佇列提供了Horizon,一個漂亮的儀錶板和設定係統。查看完整的 Horizon documentation 文件 以了解更多資訊。
連接 Vs. 隊列
在開始使用 Laravel 隊列前,弄清楚 「連接」 和 「隊列」 的差異是很重要的。在你的 config/queue.php
設定檔裡,有一個 connections
設定選項。這個選項給 Amazon SQS,Beanstalk,或 Redis 這樣的後端服務定義了一個獨特的連線。不管是哪一種,一個給定的連接可能會有多個 “隊列”,而 “隊列” 可以被認為是不同的堆疊或大量的隊列任務。
要注意的是,queue
設定檔中每個連線的設定範例中都包含一個 queue
屬性。這是預設佇列任務被發給指定連線的時候會被分送到這個佇列中。換句話說,如果你分發任務的時候沒有明確定義隊列,那麼它就會被放到連接配置中queue
屬性所定義的隊列中:
// 这个任务将被分发到默认队列... Job::dispatch(); // 这个任务将被发送到「emails」队列... Job::dispatch()->onQueue('emails');
有些應用可能不需要把任務發到不同的隊列,而只發到一個簡單的隊列就行了。但把任務推到不同的佇列仍然是非常有用的,因為 Laravel 佇列處理器允許你定義佇列的優先權,所以你能給不同的佇列劃分不同的優先權或區分不同任務的不同處理方式了。比方說,如果你把任務推到high
佇列中,你就能讓佇列處理器優先處理這些任務了:
php artisan queue:work --queue=high,default
驅動程式的必要設定
Database
為了使用database
佇列驅動,你需要一張資料表來存儲任務。執行 queue:table
Artisan 指令來建立這張表的遷移檔案。當遷移檔案建立好後,你就可以使用migrate
指令來進行遷移:
php artisan queue:table php artisan migrate
Redis
為了使用redis
佇列驅動,你需要在config/database.php
設定檔中設定Redis 的資料庫連線。
Redis 叢集
如果你的 Redis 佇列驅動程式使用了 Redis 叢集,你的佇列名稱必須包含一個 key hash tag 。這是為了確保所有的 Redis 鍵對於一個佇列都被放在同一雜湊中。
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => '{default}', 'retry_after' => 90, ],
阻塞
當使用Redis 佇列時,你可以用block_for
設定項來具體說明驅動應該在將任務重新放入Redis資料庫以及處理器輪詢之前阻塞多久。
基於你的隊列載入來調整這個值比把新任務放入 Redis 資料庫輪詢更有效率的多。例如,你可以將這個值設為 5
來表示這個驅動程式應該在等待任務可用時阻塞 5 秒。
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'retry_after' => 90, 'block_for' => 5, ],
其它佇列驅動的依賴擴充包
在使用清單裡的佇列服務前,必須安裝下列依賴擴充套件:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
「建立任務
#產生任務類別
app/Jobs 目錄下。如果這個目錄不存在,那麼當你執行 make:job
Artisan 指令時目錄就會被自動建立。你可以用以下的Artisan 指令來產生一個新的佇列任務:php artisan make:job ProcessPodcast
Illuminate\Contracts\Queue\ShouldQueue
接口,這表示這個任務將會被推送到佇列中,而不是同步執行。#任務類別的結構很簡單,一般來說只會包含一個讓佇列用來呼叫此任務的
handle 方法。我們來看一個範例的任務類別。這個範例裡,假設我們管理著一個播客發布服務,在發布之前需要處理上傳播客文件:
<?php namespace App\Jobs; use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; class ProcessPodcast implements ShouldQueue{ use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 创建一个新的任务实例。 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 运行任务。 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // Process uploaded podcast... } }注意,在這個例子中,我們在任務類的建構器中直接傳遞了一個Eloquent 模型。因為我們在任務類別裡引用了
SerializesModels 這個 trait,使得 Eloquent 模型在處理任務時可以被優雅地序列化和反序列化。如果你的佇列任務類別在建構器中接收了一個 Eloquent 模型,那麼只有可辨識出該模型的屬性會被序列化到佇列裡。當任務實際運作時,佇列系統會自動從資料庫中重新取回完整的模型。這整個過程對你的應用程式來說是完全透明的,這可以避免在序列化完整的 Eloquent 模式實例時所帶來的一些問題。
在佇列處理任務時,會呼叫
handle 方法,而這裡我們也可以透過
handle 方法的參數型別提示,讓Laravel 的服務容器自動注入依賴對象。
handle方法接受一個任務和容器的回呼。雖然可以直接在回調中可以呼叫方法,可以使用容器的
bindMethodbindMethod
方法。
handle 方法,但建議應該從service provider 呼叫為佳:
use App\Jobs\ProcessPodcast; $this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) { return $job->handle($app->make(AudioProcessor::class)); });
分發任務
一旦你寫完了你的任務類別你就可以使用它自帶的 dispatch
方法來分發它。傳遞給dispatch
方法的參數將會傳遞給任務的建構子:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存储一个新的播客节目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... ProcessPodcast::dispatch($podcast); } }##延遲分發如果你想延遲你的佇列任務的執行,你可以在分發任務的時候使用
delay 方法。例如,讓我們詳細說明一個十分鐘之後才會執行的任務:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 存储一个新的播客节目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(10)); } }
{note} Amazon SQS 佇列服務最大延遲 15 分鐘的時間。同步調度如果您想要立即(同步)執行佇列任務,可以使用
dispatchNow 方法。使用此方法時,佇列任務將不會排隊,並立即在目前進程中執行:
<?php namespace App\Http\Controllers; use Illuminate\Http\Request; use App\Jobs\ProcessPodcast; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * Store a new podcast. * * @param Request $request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatchNow($podcast); } }工作鏈
工作鏈允許你具體定義一個按序列執行佇列任務的清單。一旦序列中的任務失敗了,剩餘的工作就不會執行。要執行一個工作鏈,你可以對可分發的任務使用
withChain 方法:
ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch();
{note} 使用工作鏈連線& 佇列如果你想定義用於工作鏈的預設連線和佇列,你可以使用$this->delete()
方法刪除佇列任務不會阻止工作鏈任務執行。只有當工作鏈中的任務執行失敗時,工作鏈才會停止執行。
allOnConnection和
allOnQueue 方法。這些方法指定了所需佇列的連線和佇列- 除非佇列任務被明確地指定給了不同的連線/ 佇列:
ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');自訂連接& 隊列分發任務到指定隊列透過將任務分發到不同隊列,你可以將你的隊列任務「分類」,甚至指定給不同隊列分配的任務數量。記住,這不是推送任務到你定義的佇列設定檔的不同的連接裡,而是一個單一的連接。要指定佇列,在分發任務時使用
onQueue 方法:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存储一个新的播客节目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... ProcessPodcast::dispatch($podcast)->onQueue('processing'); } }分發任務到指定連線如果你在多佇列連線中工作,你可以指定將任務分發到哪個連線。要指定連接,在分發任務時使用
onConnection 方法:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存储一个新播客节目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... ProcessPodcast::dispatch($podcast)->onConnection('sqs'); } }當然,你可以鍊式呼叫
onConnection 和
onQueue 方法來指定連接和隊列。
ProcessPodcast::dispatch($podcast) ->onConnection('sqs') ->onQueue('processing');
指定最大任務嘗試次數/ 逾時值
最大嘗試次數
在一個任務重指定最大嘗試次數可以透過Artisan 指令的-- tries
選項指定:
php artisan queue:work --tries=3
你可能想要透過任務類別本身對最大任務嘗試次數進行一個更顆粒化的處理。如果最大嘗試次數是在任務類別中定義的,它將優先於命令列中的值提供:
<?phpnamespace App\Jobs;class ProcessPodcast implements ShouldQueue{ /** * 任务可以尝试的最大次数。 * * @var int */ public $tries = 5;}
基於時間的嘗試
作為另一個選擇來定義任務在失敗前會嘗試多少次,你可以定義一個任務逾時時間。這樣的話,在給定的時間範圍內,任務可以無限嘗試。要定義一個任務的逾時時間,在你的任務類別中新增一個retryUntil
方法:
/** * 定义任务超时时间 * * @return \DateTime */ public function retryUntil(){ return now()->addSeconds(5); }
{tip} 你也可以在你的佇列事件監聽器中使用
retryUntil
方法。
逾時
{note}
timeout
特性對於PHP 7.1 和pcntl
# PHP 擴充功能進行優化.
同樣的,任務執行最大秒數的數值可以透過Artisan 命令列的--timeout
選項來指定。
php artisan queue:work --timeout=30
然而,你可能也想在任務類別中本身定義一個逾時時間。如果在任務類別中指定,優先權將會高於命令列:
<?php namespace App\Jobs;class ProcessPodcast implements ShouldQueue{ /** * 任务可以执行的最大秒数 (超时时间)。 * * @var int */ public $timeout = 120;}#
頻率限制
{note} 這個特性要求你的應用程式可以使用Redis 伺服器.
如果你的應用程式使用了Redis,你可以透過時間或並發限制你的隊列任務。當你的佇列任務透過同樣有速率限制的 API 使用時,這個特性將會很有幫助。
例如,使用 throttle
方法,你可以限制給定類型的任務每 60 秒只執行 10 次。如果沒有獲得鎖,一般情況下你應該將任務放回佇列以使其可以稍後重試。
Redis::throttle('key')->allow(10)->every(60)->then(function () { // 任务逻辑... }, function () { // 无法获得锁... return $this->release(10); });
{tip} 在上述的例子裡,
key
可以是任何你想要限制頻率的任務類型的唯一識別字串。例如,使用構件基於任務類別名稱的 key,或它操作的 Eloquent 模型的 ID。{note} 將受限的作業釋放回佇列,仍會增加工作的總數
attempts
。
或者,你可以指定一個任務可以同時執行的最大數量。在以下情況時這會很有用處:當一個佇列中的任務正在修改資源時,一次只能被一個任務修改。例如,使用funnel
方法,你可以限制給定類型的任務一次只能執行一個處理器:
Redis::funnel('key')->limit(1)->then(function () { // 任务逻辑...}, function () { // 无法获得锁... return $this->release(10); });
{tip} 當使用頻率限制時,任務執行成功的嘗試的次數可能會難以確定。所以,將頻率限制與 時間限制 組合是很有作用的。
錯誤處理
如果在任務執行的時候出現異常,任務會自動釋放到隊列中以再次嘗試。任務將會一直被釋放直到達到應用程式允許的最大重試次數。最大重試的數值由 queue:work
Artisan 指令的 --tries
選項定義,或在任務類別中定義。更多執行佇列處理器的資訊可以 在以下找到 。
排隊閉包
你也可以直接呼叫閉包,而不是將任務類別調度到佇列中。這對於需要執行的快速、簡單的任務非常有用:
$podcast = App\Podcast::find(1); dispatch(function () use ($podcast) { $podcast->publish(); });
將閉包分派給佇列時,閉包的程式碼內容將以加密方式簽名,因此無法在傳輸過程中對其進行修改。
運行佇列處理器
Laravel 包含了一個佇列處理器以將推送到佇列中的任務執行。你可以使用 queue:work
Artisan 指令來執行處理器。注意一旦 queue:work
指令開始執行,它會一直運作直到它被手動停止或終端機被關閉。
php artisan queue:work
{tip} 要讓
queue:work
進程一直在後台運行,你應該使用進程管理器例如Supervisor 來確保佇列處理器不會停止執行
記住,佇列處理器是一個常駐的進程並且在記憶體中保存著已經啟動的應用程式狀態。因此,它們並不會在啟動後注意到你程式碼的變更。所以,在你的重新部署過程中,請記得重啟你的佇列處理器.
#指定連線& 佇列
你也可以具體說明佇列處理器應該使用哪個佇列連接。傳遞給 work
的連接名稱應該與你的 config/queue.php
設定檔中定義的連接之一相符。
php artisan queue:work redis
你甚至可以自訂你的佇列處理器使其只執行連線中指定的佇列。例如,如果你的所有郵件都由redis
連接的emails
佇列處理,你可以使用以下的指令啟動一個只執行此佇列的處理器:
php artisan queue:work redis --queue=emails
執行單一任務
--once
選項用於使佇列處理器只處理佇列中的單一任務。
php artisan queue:work --once
處理所有佇列的任務然後退出
--stop-when-empty
選項可用來處理佇列處理器處理所有作業然後優雅地退出。如果您希望在佇列為空後關閉容器,則在Docker 容器中執行Laravel 佇列時,此選項很有用:
php artisan queue:work --stop-when-empty##資源注意事項後台駐留的佇列處理器不會在執行完每個任務後「重新啟動」框架。因此,你應該在每個任務完成後釋放任何佔用過多的資源。例如,如果你正在用 GD 庫執行映像處理,你應該在完成後使用
imagedestroy 釋放記憶體。
佇列優先權
有時你可能會想決定佇列執行的優先權。例如在config/queue.php
中你可以將redis
連接的queue
佇列的優先權從default
設定為 low
。然而, 偶爾你也想像如下方式將一個任務推送到high
隊列:
dispatch((new Job)->onQueue('high'));
要運行一個處理器來確認low
隊列中的任務在全部的high
佇列任務完成後才繼續執行,你可以傳遞一個逗號分隔的佇列名稱清單作為work
指令的參數。
php artisan queue:work --queue=high,low
佇列處理器& 部署
因為佇列處理器是常駐進程,他們在重新啟動前不會應用你程式碼的變更。因此,部署使用佇列處理器的應用最簡單的方法是在部署進程中重新啟動佇列處理器。你可以平滑地重啟所有佇列處理器透過使用queue:restart
方法:
php artisan queue:restart
這個指令將會引導所有的佇列處理器在完成目前任務後平滑「中止」,這樣不會有遺失的任務。由於在執行 queue:restart
後佇列處理器將會中止,所以你應該執行一個行程管理器例如 Supervisor 來自動重啟佇列處理器。
{tip} 佇列使用 快取 儲存重新啟動訊號,所以你應該確定在使用這個功能之前配置好快取驅動。
任務過期& 逾時
任務過期
在你的config/queue.php
設定檔中,每個佇列連線都定義了一個retry_after
選項。這個選項指定了隊列連線在重試一個任務前應該等它執行多久。例如,如果 retry_after
的值設定為 90
,那麼任務在執行了 90 秒後將會放回佇列而不是刪除它。一般情況下,你應該將 retry_after
的值設定為你認為你的任務可能會執行需要最長時間的值。
{note} 只有在 Amazon SQS 中不存在
retry_after
這個值。 SQS 將會以 AWS 控制台配置的 預設可見逾時值 作為重試任務的依據。
處理器逾時
queue:work
Artisan 指令包含一個 --timeout
選項。 --timeout
選項指定了 Laravel 的佇列主程序在中止一個執行任務的子程序之前需要等到多久。有時一個子程序可能會因為各種原因而「凍結」,例如一個外部的 HTTP 請求失去回應。 --timeout
選項會移除那些超過指定時間被凍結的進程。
php artisan queue:work --timeout=60
retry_after
設定項和--timeout
命令列配置並不同,但將它們同時使用可以確保任務不會遺失且任務只會成功執行一次。
{note}
--timeout
的值應該比你在retry_after
中配置的值至少短幾秒鐘。這會確保處理器永遠會在一個任務被重試之前中止。如果你的--timeout
值比retry_after
的值長的話,你的任務可能會被執行兩次。
佇列進程睡眠時間
當任務在佇列中可用時,處理器將會一直無間隔地處理任務。然而, sleep
選項定義瞭如果沒有新任務的時候處理器將會「睡眠」多久。在處理器睡眠時,它不會處理任何新任務 —— 任務將會在佇列處理器再次啟動後執行。
php artisan queue:work --sleep=3
Supervisor 設定
安裝Supervisor
Supervisor 是Linux作業系統下中的一個行程監控器,它可以在queue:work
掛掉時自動重啟之。在Ubuntu 上安裝Supervisor,你可以使用以下指令:
sudo apt-get install supervisor
{小提醒} 如果覺得設定Supervisor 難於登天,可以考慮使用Laravel Forge,它將自動為你的Laravel 專案安裝和設定Supervisor。
設定 Supervisor
Supervisor 的設定檔通常位於 /etc/supervisor/conf.d
目錄下。在該目錄中,你可以建立任意數量的配置文件,用來控制 supervisor 將如何監控你的進程。例如,建立一個laravel-worker.conf
檔案使之啟動和監控一個queue:work
進程:
[program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 autostart=true autorestart=true user=forge numprocs=8 redirect_stderr=true stdout_logfile=/home/forge/app.com/worker.log
在這個範例中,numprocs
指令將指定Supervisor 運行8 個queue:work
進程並對其進行監控,如果它們掛掉就自動重新啟動它們。你應該更改 command
選項中的 queue:work sqs
部分以表示你所需的佇列連線。
啟動Supervisor
設定檔創建完畢後,你就可以使用如下命令更新Supervisor 配置並啟動進程了:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start laravel-worker:*
獲取關於Supervisor 的更多信息,可以查閱Supervisor 文件.
#處理失敗的任務
有時你的佇列化任務會執行失敗。放平心態,好事多磨。 Laravel 包含了一個方便的方法來指定任務應該嘗試的最大次數。如果一個任務已經到達了最大嘗試次數,它就會被插入到 failed_jobs
資料庫表中。要建立failed_jobs
資料庫遷移表,你可以使用queue:failed-table
指令:
php artisan queue:failed-table php artisan migrate
然後,當你執行queue worker,你應該使用queue:work
指令中的--tries
開關指定應嘗試執行任務的最大次數。如果沒有為--tries
選項指定值,則將死循環嘗試執行任務:
php artisan queue:work redis --tries=3##任務失敗後清理你可以直接在任務類別中定義
failed 方法,讓你在任務失敗時執行針對於該任務的清理工作。這是向用戶發送警報或恢復任務執行的任何操作的絕佳位置。導致任務失敗的
Exception 將會傳遞給
failed 方法:
<?php namespace App\Jobs; use Exception;use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; class ProcessPodcast implements ShouldQueue{ use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 创建任务实例 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 执行任务 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // 上传播客…… } /** * 任务失败的处理过程 * * @param Exception $exception * @return void */ public function failed(Exception $exception) { // 给用户发送任务失败的通知,等等…… } }
如果你想在任務失敗時註冊一個可呼叫的事件,你可以使用
Queue::failing 方法。該事件是透過 email 或 Slack
通知你團隊的絕佳時機。例如,我們可以在Laravel 中的AppServiceProvider 中附加一個回呼事件:<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider{
/**
* 启动任意服务。
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* 注册服务提供者。
*
* @return void
*/
public function register() {
//
}
}
要想查看所有被放入
failed_jobs 資料表中的任務,你可以使用Artisan 指令queue:failed
:php artisan queue:failed
指令會列出任務ID ,佇列,以及失敗的時間。任務 ID 可能會被用於重試失敗的任務。例如,要重試一個任務ID 為5
的任務,使用以下指令:
要重試所有失敗的任務,執行php artisan queue:retry 5
指令,將all
作為ID 傳入:
如果你想要刪除一個失敗的任務,使用php artisan queue:retry all
指令:
要清空所有失敗的任務,使用php artisan queue:forget 5
指令:php artisan queue:flush
忽略缺少的模型
在向任務中註入 Eloquent 模型時,模型被放入佇列前將被自動序列化並在執行任務時還原。但是,如果在任務等待執行時刪除了模型,任務可能會失敗並拋出 ModelNotFoundException
。
為了方便,你可以選擇設定任務的 deleteWhenMissingModels
屬性為 true
來自動地刪除缺失模型的任務。
/** * 如果模型缺失即删除任务。 * * @var bool */ public $deleteWhenMissingModels = true;
任務事件
透過在Queue
facade 中使用before
和after
方法,你可以指定一個佇列任務被執行前後的回呼。這些回調是添加額外的日誌或增加統計的絕佳時機。通常,你應該在 服務提供者中呼叫這些方法。例如,我們可以使用Laravel 的AppServiceProvider
:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Support\ServiceProvider; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider{ /** * 引导启动任意应用服务。 * * @return void */ public function boot() { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); } /** * 注册服务提供者。 * * @return void */ public function register() { // } }
在Queue
facade 使用looping
方法可以在處理器嘗試取得任務之前執行回調。例如,你也許想用一個閉包來回滾之前失敗的任務尚未關閉的事務:
Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); } });