Maison >développement back-end >Tutoriel Python >Garantir une transformation équitable du céleri - Partie II
Cet article explore les priorités des tâches dans le céleri, en s'appuyant sur l'article précédent sur le traitement équitable. Les priorités des tâches offrent un moyen d'améliorer l'équité et l'efficacité du traitement en arrière-plan en attribuant différents niveaux de priorité aux tâches en fonction de critères personnalisés.
La priorité au niveau des tâches offre un contrôle précis sur l'exécution des tâches sans mise en œuvre complexe. En soumettant toutes les tâches à une seule file d'attente avec des valeurs de priorité attribuées, les travailleurs peuvent traiter les tâches en fonction de leur urgence. Cela garantit un traitement équitable quel que soit le délai de soumission.
Par exemple, si un locataire soumet 100 tâches et qu'un autre en soumet 5 peu de temps après, la priorité au niveau des tâches empêche le deuxième locataire d'attendre que les 100 tâches soient terminées.
Cette approche attribue dynamiquement la priorité en fonction du nombre de tâches d'un locataire. La première tâche de chaque locataire commence avec une priorité élevée, mais toutes les 10 tâches simultanées, la priorité diminue. Cela garantit que les locataires ayant moins de tâches ne subissent pas de retards inutiles.
Tout d'abord, installez Celery et Redis :
pip install celery redis
Configurez Celery pour utiliser Redis comme courtier et activer le traitement des tâches basé sur la priorité :
from celery import Celery app = Celery( "tasks", broker="redis://localhost:6379/0", broker_connection_retry_on_startup=True, ) app.conf.broker_transport_options = { "priority_steps": list(range(10)), "sep": ":", "queue_order_strategy": "priority", }
Définissez une méthode pour calculer la priorité dynamique à l'aide de Redis pour mettre en cache le nombre de tâches de chaque locataire :
import redis redis_client = redis.StrictRedis(host="localhost", port=6379, db=1) def calculate_priority(tenant_id): """ Calculate task priority based on the number of tasks for the tenant. """ key = f"tenant:{tenant_id}:task_count" task_count = int(redis_client.get(key) or 0) return min(10, task_count // 10)
Créez une classe de tâches personnalisée pour diminuer le nombre de tâches une fois terminées avec succès :
from celery import Task class TenantAwareTask(Task): def on_success(self, retval, task_id, args, kwargs): tenant_id = kwargs.get("tenant_id") if tenant_id: key = f"tenant:{tenant_id}:task_count" redis_client.decr(key, 1) return super().on_success(retval, task_id, args, kwargs) @app.task(name="tasks.send_email", base=TenantAwareTask) def send_email(tenant_id, task_data): """ Simulate sending an email. """ sleep(1) key = f"tenant:{tenant_id}:task_count" task_count = int(redis_client.get(key) or 0) logger.info("Tenant %s tasks: %s", tenant_id, task_count)
Déclenchez des tâches pour différents locataires, en vous assurant que le tenant_id est inclus dans les arguments de mot-clé de la tâche :
if __name__ == "__main__": tenant_id = 1 for _ in range(100): priority = calculate_priority(tenant_id) key = f"tenant:{tenant_id}:task_count" redis_client.incr(key, 1) send_email.apply_async( kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority ) tenant_id = 2 for _ in range(10): priority = calculate_priority(tenant_id) key = f"tenant:{tenant_id}:task_count" redis_client.incr(key, 1) send_email.apply_async( kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority )
Vous pouvez voir le code complet ici.
Démarrez le travailleur Céleri et déclenchez les tâches :
# Run the worker celery -A tasks worker --loglevel=info # Trigger the tasks python tasks.py
Cette configuration montre comment la file d'attente prioritaire de Celery, combinée à Redis, garantit un traitement équitable des tâches en ajustant dynamiquement les priorités en fonction de l'activité des locataires. Voyons une sortie simplifiée du travailleur :
La priorité au niveau des tâches avec Celery et Redis fournit une solution robuste pour garantir un traitement équitable dans les systèmes multi-locataires. En attribuant dynamiquement des priorités et en exploitant une file d'attente unique, vous pouvez maintenir la simplicité tout en répondant aux exigences de l'entreprise.
Il existe de nombreuses façons d'implémenter la priorité au niveau des tâches, l'utilisation de RabbitMQ par exemple est plus efficace car elle prend en charge la priorité à la base, mais comme nous utilisons également Redis pour le comptage des tâches, cela simplifie notre architecture globale.
J'espère que cela vous sera utile et à voir pour le prochain !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!