搜索
首页后端开发Python教程怎么用Python Celery动态添加定时任务

怎么用Python Celery动态添加定时任务

May 13, 2023 pm 03:43 PM
pythoncelery

    一、背景

    实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时脚本

    通过celery 实现调度主要思想是 通过引入中间人redis,启动 worker 进行任务执行 ,celery-beat进行定时任务数据存储

    二、Celery动态添加定时任务的官方文档

    celery文档:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    celery 自定义调度类说明: 

    自定义调度器类可以在命令行中指定(--scheduler参数)

    django-celery-beat文档 : https://pypi.org/project/django-celery-beat/

    关于django-celery-beat 插件的说明: 

    此扩展使您能够将定期任务计划存储在数据库中,可以从 Django 管理界面管理周期性任务,您可以在其中创建、编辑和删除周期性任务以及它们应该运行的频率

    三、celery简单实用

    3.1 基础环境配置

    1. 安装最新版本的Django

    pip3 install django #当前我安装的版本是 3.0.6

    2. 创建项目

    django-admin startproject typeidea
    django-admin startapp blog

    3.安装 celery

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis

    3.2 测试使用Celery应用

    1. 创建blog目录、新建task.py

    首先在Django项目中创建一个blog文件夹,并且在blog文件夹下创建tasks.py模块, 如下:

    怎么用Python Celery动态添加定时任务

     tasks.py代码如下: 

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 创建任务函数
    @app.task
    def my_task():
        print('任务正在执行...')

    Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。my_task函数是我们编写的一个任务函数, 通过加上装饰器app.task, 将其注册到broker的队列中。

    2. 启动redis、创建worker

    现在我们在创建一个worker, 等待处理队列中的任务。

    进入项目的根目录,执行命令: celery -A celery_tasks.tasks worker -l info

    怎么用Python Celery动态添加定时任务

     3. 调用任务

    下面来测试一下功能,创建一个任务,加入任务队列中,提供worker执行。

    进入python终端, 执行如下代码:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

    调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

    4. 查看结果

    在worker的终端查看任务执行情况,可以看到已经收到83484dfe-f729-417b-8e51-6c7ae32a1377 任务,并打印了任务执行信息

    怎么用Python Celery动态添加定时任务

    5. 存储并查看任务执行状态

    把任务执行结果赋值给ret,然后调用result() 会产生 DisabledBackend 报错,可见没有配置后端存储的时候并不能保存任务执行的状态信息,下一节我们会讲到如何配置backend保存任务执行结果

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()

    怎么用Python Celery动态添加定时任务

    四、配置backend存储任务执行结果 

    如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

    1. 添加backend参数

    在本例中我们使用Redis作为存储结果的方案,通过Celery的backend参数来设定任务结果存储地址。我们将tasks模块修改如下:

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery(&#39;celery_tasks.tasks&#39;,
                 broker=&#39;redis://127.0.0.1:6379/8&#39;,
                 backend=&#39;redis://127.0.0.1:6379/9&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b

    给Celery增加了backend参数,指定redis作为结果存储,并将任务函数修改为两个参数,并且有返回值。

    2. 调用任务/查看任务执行结果

    下面再来执行调用一下这个任务看看。

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False

    再来看看worker的执行情况,如下:

    怎么用Python Celery动态添加定时任务

    可以看到celery任务已经执行成功了。

    但是这只是一个开始,下一步要看看如何添加定时的任务。

    四、优化Celery目录结构

    上面直接将Celery的应用创建、配置、tasks任务全部写在了一个文件,这样在后面项目越来越大,也是不方便的。下面来拆分一下,并且添加一些常用的参数。

    基本结构如下

    怎么用Python Celery动态添加定时任务

    $ vim typeidea/celery.py (Celery应用文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name=&#39;typeidea&#39;
    # set the default django setting module for the &#39;celery&#39; program
    os.environ.setdefault(&#39;DJANGO_SETTINGS_MODULE&#39;,&#39;typeidea.settings&#39;)
    app = Celery(project_name)
     
    app.config_from_object(&#39;django.conf:settings&#39;)
     
    app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat

    2.在项目的 settings 文件配置 django-celery-beat 

    INSTALLED_APPS = [
        &#39;blog&#39;,
        &#39;django_celery_beat&#39;,
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = &#39;zh-hans&#39;  # 使用中国语言
    TIME_ZONE = &#39;Asia/Shanghai&#39;  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = &#39;Asia/Shanghai&#39;, 则使用上海的UTC时间。
    USE_TZ = False

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    怎么用Python Celery动态添加定时任务

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery(&#39;blog&#39;)
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        &#39;blog&#39;,
    ])

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;,backend=&#39;redis://127.0.0.1:6379/1&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print(&#39;任务2函数休眠10s&#39;)
        time.sleep(10)

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info

    怎么用Python Celery动态添加定时任务

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    怎么用Python Celery动态添加定时任务

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name=&#39;my_task2&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    <PeriodicTask: my_task2: every 10 seconds>

    beat 调度服务日志显示如下:

    怎么用Python Celery动态添加定时任务

     worker 服务日志显示如下:

    怎么用Python Celery动态添加定时任务

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name=&#39;my_task&#39;,task=&#39;blog.tasks.my_task&#39;,args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>

    beat 调度服务日志结果:

    怎么用Python Celery动态添加定时任务

     worker 服务日志结果:

    怎么用Python Celery动态添加定时任务

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。 

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute=&#39;*&#39;,
    ... hour=&#39;*&#39;,
    ... day_of_week=&#39;*&#39;,
    ... day_of_month=&#39;*&#39;,
    ... timezone=pytz.timezone(&#39;Asia/Shanghai&#39;)
    ... )

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name=&#39;my_task2_crontab&#39;,task=&#39;blog.tasks.my_task2&#39;,)

    beat 调度服务执行结果 

    怎么用Python Celery动态添加定时任务

     worker 执行服务结果

    怎么用Python Celery动态添加定时任务

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

     控制台实际操作记录

    怎么用Python Celery动态添加定时任务

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()

    查看worker输出:

    怎么用Python Celery动态添加定时任务

     可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()

    查看worker输出:

    怎么用Python Celery动态添加定时任务

     可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;).delete()
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    以上是怎么用Python Celery动态添加定时任务的详细内容。更多信息请关注PHP中文网其他相关文章!

    声明
    本文转载于:亿速云。如有侵权,请联系admin@php.cn删除
    Python与C:学习曲线和易用性Python与C:学习曲线和易用性Apr 19, 2025 am 12:20 AM

    Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

    Python vs. C:内存管理和控制Python vs. C:内存管理和控制Apr 19, 2025 am 12:17 AM

    Python和C 在内存管理和控制方面的差异显着。 1.Python使用自动内存管理,基于引用计数和垃圾回收,简化了程序员的工作。 2.C 则要求手动管理内存,提供更多控制权但增加了复杂性和出错风险。选择哪种语言应基于项目需求和团队技术栈。

    科学计算的Python:详细的外观科学计算的Python:详细的外观Apr 19, 2025 am 12:15 AM

    Python在科学计算中的应用包括数据分析、机器学习、数值模拟和可视化。1.Numpy提供高效的多维数组和数学函数。2.SciPy扩展Numpy功能,提供优化和线性代数工具。3.Pandas用于数据处理和分析。4.Matplotlib用于生成各种图表和可视化结果。

    Python和C:找到合适的工具Python和C:找到合适的工具Apr 19, 2025 am 12:04 AM

    选择Python还是C 取决于项目需求:1)Python适合快速开发、数据科学和脚本编写,因其简洁语法和丰富库;2)C 适用于需要高性能和底层控制的场景,如系统编程和游戏开发,因其编译型和手动内存管理。

    数据科学和机器学习的Python数据科学和机器学习的PythonApr 19, 2025 am 12:02 AM

    Python在数据科学和机器学习中的应用广泛,主要依赖于其简洁性和强大的库生态系统。1)Pandas用于数据处理和分析,2)Numpy提供高效的数值计算,3)Scikit-learn用于机器学习模型构建和优化,这些库让Python成为数据科学和机器学习的理想工具。

    学习Python:2小时的每日学习是否足够?学习Python:2小时的每日学习是否足够?Apr 18, 2025 am 12:22 AM

    每天学习Python两个小时是否足够?这取决于你的目标和学习方法。1)制定清晰的学习计划,2)选择合适的学习资源和方法,3)动手实践和复习巩固,可以在这段时间内逐步掌握Python的基本知识和高级功能。

    Web开发的Python:关键应用程序Web开发的Python:关键应用程序Apr 18, 2025 am 12:20 AM

    Python在Web开发中的关键应用包括使用Django和Flask框架、API开发、数据分析与可视化、机器学习与AI、以及性能优化。1.Django和Flask框架:Django适合快速开发复杂应用,Flask适用于小型或高度自定义项目。2.API开发:使用Flask或DjangoRESTFramework构建RESTfulAPI。3.数据分析与可视化:利用Python处理数据并通过Web界面展示。4.机器学习与AI:Python用于构建智能Web应用。5.性能优化:通过异步编程、缓存和代码优

    Python vs.C:探索性能和效率Python vs.C:探索性能和效率Apr 18, 2025 am 12:20 AM

    Python在开发效率上优于C ,但C 在执行性能上更高。1.Python的简洁语法和丰富库提高开发效率。2.C 的编译型特性和硬件控制提升执行性能。选择时需根据项目需求权衡开发速度与执行效率。

    See all articles

    热AI工具

    Undresser.AI Undress

    Undresser.AI Undress

    人工智能驱动的应用程序,用于创建逼真的裸体照片

    AI Clothes Remover

    AI Clothes Remover

    用于从照片中去除衣服的在线人工智能工具。

    Undress AI Tool

    Undress AI Tool

    免费脱衣服图片

    Clothoff.io

    Clothoff.io

    AI脱衣机

    AI Hentai Generator

    AI Hentai Generator

    免费生成ai无尽的。

    热工具

    螳螂BT

    螳螂BT

    Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

    SublimeText3 Linux新版

    SublimeText3 Linux新版

    SublimeText3 Linux最新版

    SublimeText3汉化版

    SublimeText3汉化版

    中文版,非常好用

    Atom编辑器mac版下载

    Atom编辑器mac版下载

    最流行的的开源编辑器

    SublimeText3 Mac版

    SublimeText3 Mac版

    神级代码编辑软件(SublimeText3)