首页  >  文章  >  后端开发  >  了解 Celery 中的任务、代理、工作人员和后端

了解 Celery 中的任务、代理、工作人员和后端

PHPz
PHPz原创
2024-07-23 20:37:53357浏览

Understanding tasks, brokers, workers, and backends in Celery

芹菜学起来可能令人望而生畏。虽然它的文档很全面,但它倾向于跳过基础知识。

这篇文章将定义 Celery 中的四个主要概念,讨论 Celery 和 Kombu 之间的关系,并使用一些代码示例来说明 Celery 在实际应用程序中如何有用。这些示例将使用 Django Web 框架及其 @shared_task 装饰器,但这些概念也适用于 Flask、FastAPI 等。

任务、代理、工作人员和后端

你很难在当前的 Celery 文档中找到一个地方来清楚地说明它所认为的经纪人后端,但是通过足够的挖掘,你可以找到和推断定义。

以下是开始使用 Celery 之前您应该了解的概念。

任务

任务是Celery将异步执行的一些工作(在这种情况下,这是“不立即”的一个奇特词)。在 Web 应用程序中,一项任务可能是在用户提交表单后发送电子邮件。发送电子邮件可能是一个需要多秒的操作,并且强制用户在重定向之前等待电子邮件发送可能会让应用程序感觉很慢。

任务是使用 Celery 中的装饰器定义的。下面我们使用 @shared_task 装饰器将 send_thank_you_email() 转换为 Celery 任务,可以在 Submit_feedback() 表单提交处理程序中使用。

from config.celery import shared_task
from django.core.mail import send_mail
from django.shortcuts import render, redirect
from feedback.forms import FeedbackForm

@shared_task
def send_thank_you_email(email_address):
    send_mail(
        "Thank you for your feedback!",
        "We appreciate your input.",
        "noreply@example.com",
        [email_address],
    )

def submit_feedback(request):
    if request.method == "POST":
        form = FeedbackForm(request.POST)
        if form.is_valid():
            form.save()

            # Push the task to the broker using the delay() method.
            send_thank_you_email.delay(form.cleaned_data["email"])

            return redirect("/thank-you/")
    else:
        form = FeedbackForm()

    return render(request, "feedback.html", {"form": form})

当在 Celery 中使用装饰器定义任务时,它会向任务添加一个delay() 方法。您可以看到在成功保存表单后,send_thank_you_email 任务调用了上例中的delay() 方法。当delay()被调用时,它会将send_thank_you_email任务及其数据发送到存储它的broker,稍后将由worker执行,此时用户将已通过电子邮件发送。

如果您在保存表单后需要发送额外的电子邮件,那么将工作推送到 Celery 的好处就变得更加明显。例如,您可能想向客户支持团队发送电子邮件,告知他们收到了新的反馈。对于 Celery,这几乎不会增加响应时间。

Celery 任务还允许额外的高级配置。如果电子邮件发送失败,您可以对任务进行编码以自动重试并配置 max_retries、retry_backoff、retry_jitter 等设置。

经纪人

Celery 增强提案的术语表对 消息代理有以下说法:

企业集成模式将消息代理定义为一个架构构建块,它可以从多个目的地接收消息,确定正确的目的地并将消息路由到正确的通道。

出于我们使用 Celery 的目的,我们将考虑 代理 存储创建的任务的“消息传输”。经纪人实际上并不执行任务:那是工人的工作。相反,代理是计划任务在计划任务时存储到的地方,以及在工作人员最终执行任务时从拉取的地方。代理是 Celery 工作的必需的组件,并且 Celery 将只连接到一个代理。

Celery 的后端和代理页面列出了一些其支持的代理,并且还有其他未列出的实验性代理(例如 SQLAlchemy)。这些代理(或“消息传输”)由 Celery 维护的 Python 消息传输库(称为 Kombu)管理。当寻找有关配置代理的信息时,查阅 Kombu 的文档而不是 Celery 的文档有时会很有帮助。

一些代理具有任务扇出和优先级等高级功能,而其他代理则作为简单队列运行。

工人

worker 是 Celery 的一个实例,它从代理中提取任务并执行 Python 应用程序中定义的任务函数。 Celery 能够在其工作线程中运行 Python 代码,因为 Celery 本身是用 Python 编写的。

许多worker可以同时运行来执行任务。当您运行 celery worker 命令时,默认情况下它会为计算机的每个核心启动一个工作线程。如果你的电脑有 16 个核心,运行 celery Worker 将启动 16 个 Worker。

如果没有工作线程在运行,消息(任务)将在代理中累积,直到工作线程可以执行它们。

后端

Celery 用户指南中的任务页面有以下关于后端的说明:

If you want to keep track of tasks or need the return values, then Celery must store or send the states somewhere so that they can be retrieved later. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), and Redis – or you can define your own.

TLDR: a backend tracks the outcomes and returned results of async tasks. What does that actually mean, and when could it be useful?

Imagine you are building an accounting app in Django that can generate an annual report. The report could take minutes to generate.

To give your users a more responsive experience, you use an AJAX request to kick off a report generation task. That request returns an ID of the task, which it can use to poll the server every few seconds to see if the report is generated. Once the task is complete, it will return the ID of the report, which the client can use to display a link to the report via JavaScript.

We can implement this with Celery and Django using the following code:

from celery import shared_task
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from accounting.models import Asset
from accounting.reports import AnnualReportGenerator

@shared_task
def generate_report_task(year):
    # This could take minutes...
    report = AnnualReportGenerator().generate(year)
    asset = Asset.objects.create(
        name=f"{year} annual report",
        url=report.url,
    )
    return asset.id

@require_http_methods(["POST"])
def generate_annual_report_view(request):
    year = request.POST.get("year")
    task = generate_report_task.delay(year)
    return JsonResponse({"taskId": task.id})

def get_annual_report_generation_status_view(request, task_id):
    task = generate_report_task.AsyncResult(task_id)

    # The status is typically "PENDING", "SUCCESS", or "FAILURE"
    status = task.status
    return JsonResponse({"status": status, "assetId": task.result})

In this example, the asset ID returned by generate_report_task() is stored in a backend. The backend stores the outcome and returned result. The backend does not store the status of yet-to-be-processed tasks: these will only be added once there has been an outcome. A task that returns "PENDING" has a completely unknown status: an associated task might not even exist. Tasks will typically return "SUCCESS" or "FAILURE", but you can see all statuses in the Celery status docs.

Having a backend is not required for Celery to run tasks. However, it is required if you ever need to check the outcome of a task or return a task's result. If you try to check a task's status when Celery does not have a backend configured, an exception will be raised.


I hope this post helps you understand the individual pieces of Celery and why you might consider using it. While the official documentation is challenging to grok, learning Celery deeply can unlock new possibilities within your Python applications.

以上是了解 Celery 中的任务、代理、工作人员和后端的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn