重复代码的集合中又添新成员:追踪 Celery 任务的执行时间。
每个 Celery 任务实际上有两个不同的“执行”时间:
- 实际执行时间: 代码运行的时间。
- “完成时间”: 包括在队列中等待可用工作进程的时间。
两者都很重要,因为我们的最终目标是了解任务何时完成。
触发任务后,我们需要知道任务何时完成以及何时可以预期结果。这就像项目估算一样。管理者真正想知道的是项目何时完成,而不是它在一周内就能完成,但没有人有空在接下来的六个月内去做。
使用 Celery 信号
我们可以使用 Celery 信号来计时任务。
提示 1: Celery 信号的所有参数都是关键字参数。这意味着我们可以只列出我们感兴趣的关键字参数,并将其余参数打包到 **kwargs
中。这是个非常棒的设计!所有信号都应该采用这种方式!
提示 2: 我们可以将执行开始和结束时间存储在任务对象的“headers”属性中。
任务入队
当 Celery 任务进入队列时,记录当前时间:
from celery import signals from dateutil.parser import isoparse from datetime import datetime, timezone @signals.before_task_publish.connect def before_task_publish(*, headers: dict, **kwargs): raw_eta = headers.get("eta") publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc) headers["__publish_time"] = publish_time.isoformat()
任务开始执行
当工作进程接收到任务时,记录当前时间:
from celery import signals from datetime import datetime, timezone @signals.task_prerun.connect def task_prerun(*, task: Task, **kwargs): setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())
任务执行结束
任务完成后,计算执行时间并将其存储到某个地方,例如 StatsD 或其他监控工具。
StatsD 是用于监控应用程序和检测任何软件以提供自定义指标的行业标准技术栈。
- Netdata: StatsD 简介 [1]
from celery import signals, Task from dateutil.parser import isoparse from datetime import datetime, timezone, timedelta def to_milliseconds(td: timedelta) -> int: return int(td.total_seconds() * 1000) @signals.task_postrun.connect def task_postrun(*, task: Task, **kwargs): now = datetime.now(tz=timezone.utc) publish_time = isoparse(getattr(task.request, "__publish_time", "")) prerun_time = isoparse(getattr(task.request, "__prerun_time", "")) exec_time = now - prerun_time if prerun_time else timedelta(0) waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0) waiting_and_exec_time = now - publish_time if publish_time else timedelta(0) stats = { "exec_time_ms": to_milliseconds(exec_time), "waiting_time_ms": to_milliseconds(waiting_time), "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time), } # TODO: 将统计数据发送到 StatsD 或其他监控工具 statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"]) # ... 发送其他统计数据 ...
额外功能:设置执行时间过长警告
可以在上述函数中添加硬编码阈值:
if exec_time > timedelta(hours=1): logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")
或者,可以根据任务定义设置多层阈值或阈值,或者任何可以在代码中表达的内容。
以上是如何测量 Celery 任务的执行时间?的详细内容。更多信息请关注PHP中文网其他相关文章!

Tomergelistsinpython,YouCanusethe操作员,estextMethod,ListComprehension,Oritertools

在Python3中,可以通过多种方法连接两个列表:1)使用 运算符,适用于小列表,但对大列表效率低;2)使用extend方法,适用于大列表,内存效率高,但会修改原列表;3)使用*运算符,适用于合并多个列表,不修改原列表;4)使用itertools.chain,适用于大数据集,内存效率高。

使用join()方法是Python中从列表连接字符串最有效的方法。1)使用join()方法高效且易读。2)循环使用 运算符对大列表效率低。3)列表推导式与join()结合适用于需要转换的场景。4)reduce()方法适用于其他类型归约,但对字符串连接效率低。完整句子结束。

pythonexecutionistheprocessoftransformingpypythoncodeintoExecutablestructions.1)InternterPreterReadSthecode,ConvertingTingitIntObyTecode,whepythonvirtualmachine(pvm)theglobalinterpreterpreterpreterpreterlock(gil)the thepythonvirtualmachine(pvm)

Python的关键特性包括:1.语法简洁易懂,适合初学者;2.动态类型系统,提高开发速度;3.丰富的标准库,支持多种任务;4.强大的社区和生态系统,提供广泛支持;5.解释性,适合脚本和快速原型开发;6.多范式支持,适用于各种编程风格。

Python是解释型语言,但也包含编译过程。1)Python代码先编译成字节码。2)字节码由Python虚拟机解释执行。3)这种混合机制使Python既灵活又高效,但执行速度不如完全编译型语言。

useeAforloopWheniteratingOveraseQuenceOrforAspecificnumberoftimes; useAwhiLeLoopWhenconTinuingUntilAcIntiment.ForloopSareIdeAlforkNownsences,而WhileLeleLeleLeleLoopSituationSituationSituationsItuationSuationSituationswithUndEtermentersitations。

pythonloopscanleadtoerrorslikeinfiniteloops,modifyingListsDuringteritation,逐个偏置,零indexingissues,andnestedloopineflinefficiencies


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

SublimeText3 英文版
推荐:为Win版本,支持代码提示!

EditPlus 中文破解版
体积小,语法高亮,不支持代码提示功能

VSCode Windows 64位 下载
微软推出的免费、功能强大的一款IDE编辑器

Dreamweaver Mac版
视觉化网页开发工具

Atom编辑器mac版下载
最流行的的开源编辑器