首頁 >後端開發 >Python教學 >如何測量 Celery 任務的執行時間?

如何測量 Celery 任務的執行時間?

Barbara Streisand
Barbara Streisand原創
2025-01-13 22:28:44930瀏覽

How do I measure the execution time of Celery tasks?

重複程式碼的集合中又添新成員:追蹤 Celery 任務的執行時間。

每個 Celery 任務其實有兩個不同的「執行」時間:

  • 實際執行時間: 程式碼運作的時間。
  • 「完成時間」: 包含在佇列中等待可用工作進程的時間。

兩者都很重要,因為我們的最終目標是了解任務何時完成

觸發任務後,我們需要知道任務何時完成以及何時可以預期結果。這就像項目估算一樣。管理者真正想知道的是專案何時完成,而不是它在一週內就能完成,但沒有人有空在接下來的六個月內去做。

使用 Celery 訊號

我們可以使用 Celery 訊號來計時任務。

提示 1: Celery 訊號的所有參數都是關鍵字參數。這意味著我們可以只列出我們感興趣的關鍵字參數,並將其餘參數打包到 **kwargs 中。這是個非常棒的設計!所有訊號都應該採用這種方式!

提示 2: 我們可以將執行開始和結束時間儲存在任務物件的「headers」屬性中。

任務入隊

當 Celery 任務進入佇列時,記錄目前時間:

<code class="language-python">from celery import signals
from dateutil.parser import isoparse
from datetime import datetime, timezone

@signals.before_task_publish.connect
def before_task_publish(*, headers: dict, **kwargs):
    raw_eta = headers.get("eta")
    publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc)
    headers["__publish_time"] = publish_time.isoformat()</code>

任務開始執行

當工作流程接收到任務時,記錄當前時間:

<code class="language-python">from celery import signals
from datetime import datetime, timezone

@signals.task_prerun.connect
def task_prerun(*, task: Task, **kwargs):
    setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>

任務執行結束

任務完成後,計算執行時間並將其儲存到某個地方,例如 StatsD 或其他監控工具。

StatsD 是用於監控應用程式和偵測任何軟體以提供自訂指標的業界標準技術堆疊。

  • Netdata: StatsD 簡介 [1]
<code class="language-python">from celery import signals, Task
from dateutil.parser import isoparse
from datetime import datetime, timezone, timedelta

def to_milliseconds(td: timedelta) -> int:
    return int(td.total_seconds() * 1000)

@signals.task_postrun.connect
def task_postrun(*, task: Task, **kwargs):
    now = datetime.now(tz=timezone.utc)
    publish_time = isoparse(getattr(task.request, "__publish_time", ""))
    prerun_time = isoparse(getattr(task.request, "__prerun_time", ""))

    exec_time = now - prerun_time if prerun_time else timedelta(0)
    waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0)
    waiting_and_exec_time = now - publish_time if publish_time else timedelta(0)

    stats = {
        "exec_time_ms": to_milliseconds(exec_time),
        "waiting_time_ms": to_milliseconds(waiting_time),
        "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time),
    }
    # TODO: 将统计数据发送到 StatsD 或其他监控工具
    statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"])
    # ... 发送其他统计数据 ...</code>

額外功能:設定執行時間過長警告

可以在上述函數中加入硬編碼閾值:

<code class="language-python">if exec_time > timedelta(hours=1):
    logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>

或者,可以根據任務定義設定多層閾值或閾值,或任何可以在程式碼中表達的內容。

以上是如何測量 Celery 任務的執行時間?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn