Maison  >  Article  >  base de données  >  Comment utiliser Redis pour implémenter des tâches planifiées distribuées

Comment utiliser Redis pour implémenter des tâches planifiées distribuées

王林
王林original
2023-11-07 11:05:081308parcourir

Comment utiliser Redis pour implémenter des tâches planifiées distribuées

Redis est un excellent système de stockage clé-valeur qui a de nombreuses autres utilisations en plus d'être utilisé comme cache. L'un d'eux est un outil de mise en œuvre pour les tâches planifiées distribuées. Dans cet article, nous présenterons comment utiliser Redis pour implémenter des tâches planifiées distribuées et fournirons des exemples de code correspondants.

Qu'est-ce qu'une tâche planifiée distribuée ?

Dans un environnement autonome, nous pouvons utiliser des tâches planifiées pour exécuter régulièrement une certaine fonction ou tâche. Dans un environnement distribué, chaque nœud aura ses propres tâches planifiées et des problèmes tels qu'une exécution répétée et une exécution manquée peuvent survenir. Par conséquent, les tâches planifiées distribuées doivent prendre en compte des problèmes tels que la fiabilité de l’exécution des tâches, la répartition et la coordination des tâches.

Redis comme outil d'implémentation pour les tâches planifiées distribuées

Redis fournit des structures de données et des commandes qui peuvent bien prendre en charge les tâches planifiées distribuées, telles que :

  • Ensemble trié (ensemble ordonné) : vous pouvez suivre le score (score) ) trier et enregistrer le temps d'exécution de la tâche à travers des scores.
  • commande expire : vous pouvez définir le délai d'expiration d'une certaine clé.
  • Script Lua : plusieurs commandes peuvent être exécutées dans des opérations atomiques pour garantir l'atomicité et la fiabilité des opérations.

Ensuite, nous présenterons comment utiliser Redis pour implémenter des tâches planifiées distribuées et fournirons des exemples de code.

Étapes de mise en œuvre

1. Stocker les informations sur les tâches dans l'ensemble trié de Redis

Tout d'abord, nous devons stocker les informations sur les tâches dans l'ensemble trié de Redis. Ici, nous pouvons avoir l'heure d'exécution de la tâche (horodatage) comme score et l'ID de la tâche en tant que membre. Voici un exemple de code :

import redis

# Connect to Redis
redis_conn = redis.Redis(host='localhost', port=6379, db=0)

# Add task to Sorted Set
task_id = "task_001"
execute_time = 1600000000  # timestamp (in seconds)
redis_conn.zadd("tasks", {task_id: execute_time})

Dans le code ci-dessus, nous avons exécuté une tâche nommée task_001, et le temps d'exécution était 1600000000 (ici exprimé par timestamp , en fait cela peut aussi s'exprimer d'autres manières). Stockez-le dans un ensemble trié nommé tâches. task_001的任务,执行时间为1600000000 (这里是用时间戳来表示的,实际上也可以使用其他方式来表示)。将它存入名为tasks的Sorted Set中。

2. 设置过期时间

为了避免过期任务一直存在Redis中占用空间,我们需要设置过期时间,并在过期后从Sorted Set中删除。下面是一个示例代码:

import time

# Check for expired tasks every 10 seconds
while True:
    # Get all tasks with score less than current time
    tasks = redis_conn.zrangebyscore("tasks", 0, int(time.time()))

    # Delete expired tasks
    for task in tasks:
        redis_conn.zrem("tasks", task)

以上代码中,我们每隔10秒检查一次过期任务并删除。为此,我们使用了zrangebyscore命令,获取分数在0(即当前时间) 至 time.time()(当前时间戳)之间的任务。在获取到任务后,我们使用了zrem命令,从Sorted set中删除任务。

3. 执行任务

在检查过期任务时,我们同时也要执行这些过期任务。下面是一个示例代码:

import uuid

# Consume tasks every 10 seconds
while True:
    # Get all tasks with score less than current time
    tasks = redis_conn.zrangebyscore("tasks", 0, int(time.time()))

    # Execute tasks
    for task in tasks:
        # Check if task is already being executed by another worker
        lock_id = redis_conn.get("lock_" + task)
        if lock_id is None:
            # Lock task using Lua script
            lock_id = str(uuid.uuid4())
            lua_script = """
                if redis.call("get", ARGV[1]) == false then
                    redis.call("set", ARGV[1], ARGV[2])
                    redis.call("expire", ARGV[1], 60)
                    return true
                else
                    return false
                end
            """
            if redis_conn.eval(lua_script, 0, "lock_" + task, lock_id) is True:
                # Execute task
                print("Executing task " + task)
                # task.execute()
                # ...

                # Remove task from Sorted Set and unlock
                redis_conn.zrem("tasks", task)
                redis_conn.delete("lock_" + task)

以上代码中,我们每隔10秒检查一次过期任务并执行。为此,我们使用了zrangebyscore命令,获取分数在0(即当前时间) 至 time.time()

2. Définir le délai d'expiration

Afin d'éviter que les tâches expirées n'occupent toujours de l'espace dans Redis, nous devons définir le délai d'expiration et les supprimer de l'ensemble trié après l'expiration. Voici un exemple de code :

rrreee

Dans le code ci-dessus, nous vérifions les tâches expirées toutes les 10 secondes et les supprimons. Pour ce faire, nous utilisons la commande zrangebyscore pour obtenir le score entre 0 (c'est-à-dire l'heure actuelle) et time.time() (l'heure actuelle). horodatage actuel) entre les tâches. Après avoir obtenu la tâche, nous avons utilisé la commande zrem pour supprimer la tâche de l'ensemble trié. 🎜🎜3. Exécuter des tâches🎜🎜Lors de la vérification des tâches expirées, nous devons également exécuter ces tâches expirées en même temps. Voici un exemple de code : 🎜rrreee🎜Dans le code ci-dessus, nous vérifions les tâches expirées toutes les 10 secondes et les exécutons. Pour ce faire, nous utilisons la commande zrangebyscore pour obtenir le score entre 0 (c'est-à-dire l'heure actuelle) et time.time() (l'heure actuelle). horodatage actuel) entre les tâches. Après avoir obtenu la tâche, nous vérifions d'abord si la tâche est exécutée par un autre processus. Afin d'éviter que plusieurs processus exécutent la même tâche en même temps, nous utilisons un lock_id pour identifier si la tâche a été verrouillée. Si la tâche n'est pas verrouillée, nous utilisons un script Lua pour acquérir le verrou. Après avoir acquis le verrou, nous effectuons l'opération de tâche correspondante, supprimons la tâche de l'ensemble trié et enfin libérons le verrou. 🎜🎜Résumé🎜🎜Cet article explique comment utiliser Redis pour implémenter des tâches planifiées distribuées et fournit des exemples de code correspondants. En utilisant les fonctions Redis telles que Sorted Set, la commande expire et le script Lua, nous pouvons implémenter un système de tâches planifiées distribuées hautement fiable et efficace. Bien entendu, le code ci-dessus doit encore être amélioré et optimisé pour répondre aux différents besoins et scénarios. 🎜

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn