The logic of this task is to push content to users. Users need to be retrieved and traversed based on the queue content, and sent through the request backend HTTP interface. For example, if there are 10,000 users, if the number of users is large or the interface processing speed is not that fast, the execution time will definitely be greater than 60 seconds, so the task will be re-added to the queue.
The situation is even worse. If the previous tasks are not executed within 60 seconds, they will be rejoined in the queue, so that the same task will be executed not only once, but multiple times.Let’s find the culprit from the Laravel source code. Source code file: vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php
/** * The expiration time of a job. * * @var int|null */ protected $expire = 60;
This $expire member variable is a fixed value, Laravel thinks that a queue is no matter how 60 It should be completed within seconds. How to get the queue:
public function pop($queue = null) { $original = $queue ?: $this->default; $queue = $this->getQueue($queue); $this->migrateExpiredJobs($queue.':delayed', $queue); if (! is_null($this->expire)) { $this->migrateExpiredJobs($queue.':reserved', $queue); } list($job, $reserved) = $this->getConnection()->eval( LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire ); if ($reserved) { return new RedisJob($this->container, $this, $job, $reserved, $original); } }
There are several steps to get the queue. Because the queue execution fails, or the execution times out, etc., it will be put into another collection and saved for retry. The process is as follows:
1 .Repush the queue that failed due to execution from the delayed collection to the currently executed queue.
2. Re-rpush the queue due to execution timeout from the reserved collection to the currently executed queue.
3. Then take the task from the queue and start executing it, and put the queue into the reserved ordered collection.
The eval command is used here to execute this process, and several lua scripts are used.
Get the task from the queue to be executed:
local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) end return {job, reserved}You can see that when Laravel gets the queue to be executed by Redis, it will also put a copy into an ordered collection. , and use the expiration timestamp as the score.
Only when the task is completed, the task will be removed from the ordered set. The code to remove the queue from this ordered collection is omitted. Let's take a look at how Laravel handles queues whose execution time is greater than 60 seconds. This is the operation performed by this Lua script:
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) if(next(val) ~= nil) then redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return true
Here zrangebyscore finds the elements whose scores range from infinitesimal to the current timestamp, that is, the tasks added to the collection 60 seconds ago. These elements are then removed from the set via zremrangebyrank and rpush to the queue.
You should suddenly realize it when you see this.
If a queue has not been executed within 60 seconds, the process will rpush the tasks from the reserved set to the queue again when fetching the queue.
Related recommendations: