首頁  >  文章  >  後端開發  >  原始碼分析 Laravel 重複執行同一個佇列任務的原因講解

原始碼分析 Laravel 重複執行同一個佇列任務的原因講解

jacklove
jacklove原創
2018-07-04 17:58:401759瀏覽

laravel 的佇列服務對各種不同的後台佇列服務提供了統一的API,以下這篇文章透過原始碼分析給大家介紹了關於Laravel 重複執行同一個佇列任務的原因,文中透過範例程式碼介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧。

前言

laravel 的佇列服務對各種不同的後台佇列服務提供了統一的 API。佇列允許你延遲執行消耗時間的任務,例如發送一封郵件。這樣可以有效的降低請求回應的時間。

發現問題

在Laravel 中使用Redis 處理佇列任務,框架提供的功能非常強大,但最近遇到一個問題,就是發現一個任務被多次執行,這是為什麼?

先說原因:

因為Laravel 中如果一個佇列(任務)執行時間大於60 秒,就會被認為執行失敗並重新加入佇列中,這樣就會導致重複執行同一個任務。

這個任務的邏輯就是給使用者推送內容,需要根據佇列內容取出使用者並遍歷,透過請求後端 HTTP 介面發送。例如有 10,000 個用戶,在用戶數量多或介面處理速度沒那麼快的情況下,執行時間一定會大於 60 秒,於是這個任務就被重新加入佇列。情況更糟一點,前面的任務如果都沒有在 60 秒執行完,就都會重新加入隊列,這樣同一個任務就不止重複執行一次了,而是多次。

下面從 Laravel 原始碼找一下罪魁禍首。

原始碼檔案:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php

##

/**
 * The expiration time of a job.
 *
 * @var int|null
 */
protected $expire = 60;

#這個$expire 成員變數是固定的值,Laravel 認為一個佇列再怎麼60 秒也該執行完了吧。取佇列方法:

public function pop($queue = null)
{
 $original = $queue ?: $this->default; 
 $queue = $this->getQueue($queue); 
 $this->migrateExpiredJobs($queue.':delayed', $queue); 
 if (! is_null($this->expire)) {
  $this->migrateExpiredJobs($queue.':reserved', $queue);
 } 
 list($job, $reserved) = $this->getConnection()->eval(
  LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
 ); 
 if ($reserved) {
  return new RedisJob($this->container, $this, $job, $reserved, $original);
 }
}

取佇列有幾個步驟操作,因為佇列執行失敗,或執行逾時等都會放入另外的集合保存起來,以便重試,流程如下:

    1.把因執行失敗的佇列從delayed 集合重新rpush 到目前執行的佇列。

    2.把執行逾時的佇列從 reserved 集合重新 rpush 到目前執行的佇列。

    3.然後才從佇列中取任務開始執行,同時把佇列放入 reserved 的有序集合。

這裡使用了 eval 指令執行這個過程,用到了幾個 lua 腳本。

從要執行的佇列中取任務:

local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
 reserved = cjson.decode(job)
 reserved['attempts'] = reserved['attempts'] + 1
 reserved = cjson.encode(reserved)
 redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}

可以看到Laravel 在取Redis 要執行的佇列的時候,同時會放一份到一個有序集合中,並使用過期時間戳作為分數。

只有當這個任務完成後,再把有序集合中這個任務移除。從這個有序集合移除佇列的程式碼就省略,我們來看看 Laravel 如何處理執行時間大於 60 秒的佇列。

也就是這段lua 腳本執行的動作:

#

local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
if(next(val) ~= nil) then
 redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
 for i = 1, #val, 100 do
  redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
 end
end
return true

這裡zrangebyscore 找出分數從無限小到目前時間戳記的元素,也就是60 秒之前加入到集合的任務,然後透過zremrangebyrank 從集合移除這些元素並rpush 到佇列中。

看到這裡應該就恍然大悟了。

如果一個佇列 60 秒沒執行完,那麼進程在取佇列的時候從 reserved 集合中把這些任務又重新 rpush 到佇列。

總結

您可能感興趣的文章:

關於Laravel Redis多個進程同時取佇列的問題詳解

php-msf原始碼的詳解

thinkphp5 URL和路由的功能詳解與實例講解

#

以上是原始碼分析 Laravel 重複執行同一個佇列任務的原因講解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn