Maison  >  Article  >  développement back-end  >  Analyse du script Lua de l'implémentation de la file d'attente de retard Laravel

Analyse du script Lua de l'implémentation de la file d'attente de retard Laravel

不言
不言original
2018-04-16 14:29:122498parcourir

Le contenu principal de cet article concerne l'analyse du script Lua de l'implémentation de la file d'attente de retard de Laravel. Il a une certaine valeur de référence. Maintenant, je le partage avec vous. Les amis dans le besoin peuvent s'y référer

Laravel est en train de l'implémenter. Redis delay Les scripts Lua sont utilisés dans les files d'attente pour assurer l'atomicité des opérations entre les différentes files d'attente
Dans Laravel 5.1, quatre méthodes de script Lua sont principalement utilisées pour assurer l'atomicité des opérations dans les différentes files d'attente

1. Comptez le nombre de tâches de file d'attente Méthode

1.llen compte le nombre de files d'attente de liste

2.zcard compte la quantité de données de file d'attente zset

    /**
     * Get the Lua script for computing the size of queue.
     *
     * KEYS[1] - The name of the primary queue
     * KEYS[2] - The name of the "delayed" queue
     * KEYS[3] - The name of the "reserved" queue
     *
     * @return string
     */
    public static function size()
    {
        return <<<&#39;LUA&#39;
             return redis.call(&#39;llen&#39;, KEYS[1]) + redis.call(&#39;zcard&#39;, KEYS[2]) + 
             redis.call(&#39;zcard&#39;, KEYS[3])
        LUA;
    }

2. mettre la tâche en file d'attente dans la file d'attente réservée

    /**
     * Get the Lua script for popping the next job off of the queue.
     *
     * KEYS[1] - The queue to pop jobs from, for example: queues:foo
     * KEYS[2] - The queue to place reserved jobs on, for example: queues:foo:reserved
     * ARGV[1] - The time at which the reserved job will expire
     *
     * @return string
     */
    public static function pop()
    {
      return <<<&#39;LUA&#39;
          -- Pop the first job off of the queue...
         local job = redis.call(&#39;lpop&#39;, KEYS[1])
         local reserved = false

         if(job ~= false) then
            -- Increment the attempt count and place job on the reserved queue...
            reserved = cjson.decode(job)
            reserved[&#39;attempts&#39;] = reserved[&#39;attempts&#39;] + 1
            reserved = cjson.encode(reserved)
            redis.call(&#39;zadd&#39;, KEYS[2], ARGV[1], reserved)
         end
        return {job, reserved}
      LUA;
    }

3. Ajouter des tâches de la file d'attente réservée à la file d'attente retardée

    /**
     * Get the Lua script for releasing reserved jobs.
     *
     * KEYS[1] - The "delayed" queue we release jobs onto, for example: queues:foo:delayed
     * KEYS[2] - The queue the jobs are currently on, for example: queues:foo:reserved
     * ARGV[1] - The raw payload of the job to add to the "delayed" queue
     * ARGV[2] - The UNIX timestamp at which the job should become available
     *
     * @return string
     */
    public static function release()
    {
        return <<<&#39;LUA&#39;
           -- Remove the job from the current queue...
           redis.call(&#39;zrem&#39;, KEYS[2], ARGV[1])
           -- Add the job onto the "delayed" queue...
           redis.call(&#39;zadd&#39;, KEYS[1], ARGV[2], ARGV[1])
           return true
        LUA;
    }

4. Fusionner les tâches qui respectent le temps de file d'attente réservé dans la file d'attente d'exécution

    /**
     * Get the Lua script to migrate expired jobs back onto the queue.
     *
     * KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
     * KEYS[2] - The queue we are moving jobs to, for example: queues:foo
     * ARGV[1] - The current UNIX timestamp
     *
     * @return string
     */
    public static function migrateExpiredJobs()
    {
        return <<<&#39;LUA&#39;
        -- Get all of the jobs with an expired "score"...
           local val = redis.call(&#39;zrangebyscore&#39;, KEYS[1], &#39;-inf&#39;, ARGV[1])

        -- If we have values in the array, we will remove them from the first queue
        -- and add them onto the destination queue in chunks of 100, which moves
        -- all of the appropriate jobs onto the destination queue very safely.
           if(next(val) ~= nil) then
             redis.call(&#39;zremrangebyrank&#39;, KEYS[1], 0, #val - 1)

             for i = 1, #val, 100 do
               redis.call(&#39;rpush&#39;, KEYS[2], unpack(val, i, math.min(i+99, #val)))
             end
           end
          return val
        LUA;
}

                                                                                                

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn