Maison >développement back-end >Tutoriel Python >Garantir une transformation équitable du céleri - Partie II

Garantir une transformation équitable du céleri - Partie II

Barbara Streisand
Barbara Streisandoriginal
2024-12-28 14:22:10413parcourir

Cet article explore les priorités des tâches dans le céleri, en s'appuyant sur l'article précédent sur le traitement équitable. Les priorités des tâches offrent un moyen d'améliorer l'équité et l'efficacité du traitement en arrière-plan en attribuant différents niveaux de priorité aux tâches en fonction de critères personnalisés.

Pourquoi une priorité au niveau des tâches ?

La priorité au niveau des tâches offre un contrôle précis sur l'exécution des tâches sans mise en œuvre complexe. En soumettant toutes les tâches à une seule file d'attente avec des valeurs de priorité attribuées, les travailleurs peuvent traiter les tâches en fonction de leur urgence. Cela garantit un traitement équitable quel que soit le délai de soumission.

Par exemple, si un locataire soumet 100 tâches et qu'un autre en soumet 5 peu de temps après, la priorité au niveau des tâches empêche le deuxième locataire d'attendre que les 100 tâches soient terminées.

Cette approche attribue dynamiquement la priorité en fonction du nombre de tâches d'un locataire. La première tâche de chaque locataire commence avec une priorité élevée, mais toutes les 10 tâches simultanées, la priorité diminue. Cela garantit que les locataires ayant moins de tâches ne subissent pas de retards inutiles.

Mise en œuvre de la priorité des tâches

Tout d'abord, installez Celery et Redis :

pip install celery redis

Configurez Celery pour utiliser Redis comme courtier et activer le traitement des tâches basé sur la priorité :

from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=True,
)

app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}

Définissez une méthode pour calculer la priorité dynamique à l'aide de Redis pour mettre en cache le nombre de tâches de chaque locataire :

import redis

redis_client = redis.StrictRedis(host="localhost", port=6379, db=1)

def calculate_priority(tenant_id):
    """
    Calculate task priority based on the number of tasks for the tenant.
    """
    key = f"tenant:{tenant_id}:task_count"
    task_count = int(redis_client.get(key) or 0)
    return min(10, task_count // 10)

Créez une classe de tâches personnalisée pour diminuer le nombre de tâches une fois terminées avec succès :

from celery import Task

class TenantAwareTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        tenant_id = kwargs.get("tenant_id")

        if tenant_id:
            key = f"tenant:{tenant_id}:task_count"
            redis_client.decr(key, 1)

        return super().on_success(retval, task_id, args, kwargs)

@app.task(name="tasks.send_email", base=TenantAwareTask)
def send_email(tenant_id, task_data):
    """
    Simulate sending an email.
    """
    sleep(1)
    key = f"tenant:{tenant_id}:task_count"
    task_count = int(redis_client.get(key) or 0)
    logger.info("Tenant %s tasks: %s", tenant_id, task_count)

Déclenchez des tâches pour différents locataires, en vous assurant que le tenant_id est inclus dans les arguments de mot-clé de la tâche :

if __name__ == "__main__":
    tenant_id = 1
    for _ in range(100):
        priority = calculate_priority(tenant_id)
        key = f"tenant:{tenant_id}:task_count"
        redis_client.incr(key, 1)
        send_email.apply_async(
            kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority
        )


    tenant_id = 2
    for _ in range(10):
        priority = calculate_priority(tenant_id)
        key = f"tenant:{tenant_id}:task_count"
        redis_client.incr(key, 1)
        send_email.apply_async(
            kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority
        )

Vous pouvez voir le code complet ici.

Démarrez le travailleur Céleri et déclenchez les tâches :

# Run the worker
celery -A tasks worker --loglevel=info

# Trigger the tasks
python tasks.py

Cette configuration montre comment la file d'attente prioritaire de Celery, combinée à Redis, garantit un traitement équitable des tâches en ajustant dynamiquement les priorités en fonction de l'activité des locataires. Voyons une sortie simplifiée du travailleur :

Ensuring Fair Processing with Celery - Part II

Conclusion

La priorité au niveau des tâches avec Celery et Redis fournit une solution robuste pour garantir un traitement équitable dans les systèmes multi-locataires. En attribuant dynamiquement des priorités et en exploitant une file d'attente unique, vous pouvez maintenir la simplicité tout en répondant aux exigences de l'entreprise.

Il existe de nombreuses façons d'implémenter la priorité au niveau des tâches, l'utilisation de RabbitMQ par exemple est plus efficace car elle prend en charge la priorité à la base, mais comme nous utilisons également Redis pour le comptage des tâches, cela simplifie notre architecture globale.

J'espère que cela vous sera utile et à voir pour le prochain !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn