Home  >  Article  >  Backend Development  >  Lua script analysis of Laravel delay queue implementation

Lua script analysis of Laravel delay queue implementation

不言
不言Original
2018-04-16 14:29:122456browse

The main content of this article is about the Lua script analysis of Laravel delay queue implementation. It has a certain reference value. Now I share it with you. Friends in need can refer to it.

Laravel implements Redis delay Lua scripts are used in queues to ensure the atomicity of operations between different queues
In Laravel5.1, four Lua script methods are mainly used to ensure the atomicity of different queue operations

1. Count the number of queue tasks Method

1.llen counts the number of list queues

2.zcard counts the amount of zset queue data

    /**
     * Get the Lua script for computing the size of queue.
     *
     * KEYS[1] - The name of the primary queue
     * KEYS[2] - The name of the "delayed" queue
     * KEYS[3] - The name of the "reserved" queue
     *
     * @return string
     */
    public static function size()
    {
        return <<<&#39;LUA&#39;
             return redis.call(&#39;llen&#39;, KEYS[1]) + redis.call(&#39;zcard&#39;, KEYS[2]) + 
             redis.call(&#39;zcard&#39;, KEYS[3])
        LUA;
    }

2. Put the pop queue task into the reserved queue

    /**
     * Get the Lua script for popping the next job off of the queue.
     *
     * KEYS[1] - The queue to pop jobs from, for example: queues:foo
     * KEYS[2] - The queue to place reserved jobs on, for example: queues:foo:reserved
     * ARGV[1] - The time at which the reserved job will expire
     *
     * @return string
     */
    public static function pop()
    {
      return <<<&#39;LUA&#39;
          -- Pop the first job off of the queue...
         local job = redis.call(&#39;lpop&#39;, KEYS[1])
         local reserved = false

         if(job ~= false) then
            -- Increment the attempt count and place job on the reserved queue...
            reserved = cjson.decode(job)
            reserved[&#39;attempts&#39;] = reserved[&#39;attempts&#39;] + 1
            reserved = cjson.encode(reserved)
            redis.call(&#39;zadd&#39;, KEYS[2], ARGV[1], reserved)
         end
        return {job, reserved}
      LUA;
    }

3. Add the tasks from the reserved queue to the delayed queue

    /**
     * Get the Lua script for releasing reserved jobs.
     *
     * KEYS[1] - The "delayed" queue we release jobs onto, for example: queues:foo:delayed
     * KEYS[2] - The queue the jobs are currently on, for example: queues:foo:reserved
     * ARGV[1] - The raw payload of the job to add to the "delayed" queue
     * ARGV[2] - The UNIX timestamp at which the job should become available
     *
     * @return string
     */
    public static function release()
    {
        return <<<&#39;LUA&#39;
           -- Remove the job from the current queue...
           redis.call(&#39;zrem&#39;, KEYS[2], ARGV[1])
           -- Add the job onto the "delayed" queue...
           redis.call(&#39;zadd&#39;, KEYS[1], ARGV[2], ARGV[1])
           return true
        LUA;
    }

4. Merge the tasks that meet the reserved queue time into the execution queue

    /**
     * Get the Lua script to migrate expired jobs back onto the queue.
     *
     * KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
     * KEYS[2] - The queue we are moving jobs to, for example: queues:foo
     * ARGV[1] - The current UNIX timestamp
     *
     * @return string
     */
    public static function migrateExpiredJobs()
    {
        return <<<&#39;LUA&#39;
        -- Get all of the jobs with an expired "score"...
           local val = redis.call(&#39;zrangebyscore&#39;, KEYS[1], &#39;-inf&#39;, ARGV[1])

        -- If we have values in the array, we will remove them from the first queue
        -- and add them onto the destination queue in chunks of 100, which moves
        -- all of the appropriate jobs onto the destination queue very safely.
           if(next(val) ~= nil) then
             redis.call(&#39;zremrangebyrank&#39;, KEYS[1], 0, #val - 1)

             for i = 1, #val, 100 do
               redis.call(&#39;rpush&#39;, KEYS[2], unpack(val, i, math.min(i+99, #val)))
             end
           end
          return val
        LUA;
}

                           

The above is the detailed content of Lua script analysis of Laravel delay queue implementation. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn