Maison  >  Article  >  développement back-end  >  Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

王林
王林avant
2023-05-13 15:43:061931parcourir

    1. Contexte

    Dans le travail réel, certaines tâches asynchrones chronophages devront être planifiées, telles que l'envoi d'e-mails, l'extraction de données et l'exécution de scripts planifiés

    L'idée principale de la mise en œuvre la planification via le céleri consiste à introduire l'intermédiaire redis, à démarrer le travailleur pour l'exécution des tâches, celery-beat effectue le stockage des données des tâches planifiées

    2. Documentation officielle de Celery pour l'ajout dynamique de tâches planifiées

    documentation sur le céleri : https://docs.celeryproject. org/en/latest/userguide/periodic-tasches.html#beat-custom-schedulers

    celery Description de la classe de planification personnalisée :

    La classe de planificateur personnalisée peut être spécifiée dans la ligne de commande (paramètre --scheduler)

    django-celery -beat documentation : https:// pypi.org/project/django-celery-beat/

    Notes sur le plugin django-celery-beat :

    Cette extension vous permet de stocker des plannings de tâches périodiques dans la base de données. être géré depuis l'interface d'administration de Django. Vous pouvez y créer, modifier et supprimer des tâches périodiques et à quelle fréquence elles doivent s'exécuter

    3. Le céleri est simple et pratique

    3.1 Configuration de l'environnement de base

    1 Installez la dernière version de Django.

    pip3 install django #当前我安装的版本是 3.0.6

    2. Créez un projet

    django-admin startproject typeidea
    django-admin startapp blog

    3. Installez celery

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis

    3.2 Testez à l'aide de l'application Celery

    1 Créez un répertoire de blog et créez une nouvelle tâche.py

    Créez d'abord un dossier de blog dans le Projetez Django et créez des tâches sous le module blog.py, comme suit :

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    Le code de tâches.py est le suivant :

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 创建任务函数
    @app.task
    def my_task():
        print('任务正在执行...')

    Celery Le premier paramètre est de lui donner un nom. définir un intermédiaire. Ici, nous utilisons Redis comme intermédiaire. La fonction my_task est une fonction de tâche que nous avons écrite en ajoutant le décorateur app.task, elle est enregistrée dans la file d'attente du courtier.

    2. Démarrez Redis et créez un travailleur

    Maintenant, nous créons un travailleur et attendons de traiter les tâches dans la file d'attente.

    Entrez le répertoire racine du projet et exécutez la commande : celery -A celery_tasks.tasks worker -l info

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    3. Appelons la tâche

    Testons la fonction, créons une tâche, ajoutons-la à la file d'attente des tâches et assurer l'exécution des travailleurs.

    Entrez dans le terminal python et exécutez le code suivant :

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

    L'appel d'une fonction de tâche renverra un objet AsyncResult. Cet objet peut être utilisé pour vérifier l'état de la tâche ou obtenir la valeur de retour de la tâche.

    4. Vérifiez les résultats

    Vérifiez l'état d'exécution de la tâche dans le terminal du travailleur. Vous pouvez voir que la tâche 83484dfe-f729-417b-8e51-6c7ae32a1377 a été reçue et que les informations d'exécution de la tâche ont été imprimées

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées.

    5. Stockez et affichez l'état d'exécution de la tâche

    Attribuez le résultat de l'exécution de la tâche à ret, puis appelez result() générera une erreur DisabledBackend. On peut voir que les informations sur l'état de l'exécution de la tâche ne peuvent pas être enregistrées lorsque. le stockage du backend n'est pas configuré. Nous en parlerons dans la section suivante. Allez à comment configurer le backend pour enregistrer les résultats de l'exécution des tâches

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    4. Configurer le backend pour stocker les résultats de l'exécution des tâches

    Si nous voulons suivre le statut de la tâche, Celery doit enregistrer les résultats quelque part. Plusieurs options de stockage sont disponibles : SQLAlchemy, Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP).

    1. Ajoutez le paramètre backend

    Dans cet exemple, nous utilisons Redis comme solution pour stocker les résultats et définissons l'adresse de stockage des résultats de la tâche via le paramètre backend de Celery. Nous avons modifié le module de tâches comme suit :

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery(&#39;celery_tasks.tasks&#39;,
                 broker=&#39;redis://127.0.0.1:6379/8&#39;,
                 backend=&#39;redis://127.0.0.1:6379/9&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b

    a ajouté un paramètre backend à Celery, spécifié redis comme stockage des résultats et modifié la fonction de tâche en deux paramètres et une valeur de retour.

    2. Appeler la tâche/Afficher le résultat de l'exécution de la tâche

    Appelons à nouveau la tâche et voyons.

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False

    Regardons l'exécution du travailleur, comme suit :

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    Vous pouvez voir que la tâche céleri a été exécutée avec succès.

    Mais ce n'est que le début, la prochaine étape consiste à voir comment ajouter des tâches planifiées.

    4. Optimisez la structure du répertoire Celery

    Ce qui précède écrit directement toutes les tâches de création, de configuration et de tâches de l'application Celery dans un seul fichier, ce qui rendra difficile la croissance du projet. Décomposons-le et ajoutons quelques paramètres couramment utilisés.

    La structure de base est la suivante

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    $ vim typeidea/celery.py (fichier de candidature Céleri)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name=&#39;typeidea&#39;
    # set the default django setting module for the &#39;celery&#39; program
    os.environ.setdefault(&#39;DJANGO_SETTINGS_MODULE&#39;,&#39;typeidea.settings&#39;)
    app = Celery(project_name)
     
    app.config_from_object(&#39;django.conf:settings&#39;)
     
    app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat

    2.在项目的 settings 文件配置 django-celery-beat 

    INSTALLED_APPS = [
        &#39;blog&#39;,
        &#39;django_celery_beat&#39;,
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = &#39;zh-hans&#39;  # 使用中国语言
    TIME_ZONE = &#39;Asia/Shanghai&#39;  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = &#39;Asia/Shanghai&#39;, 则使用上海的UTC时间。
    USE_TZ = False

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery(&#39;blog&#39;)
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        &#39;blog&#39;,
    ])

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;,backend=&#39;redis://127.0.0.1:6379/1&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print(&#39;任务2函数休眠10s&#39;)
        time.sleep(10)

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name=&#39;my_task2&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    <PeriodicTask: my_task2: every 10 seconds>

    beat 调度服务日志显示如下:

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

     worker 服务日志显示如下:

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name=&#39;my_task&#39;,task=&#39;blog.tasks.my_task&#39;,args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>

    beat 调度服务日志结果:

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

     worker 服务日志结果:

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。 

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute=&#39;*&#39;,
    ... hour=&#39;*&#39;,
    ... day_of_week=&#39;*&#39;,
    ... day_of_month=&#39;*&#39;,
    ... timezone=pytz.timezone(&#39;Asia/Shanghai&#39;)
    ... )

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name=&#39;my_task2_crontab&#39;,task=&#39;blog.tasks.my_task2&#39;,)

    beat 调度服务执行结果 

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

     worker 执行服务结果

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

     控制台实际操作记录

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()

    查看worker输出:

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

     可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()

    查看worker输出:

    Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées

     可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;).delete()
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration:
    Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer