Maison >développement back-end >Tutoriel Python >Garantir une transformation équitable du céleri - Partie I

Garantir une transformation équitable du céleri - Partie I

Mary-Kate Olsen
Mary-Kate Olsenoriginal
2024-11-16 09:10:03664parcourir

Ensuring Fair Processing with Celery — Part I

Si vous connaissez Python, il y a de fortes chances que vous ayez entendu parler du Céleri. C'est souvent le choix idéal pour gérer des tâches de manière asynchrone, comme le traitement d'images ou l'envoi d'e-mails.

En discutant avec certaines personnes, j'ai commencé à remarquer que de nombreux développeurs trouvent Celery impressionnant au début, mais à mesure que l'ampleur et la complexité de leurs projets augmentent, leur enthousiasme commence à s'estomper. Alors que certains s'éloignent du céleri pour des raisons légitimes, d'autres peuvent tout simplement ne pas en explorer suffisamment le cœur pour l'adapter à leurs besoins.

Dans ce blog, je souhaite aborder l'une des raisons pour lesquelles certains développeurs commencent à rechercher des alternatives ou même à créer des frameworks de travail en arrière-plan personnalisés : le traitement équitable. Dans les environnements où les utilisateurs/locataires soumettent des tâches de tailles variables, le risque que la lourde charge de travail d’un locataire affecte les autres peut créer des goulots d’étranglement et conduire à de la frustration.

Je vais vous expliquer des stratégies pour mettre en œuvre un traitement équitable du céleri, garantissant une répartition équilibrée des tâches afin qu'aucun locataire ne puisse dominer vos ressources.

Le problème

Plongeons-nous dans un défi courant auquel sont confrontées les applications multi-tenant, en particulier celles qui gèrent le traitement par lots. Imaginez que vous disposiez d'un système dans lequel les utilisateurs peuvent mettre en file d'attente leurs tâches de traitement d'images, leur permettant de recevoir leurs images traitées après une brève attente. Cette configuration permet non seulement de maintenir la réactivité de votre API, mais vous permet également de faire évoluer vos collaborateurs selon vos besoins pour gérer la charge efficacement.

Tout se passe bien, jusqu'à ce qu'un locataire décide de soumettre un énorme lot d'images pour traitement. Vous disposez de plusieurs travailleurs en place, et ils peuvent même évoluer automatiquement pour répondre à une demande accrue, vous avez donc confiance en votre infrastructure. Cependant, les problèmes commencent lorsque d'autres locataires tentent de mettre en file d'attente des lots plus petits (peut-être juste quelques images) et se retrouvent soudainement confrontés à de longs délais d'attente sans aucune mise à jour. Avant que vous ne vous en rendiez compte, les tickets d'assistance commencent à affluer, les utilisateurs se plaignant que votre service est lent, voire ne répond pas.

Ce scénario est trop courant car Celery, par défaut, traite les tâches dans l'ordre dans lequel elles sont reçues. Lorsqu'un locataire submerge vos collaborateurs avec un afflux massif de tâches, même les meilleures stratégies d'autoscaling peuvent ne pas suffire à éviter les retards pour les autres locataires. En conséquence, ces utilisateurs peuvent bénéficier de niveaux de service inférieurs à ceux promis ou attendus.

Limitation du taux avec le céleri

Une stratégie efficace pour garantir un traitement équitable consiste à mettre en œuvre des limites de taux. Il vous permet de contrôler le nombre de tâches que chaque locataire peut soumettre dans un délai précis. Cela empêche un seul locataire de monopoliser vos travailleurs et garantit que tous les locataires ont une chance équitable d'accomplir leurs tâches.

Le céleri dispose d'une fonctionnalité intégrée pour limiter le débit au niveau des tâches :

# app.py
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")

@app.task(rate_limit="10/m") # Limit to 10 tasks per minute
def process_data(data):
    print(f"Processing data: {data}")

# Call the task
if __name__ == "__main__":
    for i in range(20):
        process_data.delay(f"data_{i}")

Vous pouvez exécuter le travailleur en exécutant :

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

Maintenant, exécutez le script app.py pour déclencher 20 tâches :

python app.py

Si vous parvenez à l'exécuter localement, vous remarquerez qu'il y a un délai entre chaque tâche pour garantir que la limite de débit est appliquée. Maintenant, vous pensez probablement que cela ne nous aide pas vraiment à résoudre notre problème, et vous avez tout à fait raison. Cette limite de débit intégrée par Celery est utile pour les scénarios dans lesquels notre tâche peut impliquer des appels vers des services externes ayant des limites de débit strictes.

Cet exemple montre à quel point la fonctionnalité intégrée peut être trop simple pour des scénarios complexes. Cependant, nous pouvons surmonter cette limitation en explorant plus en profondeur le cadre de Celery. Voyons comment nous pouvons définir une limite de débit appropriée avec une nouvelle tentative automatique par locataire.

Nous utiliserons Redis pour suivre la limite de débit par locataire. Redis est une base de données et un courtier populaires pour Celery, alors exploitons ce composant qui est probablement déjà dans votre pile.

Importons quelques bibliothèques :

import time
import redis
from celery import Celery, Task

Nous allons maintenant implémenter une classe de tâches de base personnalisée pour notre tâche à taux limité :

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)

class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        # Set default rate limit
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)

Cette classe personnalisée suivra le nombre de tâches déclenchées par un locataire spécifique à l'aide de Redis et définira un TTL de 10 secondes. Si la limite de débit est dépassée, la tâche sera réessayée dans 10 secondes. Donc, fondamentalement, notre limite de débit par défaut est de 10 tâches en 10 secondes.

Définissons un exemple de tâche qui émule le traitement :

@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

Ici, nous avons défini une tâche de processus et vous pouvez voir que je peux modifier le custom_rate_limit au niveau de la tâche. Si nous ne spécifions pas de custom_rate_limit, la valeur par défaut de 10 serait attribuée. Désormais, notre limite de taux est passée à 5 tâches en 10 secondes.

Déclenchons maintenant quelques tâches pour différents locataires :

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

Nous définissons 20 tâches pour le locataire ID 1 et 10 tâches pour le locataire ID 2.

Notre code complet ressemblerait donc à ceci :

# app.py
import time
import redis
from celery import Celery, Task

app = Celery(
    "app",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=False,
)

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)


class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)


@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

Exécutons notre travailleur :

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

Maintenant, exécutez le script app.py pour déclencher les tâches :

python app.py

Comme vous pouvez le voir, le travailleur traite 5 tâches du premier locataire et met en place une nouvelle tentative pour toutes les autres tâches. Il prend ensuite 5 tâches du deuxième locataire et met en place une nouvelle tentative pour les autres tâches, et cela continue.

Cette approche permet de définir une limite de débit par locataire mais comme vous pouvez le voir dans notre exemple, pour une tâche qui s'exécute très vite, être trop strict avec la limite de débit finit par laisser le travailleur ne rien faire pendant un moment. Le réglage fin des paramètres de limite de débit est crucial et dépend de la tâche et du volume spécifiques. N'hésitez pas à expérimenter jusqu'à trouver un équilibre optimal.

Conclusion

Nous avons exploré comment le traitement des tâches par défaut de Celery peut conduire à des injustices dans les environnements multi-locataires et comment la limitation du débit peut aider à résoudre ce problème. En mettant en œuvre des limites de débit spécifiques au locataire, nous pouvons empêcher un seul locataire de monopoliser les ressources et garantir une répartition plus équitable de la puissance de traitement.

Cette approche fournit une base solide pour parvenir à une transformation équitable du céleri. Cependant, il existe d'autres techniques qui méritent d'être explorées pour optimiser davantage la gestion des tâches dans les applications multi-locataires. Alors que j’avais initialement prévu de tout aborder dans un seul article, ce sujet s’avère assez vaste ! Pour garantir la clarté et garder cet article concentré, j'ai décidé de le diviser en deux parties.

Dans la prochaine partie de cette série, nous examinerons les priorités des tâches en tant que autre mécanisme visant à améliorer l'équité et l'efficacité. Cette approche vous permet d'attribuer différents niveaux de priorité aux tâches en fonction de différents critères, garantissant ainsi que les tâches critiques sont traitées rapidement, même pendant les périodes de forte demande.

Restez à l'écoute pour le prochain épisode !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn