Maison >développement back-end >Tutoriel Python >Comment utiliser Python Celery pour ajouter dynamiquement des tâches planifiées
Dans le travail réel, certaines tâches asynchrones chronophages devront être planifiées, telles que l'envoi d'e-mails, l'extraction de données et l'exécution de scripts planifiés
L'idée principale de la mise en œuvre la planification via le céleri consiste à introduire l'intermédiaire redis, à démarrer le travailleur pour l'exécution des tâches, celery-beat effectue le stockage des données des tâches planifiées
documentation sur le céleri : https://docs.celeryproject. org/en/latest/userguide/periodic-tasches.html#beat-custom-schedulers
celery Description de la classe de planification personnalisée :
La classe de planificateur personnalisée peut être spécifiée dans la ligne de commande (paramètre --scheduler)
django-celery -beat documentation : https:// pypi.org/project/django-celery-beat/
Notes sur le plugin django-celery-beat :
Cette extension vous permet de stocker des plannings de tâches périodiques dans la base de données. être géré depuis l'interface d'administration de Django. Vous pouvez y créer, modifier et supprimer des tâches périodiques et à quelle fréquence elles doivent s'exécuter
1 Installez la dernière version de Django.
pip3 install django #当前我安装的版本是 3.0.6
2. Créez un projet
django-admin startproject typeidea django-admin startapp blog
3. Installez celery
pip3 install django-celery pip3 install -U Celery pip3 install "celery[librabbitmq,redis,auth,msgpack]" pip3 install django-celery-beat # 用于动态添加定时任务 pip3 install django-celery-results pip3 install redis
1 Créez un répertoire de blog et créez une nouvelle tâche.py
Créez d'abord un dossier de blog dans le Projetez Django et créez des tâches sous le module blog.py, comme suit :
Le code de tâches.py est le suivant :
#!/usr/bin/env python # -*- coding: UTF-8 -*- """ #File: tasks.py #Time: 2022/3/30 2:26 下午 #Author: julius """ from celery import Celery # 使用redis做为broker app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0') # 创建任务函数 @app.task def my_task(): print('任务正在执行...')
Celery Le premier paramètre est de lui donner un nom. définir un intermédiaire. Ici, nous utilisons Redis comme intermédiaire. La fonction my_task est une fonction de tâche que nous avons écrite en ajoutant le décorateur app.task, elle est enregistrée dans la file d'attente du courtier.
2. Démarrez Redis et créez un travailleur
Maintenant, nous créons un travailleur et attendons de traiter les tâches dans la file d'attente.
Entrez le répertoire racine du projet et exécutez la commande : celery -A celery_tasks.tasks worker -l info
3. Appelons la tâche
Testons la fonction, créons une tâche, ajoutons-la à la file d'attente des tâches et assurer l'exécution des travailleurs.
Entrez dans le terminal python et exécutez le code suivant :
$ python manage.py shell >>> from blog.tasks import my_task >>> my_task.delay() <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>
L'appel d'une fonction de tâche renverra un objet AsyncResult. Cet objet peut être utilisé pour vérifier l'état de la tâche ou obtenir la valeur de retour de la tâche.
4. Vérifiez les résultats
Vérifiez l'état d'exécution de la tâche dans le terminal du travailleur. Vous pouvez voir que la tâche 83484dfe-f729-417b-8e51-6c7ae32a1377 a été reçue et que les informations d'exécution de la tâche ont été imprimées
.
5. Stockez et affichez l'état d'exécution de la tâche
Attribuez le résultat de l'exécution de la tâche à ret, puis appelez result() générera une erreur DisabledBackend. On peut voir que les informations sur l'état de l'exécution de la tâche ne peuvent pas être enregistrées lorsque. le stockage du backend n'est pas configuré. Nous en parlerons dans la section suivante. Allez à comment configurer le backend pour enregistrer les résultats de l'exécution des tâches
$ python manage.py shell >>> from blog.tasks import my_task >>> ret=my_task.delay() >>> ret.result()
Si nous voulons suivre le statut de la tâche, Celery doit enregistrer les résultats quelque part. Plusieurs options de stockage sont disponibles : SQLAlchemy, Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP).
1. Ajoutez le paramètre backend
Dans cet exemple, nous utilisons Redis comme solution pour stocker les résultats et définissons l'adresse de stockage des résultats de la tâche via le paramètre backend de Celery. Nous avons modifié le module de tâches comme suit :
from celery import Celery # 使用redis作为broker以及backend app = Celery('celery_tasks.tasks', broker='redis://127.0.0.1:6379/8', backend='redis://127.0.0.1:6379/9') # 创建任务函数 @app.task def my_task(a, b): print("任务函数正在执行....") return a + b
a ajouté un paramètre backend à Celery, spécifié redis comme stockage des résultats et modifié la fonction de tâche en deux paramètres et une valeur de retour.
2. Appeler la tâche/Afficher le résultat de l'exécution de la tâche
Appelons à nouveau la tâche et voyons.
$ python manage.py shell >>> from blog.tasks import my_task >>> res=my_task.delay(10,40) >>> res.result 50 >>> res.failed() False
Regardons l'exécution du travailleur, comme suit :
Vous pouvez voir que la tâche céleri a été exécutée avec succès.
Mais ce n'est que le début, la prochaine étape consiste à voir comment ajouter des tâches planifiées.
Ce qui précède écrit directement toutes les tâches de création, de configuration et de tâches de l'application Celery dans un seul fichier, ce qui rendra difficile la croissance du projet. Décomposons-le et ajoutons quelques paramètres couramment utilisés.
La structure de base est la suivante
$ vim typeidea/celery.py (fichier de candidature Céleri)
#!/usr/bin/env python # -*- coding: UTF-8 -*- """ #File: celery.py #Time: 2022/3/30 12:25 下午 #Author: julius """ import os from celery import Celery from blog import celeryconfig project_name='typeidea' # set the default django setting module for the 'celery' program os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings') app = Celery(project_name) app.config_from_object('django.conf:settings') app.autodiscover_tasks()
vim blog/celeryconfig.py (配置Celery的参数文件)
#!/usr/bin/env python # -*- coding: UTF-8 -*- """ #File: celeryconfig.py #Time: 2022/3/30 2:54 下午 #Author: julius """ # 设置结果存储 from typeidea import settings import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings") CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 设置代理人broker BROKER_URL = 'redis://127.0.0.1:6379/1' # celery 的启动工作数量设置 CELERY_WORKER_CONCURRENCY = 20 # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。 CELERYD_PREFETCH_MULTIPLIER = 20 # 非常重要,有些情况下可以防止死锁 CELERYD_FORCE_EXECV = True # celery 的 worker 执行多少个任务后进行重启操作 CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 禁用所有速度限制,如果网络资源有限,不建议开足马力。 CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
vim blog/tasks.py (tasks 任务文件)
import time from blog.celery import app # 创建任务函数 @app.task def my_task(a, b, c): print('任务正在执行...') print('任务1函数休眠10s') time.sleep(10) return a + b + c
使用 django-celery-beat 动态添加定时任务 celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。
1. 安装 django-celery-beat
pip3 install django-celery-beat
2.在项目的 settings 文件配置 django-celery-beat
INSTALLED_APPS = [ 'blog', 'django_celery_beat', ... ] # Django设置时区 LANGUAGE_CODE = 'zh-hans' # 使用中国语言 TIME_ZONE = 'Asia/Shanghai' # 设置Django使用中国上海时间 # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用 # 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。 USE_TZ = False
3. 创建 django-celery-beat 相关表
执行Django数据库迁移: python manage.py migrate
4. 配置Celery使用 django-celery-beat
配置 celery.py
import os from celery import Celery from blog import celeryconfig # 为celery 设置环境变量 os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings") # 创建celery app app = Celery('blog') # 从单独的配置模块中加载配置 app.config_from_object(celeryconfig) # 设置app自动加载任务 app.autodiscover_tasks([ 'blog', ])
配置 celeryconfig.py
# 设置结果存储 from typeidea import settings import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings") CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 设置代理人broker BROKER_URL = 'redis://127.0.0.1:6379/1' # celery 的启动工作数量设置 CELERY_WORKER_CONCURRENCY = 20 # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。 CELERYD_PREFETCH_MULTIPLIER = 20 # 非常重要,有些情况下可以防止死锁 CELERYD_FORCE_EXECV = True # celery 的 worker 执行多少个任务后进行重启操作 CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 禁用所有速度限制,如果网络资源有限,不建议开足马力。 CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
编写任务 tasks.py
import time from celery import Celery from blog.celery import app # 使用redis做为broker # app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1') # 创建任务函数 @app.task def my_task(a, b, c): print('任务正在执行...') print('任务1函数休眠10s') time.sleep(10) return a + b + c @app.task def my_task2(): print("任务2函数正在执行....") print('任务2函数休眠10s') time.sleep(10)
5. 启动定时任务work
启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。
启动任务 work
$ celery -A blog worker -l info
启动定时器触发 beat
celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
1. 初始化周期间隔对象interval
对象
>>> from django_celery_beat.models import PeriodicTask, IntervalSchedule >>> schedule, created = IntervalSchedule.objects.get_or_create( ... every=10, ... period=IntervalSchedule.SECONDS, ... ) >>> IntervalSchedule.objects.all() <QuerySet [<IntervalSchedule: every 10 seconds>]>
2.创建一个无参数的周期性间隔任务
>>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',) <PeriodicTask: my_task2: every 10 seconds>
beat 调度服务日志显示如下:
worker 服务日志显示如下:
3.创建一个带参数的周期性间隔任务
>>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30])) <PeriodicTask: my_task: every 10 seconds>
beat 调度服务日志结果:
worker 服务日志结果:
4.如何高并发执行任务
需要并行执行任务的时候,就需要设置多个worker
来执行任务。
1.初始化 crontab
的调度对象
>>> import pytz >>> schedule, _ = CrontabSchedule.objects.get_or_create( ... minute='*', ... hour='*', ... day_of_week='*', ... day_of_month='*', ... timezone=pytz.timezone('Asia/Shanghai') ... )
2. 创建不带参数的定时任务
PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)
beat 调度服务执行结果
worker 执行服务结果
1. 周期性任务的查询
>>> PeriodicTask.objects.all() <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]> >>> PeriodicTask.objects.get(name='my_task2_crontab') <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai> >>> for task in PeriodicTask.objects.all(): ... print(task.id) ... 1 13 >>> PeriodicTask.objects.get(id=13) <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai> >>> PeriodicTask.objects.get(name='my_task2_crontab') <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
控制台实际操作记录
2.周期性任务的暂停/启动
2.1 设置my_taks2_crontab 暂停任务
>>> my_task2_crontab = PeriodicTask.objects.get(id=13) >>> my_task2_crontab.enabled True >>> my_task2_crontab.enabled=False >>> my_task2_crontab.save()
查看worker输出:
可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停
2.2 设置my_task2_crontab 开启任务
把任务的 enabled 为 True 即可:
>>> my_task2_crontab.enabled False >>> my_task2_crontab.enabled=True >>> my_task2_crontab.save()
查看worker输出:
可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动
3. 周期性任务的删除
获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了
PeriodicTask.objects.get(name='my_task2_crontab').delete() >>> PeriodicTask.objects.get(name='my_task2_crontab') Traceback (most recent call last): File "<console>", line 1, in <module> File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method return getattr(self.get_queryset(), name)(*args, **kwargs) File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get raise self.model.DoesNotExist( django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!