Maison >base de données >Redis >Explication détaillée du traitement des tâches asynchrones dans Redis

Explication détaillée du traitement des tâches asynchrones dans Redis

王林
王林original
2023-06-20 08:26:531754parcourir

À mesure que les applications Web continuent de se développer, le besoin de traitement asynchrone des tâches devient de plus en plus important, car nous devons nous assurer que les utilisateurs peuvent continuer à utiliser l'application avant de terminer la tâche. Dans ce cas, à l'exception du traitement des tâches asynchrones, le traitement parallèle multitâche ne peut pas être réalisé, il est donc souvent nécessaire d'utiliser certains outils pour gérer les tâches asynchrones, parmi lesquels Redis est un outil très utile.

Redis est une base de données en mémoire hautes performances qui peut être utilisée pour stocker, lire et exploiter rapidement des données. Son utilisation principale est d'implémenter la mise en cache et la messagerie, mais il peut également être utilisé pour gérer des tâches asynchrones. Redis dispose de fonctionnalités intégrées de mise en file d'attente et de publication/abonnement, ce qui en fait un outil très utile pour le traitement des tâches asynchrones.

Dans cet article, nous présenterons comment utiliser Redis pour implémenter le traitement des tâches asynchrones.

  1. Établir une connexion Redis

Tout d'abord, nous devons utiliser un client Redis pour établir une connexion avec le serveur Redis. Tout client prenant en charge les connexions Redis peut être utilisé. Le redis-py de Python est un très bon choix. Veuillez vous assurer d'installer redis-py globalement :

pip install redis

Ensuite, vous pouvez utiliser la commande suivante pour établir une connexion Redis :

import redis

redis_conn = redis.Redis(host='localhost', port=6379, db=0)

Ici, nous avons créé une instance de connexion Redis nommée redis_conn , cette instance se connectera au serveur Redis local (host='localhost'), le numéro de port est 6379 (port=6379) et utilisera la base de données n°0 (db=0).

  1. Redis Queue

Redis Queue (RQ) est une bibliothèque Python qui utilise Redis comme backend pour implémenter une file d'attente de tâches distribuée. RQ est construit sur les commandes lpush et rpop de Redis, il offre donc de très bonnes performances.

Installer RQ et Redis :

pip install rq redis
  1. Tâche de synchronisation

Dans la tâche de synchronisation, le thread principal s'exécutera tout le code et attendez que la tâche soit terminée. Voici un exemple de code pour une tâche synchronisée :

import time

def task():
    # 等待5秒
    time.sleep(5)
    print('Task complete')

print('Starting task')
task()
print('Task ended')

Dans l'exemple ci-dessus, nous définissons une fonction nommée tâche qui attend 5 secondes puis affiche "Tâche terminée". Ensuite, nous appelons cette tâche dans le thread principal, affichons "Tâche de démarrage", attendons 5 secondes et affichons "Tâche terminée".

Cette approche fonctionne pour les tâches de courte durée, mais pour les tâches de longue durée, elle peut laisser les utilisateurs très mécontents car ils ne peuvent pas utiliser l'application.

Voyons maintenant comment convertir cette tâche en tâche asynchrone.

  1. Tâche asynchrone

L'idée de convertir une tâche en tâche asynchrone est d'exécuter la tâche dans un thread ou un processus séparé et exécutez-le dans le principal. L'autre code continue de s'exécuter dans le thread. De cette façon, l'utilisateur peut continuer à utiliser l'application pendant que les tâches sont exécutées en arrière-plan.

En Python, vous pouvez utiliser des threads ou des processus pour effectuer des tâches en arrière-plan. Mais si plusieurs tâches sont en cours d'exécution, le nombre de threads et de processus peut augmenter, et ils peuvent également développer des problèmes, tels que des blocages et des problèmes de synchronisation.

L'utilisation de Redis peut résoudre ce problème, car Redis a une structure de file d'attente intégrée qui nous permet d'éviter ces problèmes. L'idée de base de l'implémentation de tâches asynchrones dans Redis est de créer une file d'attente de tâches et d'ajouter des tâches à la file d'attente. Créez ensuite un exécuteur de tâches distinct pour placer les tâches dans la file d'attente et les exécuter.

Étant donné que Redis est une base de données en mémoire, vous pouvez l'utiliser pour stocker toutes les données de la file d'attente. De cette façon, nous pouvons stocker l'état des tâches dans Redis et n'avons pas besoin d'utiliser des threads ou des processus pour gérer les tâches.

Ce qui suit est un exemple de code pour une tâche asynchrone :

from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

def task():
    # 等待5秒
    time.sleep(5)
    print('Task complete')

print('Starting task')
job = q.enqueue(task)
print('Task started')

Dans le code ci-dessus, nous créons d'abord une file d'attente Redis nommée q, puis définissons une tâche nommée function. Lors de l'appel d'une tâche dans le thread principal, nous ajoutons la tâche à la file d'attente à l'aide de la méthode enqueue de l'objet file d'attente. Cette méthode renvoie un objet tâche nommé job, qui représente la tâche dans la file d'attente. Ensuite, nous affichons "Tâche démarrée" et l'exécuteur de file d'attente obtiendra la tâche en arrière-plan et l'exécutera.

  1. Surveillance des tâches

Dans l'exemple précédent, nous pouvons utiliser l'objet job pour surveiller l'état de la tâche et récupérer les résultats. Voici l'exemple de code expliquant comment surveiller une tâche :

from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

def task():
    # 等待5秒
    time.sleep(5)
    return 'Task complete'

print('Starting task')
job = q.enqueue(task)
print('Task started')

# 检查任务状态并获取结果
while job.result is None:
    print('Task still processing')
    time.sleep(1)

print('Task complete: {}'.format(job.result))

Dans le code ci-dessus, nous vérifions la propriété result de la tâche jusqu'à ce qu'elle ne soit pas vide. Nous affichons ensuite « Tâche terminée : » plus le résultat de l'objet tâche.

  1. Using Publish/Subscribe

Redis prend également en charge le modèle de publication/abonnement (pub/sub), ce qui en fait un message très utile. outils. Dans ce modèle, un éditeur publie des messages sur un sujet et les abonnés s'abonnent au sujet et reçoivent tous les messages sur le sujet.

Prenons comme exemple une tâche asynchrone pour illustrer l'implémentation à l'aide du modèle de publication/abonnement.

Tout d'abord, nous devons créer un identifiant unique pour chaque tâche et ajouter la tâche à la file d'attente. Nous publions ensuite l'ID de la tâche dans le sujet. L'exécuteur de tâches s'abonne au sujet et lorsqu'un ID de tâche est reçu, il récupère la tâche et l'exécute.

Ce qui suit est un exemple de code pour implémenter une tâche asynchrone en utilisant le modèle de publication/abonnement :

from rq import Queue
from redis import Redis
import uuid

redis_conn = Redis()
q = Queue(connection=redis_conn)

# 订阅任务主题并执行任务
def worker():
    while True:
        _, job_id = redis_conn.blpop('tasks')
        job = q.fetch_job(job_id.decode('utf-8'))
        job.perform()

# 发布任务并将其ID添加到队列中
def enqueue_task():
    job = q.enqueue(task)
    redis_conn.rpush('tasks', job.id)

def task():
    # 等待5秒
    time.sleep(5)
    return 'Task complete'

print('Starting workers')
for i in range(3):
    # 创建3个工作线程
    threading.Thread(target=worker).start()

print('Enqueueing task')
enqueue_task()
print('Task enqueued')

Dans le code ci-dessus, nous définissons d'abord un exécuteur de tâche nommé travailleur, ce l'exécuteur continue de boucler et de déplanifier les ID de tâches de la file d'attente. Lorsqu'il obtient l'ID de tâche, il utilise la méthode fetch_job pour obtenir l'objet tâche et l'exécuter.

Nous définissons également une fonction appelée enqueue_task, qui crée une tâche asynchrone nommée job et ajoute son ID à la file d'attente. Nous appelons ensuite cette fonction dans le thread principal et publions l'ID de la tâche dans une rubrique appelée « tâches ». L'exécuteur de la tâche obtiendra la tâche et l'exécutera lorsqu'il recevra l'ID de la tâche.

  1. Summary

Dans cet article, nous avons présenté comment utiliser Redis pour implémenter le traitement des tâches asynchrones. Nous avons utilisé des files d'attente, le modèle de publication/abonnement et la bibliothèque RQ en python, tout en montrant comment convertir des tâches en mode asynchrone et utiliser des tâches asynchrones pour résoudre les problèmes d'expérience utilisateur. Redis est très utile lors de la gestion de tâches asynchrones car il fournit des fonctionnalités intégrées de mise en file d'attente et de publication/abonnement avec de très bonnes performances. Si vous souhaitez rendre votre application Web réactive et mettre en œuvre un traitement de tâches asynchrone, Redis est un bon choix.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn