Maison >développement back-end >Tutoriel Python >Comprendre les tâches, les courtiers, les travailleurs et les backends dans Celery

Comprendre les tâches, les courtiers, les travailleurs et les backends dans Celery

PHPz
PHPzoriginal
2024-07-23 20:37:53415parcourir

Understanding tasks, brokers, workers, and backends in Celery

Le céleri peut être intimidant à apprendre. Bien que sa documentation soit complète, elle a tendance à ignorer les bases.

Cet article définira quatre des principaux concepts du céleri, discutera de la relation entre le céleri et le Kombu et utilisera quelques exemples de code pour illustrer comment le céleri pourrait être utile dans des applications réelles. Les exemples utiliseront le framework Web Django et son décorateur @shared_task, mais les concepts sont également applicables à Flask, FastAPI et autres.

Tâches, courtiers, travailleurs et backends

Vous aurez du mal à trouver une place dans la documentation actuelle de Celery qui énonce clairement ce qu'il considère comme un courtier ou un backend, mais en creusant suffisamment, vous pouvez trouver et déduire des définitions.

Vous trouverez ci-dessous les concepts que vous devez connaître avant de commencer avec le céleri.

Tâche

Une tâche est un travail que Celery effectuera de manière asynchrone (dans ce contexte, c'est un mot sophistiqué pour "pas immédiatement"). Dans une application Web, une tâche peut consister à envoyer un e-mail après qu'un utilisateur a soumis un formulaire. L'envoi d'un e-mail peut prendre plusieurs secondes, et forcer un utilisateur à attendre l'envoi d'un e-mail avant de le rediriger peut ralentir une application.

Les tâches sont définies à l'aide de décorateurs dans Celery. Ci-dessous, nous utilisons le décorateur @shared_task pour transformer send_thank_you_email() en une tâche Celery qui peut être utilisée dans le gestionnaire de soumission de formulaire submit_feedback().

from config.celery import shared_task
from django.core.mail import send_mail
from django.shortcuts import render, redirect
from feedback.forms import FeedbackForm

@shared_task
def send_thank_you_email(email_address):
    send_mail(
        "Thank you for your feedback!",
        "We appreciate your input.",
        "noreply@example.com",
        [email_address],
    )

def submit_feedback(request):
    if request.method == "POST":
        form = FeedbackForm(request.POST)
        if form.is_valid():
            form.save()

            # Push the task to the broker using the delay() method.
            send_thank_you_email.delay(form.cleaned_data["email"])

            return redirect("/thank-you/")
    else:
        form = FeedbackForm()

    return render(request, "feedback.html", {"form": form})

Lorsqu'une tâche est définie à l'aide d'un décorateur dans Celery, elle ajoute une méthode delay() à la tâche. Vous pouvez voir la tâche send_thank_you_email appeler la méthode delay() dans l'exemple ci-dessus une fois le formulaire enregistré avec succès. Lorsque delay() est appelé, il enverra la tâche send_thank_you_email et ses données au broker où elle est stockée et sera ensuite exécutée par un worker, auquel cas l'utilisateur être envoyé par e-mail.

L'avantage de transférer le travail vers Celery devient plus évident si vous devez envoyer des e-mails supplémentaires après avoir enregistré le formulaire. Par exemple, vous souhaiterez peut-être envoyer un e-mail à l'équipe de support client pour l'informer qu'elle a reçu de nouveaux commentaires. Avec le céleri, cela n’ajoute presque aucun temps supplémentaire à la réponse.

Les tâches de céleri permettent également une configuration avancée supplémentaire. En cas d'échec de l'envoi d'un e-mail, vous pouvez coder votre tâche pour réessayer automatiquement et configurer des paramètres tels que max_retries, retry_backoff, retry_jitter, etc.

Courtier

Le glossaire des propositions d'amélioration du céleri dit ce qui suit à propos des courtiers de messages :

Les modèles d'intégration d'entreprise définissent un courtier de messages comme un élément architectural capable de recevoir des messages de plusieurs destinations, de déterminer la destination correcte et d'acheminer le message vers le canal approprié.

Pour nos besoins avec Celery, nous considérerons un courtier un "transport de messages" où les tâches créées sont stockées. Les courtiers n'exécutent réellement la tâche : c'est le travail d'un travailleur. Les courtiers sont plutôt l'endroit où les tâches planifiées sont stockées dans lorsqu'une tâche est planifiée, et extraites de lorsqu'un travailleur finit par exécuter une tâche. Un courtier est un composant requis pour que Celery fonctionne, et Celery se connectera à exactement un courtier.

La page Backends et courtiers de Celery en répertorie certains si ses courtiers sont pris en charge, et il existe d'autres courtiers expérimentaux qu'il prend en charge qui ne sont pas répertoriés (tels que SQLAlchemy). Ces courtiers (ou « transports de messages ») sont gérés par une bibliothèque Python maintenue par Celery pour les transports de messages appelée Kombu. Lorsque vous recherchez des informations sur la configuration des courtiers, il est parfois utile de consulter la documentation de Kombu plutôt que celle de Celery.

Certains courtiers disposent de fonctionnalités avancées telles que la distribution et la priorité des tâches, tandis que d'autres fonctionnent comme de simples files d'attente.

Travailleur

Un worker est une instance de Celery qui extrait les tâches du courtier et exécute les fonctions de tâche définies dans votre application Python. Celery est capable d'exécuter du code Python dans ses ouvriers car Celery lui-même est écrit en Python.

De nombreux travailleurs peuvent s'exécuter simultanément pour exécuter des tâches. Lorsque vous exécutez la commande Celery Worker, elle lancera par défaut un travailleur pour chaque cœur de votre ordinateur. Si votre ordinateur possède 16 cœurs, l'exécution de Celery Worker démarrera 16 Workers.

Si aucun travailleur n'est en cours d'exécution, les messages (tâches) s'accumuleront dans le courtier jusqu'à ce que les travailleurs soient disponibles pour les exécuter.

Back-end

La page des tâches du guide de l'utilisateur de Celery contient les informations suivantes à propos des backends :

If you want to keep track of tasks or need the return values, then Celery must store or send the states somewhere so that they can be retrieved later. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), and Redis – or you can define your own.

TLDR: a backend tracks the outcomes and returned results of async tasks. What does that actually mean, and when could it be useful?

Imagine you are building an accounting app in Django that can generate an annual report. The report could take minutes to generate.

To give your users a more responsive experience, you use an AJAX request to kick off a report generation task. That request returns an ID of the task, which it can use to poll the server every few seconds to see if the report is generated. Once the task is complete, it will return the ID of the report, which the client can use to display a link to the report via JavaScript.

We can implement this with Celery and Django using the following code:

from celery import shared_task
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from accounting.models import Asset
from accounting.reports import AnnualReportGenerator

@shared_task
def generate_report_task(year):
    # This could take minutes...
    report = AnnualReportGenerator().generate(year)
    asset = Asset.objects.create(
        name=f"{year} annual report",
        url=report.url,
    )
    return asset.id

@require_http_methods(["POST"])
def generate_annual_report_view(request):
    year = request.POST.get("year")
    task = generate_report_task.delay(year)
    return JsonResponse({"taskId": task.id})

def get_annual_report_generation_status_view(request, task_id):
    task = generate_report_task.AsyncResult(task_id)

    # The status is typically "PENDING", "SUCCESS", or "FAILURE"
    status = task.status
    return JsonResponse({"status": status, "assetId": task.result})

In this example, the asset ID returned by generate_report_task() is stored in a backend. The backend stores the outcome and returned result. The backend does not store the status of yet-to-be-processed tasks: these will only be added once there has been an outcome. A task that returns "PENDING" has a completely unknown status: an associated task might not even exist. Tasks will typically return "SUCCESS" or "FAILURE", but you can see all statuses in the Celery status docs.

Having a backend is not required for Celery to run tasks. However, it is required if you ever need to check the outcome of a task or return a task's result. If you try to check a task's status when Celery does not have a backend configured, an exception will be raised.


I hope this post helps you understand the individual pieces of Celery and why you might consider using it. While the official documentation is challenging to grok, learning Celery deeply can unlock new possibilities within your Python applications.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn