Heim >Backend-Entwicklung >Python-Tutorial >So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen
Bei der tatsächlichen Arbeit gibt es einige zeitaufwändige asynchrone Aufgaben, die geplant werden müssen. B. das Senden von E-Mails, das Abrufen von Daten und das Ausführen geplanter Skripte Aufgabendatenspeicherung
2. Offizielle Dokumentation für Celery zum dynamischen Hinzufügen geplanter Aufgaben
django-celery-beat Dokumentation: https://pypi.org/project/django-celery-beat/
Anleitung zum Django-celery-beat Plug-in:
Diese Erweiterung ermöglicht Ihnen das Speichern periodischer Aufgabenpläne in der Datenbank. Periodische Aufgaben können über die Django-Administratoroberfläche verwaltet werden, wo Sie periodische Aufgaben erstellen, bearbeiten und löschen und festlegen können, wie oft sie ausgeführt werden sollen # 🎜🎜#
3. Sellerie ist einfach und praktisch 3.1 Grundlegende Umgebungskonfigurationpip3 install django #当前我安装的版本是 3.0.6
django-admin startproject typeidea django-admin startapp blog
3. Installieren Sie Selleriepip3 install django-celery
pip3 install -U Celery
pip3 install "celery[librabbitmq,redis,auth,msgpack]"
pip3 install django-celery-beat # 用于动态添加定时任务
pip3 install django-celery-results
pip3 install redis
3.2 Testen Sie mit der Celery-Anwendung
Der task.py-Code lautet wie folgt: #!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
#File: tasks.py
#Time: 2022/3/30 2:26 下午
#Author: julius
"""
from celery import Celery
# 使用redis做为broker
app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
# 创建任务函数
@app.task
def my_task():
print('任务正在执行...')
Celery Der erste Parameter dient zum Festlegen eines Namens Der zweite Parameter besteht darin, einen Zwischenhändler festzulegen. Hier verwenden wir Redis als Zwischenhändler. Die Funktion my_task ist eine von uns geschriebene Aufgabenfunktion, die durch Hinzufügen des Dekorators app.task in der Warteschlange des Brokers registriert wird.
#🎜🎜 #3. Rufen Sie die Aufgabe auf
Lassen Sie uns die Funktion testen, eine Aufgabe erstellen, sie zur Aufgabenwarteschlange hinzufügen und die Ausführung durch den Arbeiter bereitstellen. Geben Sie das Python-Terminal ein und führen Sie den folgenden Code aus:$ python manage.py shell >>> from blog.tasks import my_task >>> my_task.delay() <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>Der Aufruf einer Aufgabenfunktion gibt ein AsyncResult-Objekt zurück, mit dem der Status der Aufgabe überprüft oder abgerufen werden kann Der Rückgabewert der Aufgabe.
4. Sehen Sie sich die Ergebnisse an
Überprüfen Sie den Aufgabenausführungsstatus auf dem Terminal des Arbeiters und Sie können das 83484dfe-f729-417b-8e51 sehen wurde empfangen -6c7ae32a1377 Aufgabe und druckte die Aufgabenausführungsinformationen
5 Speichern und zeigen Sie den Aufgabenausführungsstatus anWenn Sie das Ergebnis der Aufgabenausführung ret zuweisen und dann result() aufrufen, wird ein DisabledBackend-Fehler generiert. Es ist ersichtlich, dass die Statusinformationen der Aufgabenausführung nicht gespeichert werden können, wenn der Backend-Speicher nicht konfiguriert ist Im nächsten Abschnitt werden wir darüber sprechen, wie Sie den Backend-Speicher konfigurieren
$ python manage.py shell >>> from blog.tasks import my_task >>> ret=my_task.delay() >>> ret.result()
4 #Wenn wir den Status der Aufgabe verfolgen möchten, muss Celery die Ergebnisse irgendwo speichern. Es stehen mehrere Speicheroptionen zur Verfügung: SQLAlchemy, Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP).
1. Fügen Sie den Backend-Parameter
hinzu. In diesem Beispiel verwenden wir Redis als Lösung zum Speichern von Ergebnissen und legen ihn über den Backend-Parameter von Celery fest Speicheradresse für Aufgabenergebnisse. Wir haben das Aufgabenmodul wie folgt geändert:
from celery import Celery # 使用redis作为broker以及backend app = Celery('celery_tasks.tasks', broker='redis://127.0.0.1:6379/8', backend='redis://127.0.0.1:6379/9') # 创建任务函数 @app.task def my_task(a, b): print("任务函数正在执行....") return a + b
Den Backend-Parameter zu Celery hinzugefügt, redis als Ergebnisspeicher angegeben und die Aufgabenfunktion in zwei Parameter und einen Rückgabewert geändert.
$ python manage.py shell >>> from blog.tasks import my_task >>> res=my_task.delay(10,40) >>> res.result 50 >>> res.failed() False
Sehen wir uns die Ausführung des Arbeiters wie folgt an:
Sie können sehen, dass die Sellerieaufgabe wurde erfolgreich ausgeführt. Aber das ist erst der Anfang. Der nächste Schritt besteht darin, zu sehen, wie man geplante Aufgaben hinzufügt. 4. Optimieren Sie die Celery-Verzeichnisstruktur
Das Obige schreibt alle Aufgaben zur Erstellung, Konfiguration und Aufgaben von Celery direkt in eine Datei, sodass das Projekt immer größer wird später ist es auch unbequem. Lassen Sie es uns aufschlüsseln und einige häufig verwendete Parameter hinzufügen.
Die Grundstruktur ist wie folgt$ vim typeidea/celery.py (Sellerie-Anwendungsdatei)
#!/usr/bin/env python # -*- coding: UTF-8 -*- """ #File: celery.py #Time: 2022/3/30 12:25 下午 #Author: julius """ import os from celery import Celery from blog import celeryconfig project_name='typeidea' # set the default django setting module for the 'celery' program os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings') app = Celery(project_name) app.config_from_object('django.conf:settings') app.autodiscover_tasks()
vim blog/celeryconfig.py (配置Celery的参数文件)
#!/usr/bin/env python # -*- coding: UTF-8 -*- """ #File: celeryconfig.py #Time: 2022/3/30 2:54 下午 #Author: julius """ # 设置结果存储 from typeidea import settings import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings") CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 设置代理人broker BROKER_URL = 'redis://127.0.0.1:6379/1' # celery 的启动工作数量设置 CELERY_WORKER_CONCURRENCY = 20 # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。 CELERYD_PREFETCH_MULTIPLIER = 20 # 非常重要,有些情况下可以防止死锁 CELERYD_FORCE_EXECV = True # celery 的 worker 执行多少个任务后进行重启操作 CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 禁用所有速度限制,如果网络资源有限,不建议开足马力。 CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
vim blog/tasks.py (tasks 任务文件)
import time from blog.celery import app # 创建任务函数 @app.task def my_task(a, b, c): print('任务正在执行...') print('任务1函数休眠10s') time.sleep(10) return a + b + c
使用 django-celery-beat 动态添加定时任务 celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。
1. 安装 django-celery-beat
pip3 install django-celery-beat
2.在项目的 settings 文件配置 django-celery-beat
INSTALLED_APPS = [ 'blog', 'django_celery_beat', ... ] # Django设置时区 LANGUAGE_CODE = 'zh-hans' # 使用中国语言 TIME_ZONE = 'Asia/Shanghai' # 设置Django使用中国上海时间 # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用 # 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。 USE_TZ = False
3. 创建 django-celery-beat 相关表
执行Django数据库迁移: python manage.py migrate
4. 配置Celery使用 django-celery-beat
配置 celery.py
import os from celery import Celery from blog import celeryconfig # 为celery 设置环境变量 os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings") # 创建celery app app = Celery('blog') # 从单独的配置模块中加载配置 app.config_from_object(celeryconfig) # 设置app自动加载任务 app.autodiscover_tasks([ 'blog', ])
配置 celeryconfig.py
# 设置结果存储 from typeidea import settings import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings") CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 设置代理人broker BROKER_URL = 'redis://127.0.0.1:6379/1' # celery 的启动工作数量设置 CELERY_WORKER_CONCURRENCY = 20 # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。 CELERYD_PREFETCH_MULTIPLIER = 20 # 非常重要,有些情况下可以防止死锁 CELERYD_FORCE_EXECV = True # celery 的 worker 执行多少个任务后进行重启操作 CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 禁用所有速度限制,如果网络资源有限,不建议开足马力。 CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
编写任务 tasks.py
import time from celery import Celery from blog.celery import app # 使用redis做为broker # app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1') # 创建任务函数 @app.task def my_task(a, b, c): print('任务正在执行...') print('任务1函数休眠10s') time.sleep(10) return a + b + c @app.task def my_task2(): print("任务2函数正在执行....") print('任务2函数休眠10s') time.sleep(10)
5. 启动定时任务work
启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。
启动任务 work
$ celery -A blog worker -l info
启动定时器触发 beat
celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
1. 初始化周期间隔对象interval
对象
>>> from django_celery_beat.models import PeriodicTask, IntervalSchedule >>> schedule, created = IntervalSchedule.objects.get_or_create( ... every=10, ... period=IntervalSchedule.SECONDS, ... ) >>> IntervalSchedule.objects.all() <QuerySet [<IntervalSchedule: every 10 seconds>]>
2.创建一个无参数的周期性间隔任务
>>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',) <PeriodicTask: my_task2: every 10 seconds>
beat 调度服务日志显示如下:
worker 服务日志显示如下:
3.创建一个带参数的周期性间隔任务
>>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30])) <PeriodicTask: my_task: every 10 seconds>
beat 调度服务日志结果:
worker 服务日志结果:
4.如何高并发执行任务
需要并行执行任务的时候,就需要设置多个worker
来执行任务。
1.初始化 crontab
的调度对象
>>> import pytz >>> schedule, _ = CrontabSchedule.objects.get_or_create( ... minute='*', ... hour='*', ... day_of_week='*', ... day_of_month='*', ... timezone=pytz.timezone('Asia/Shanghai') ... )
2. 创建不带参数的定时任务
PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)
beat 调度服务执行结果
worker 执行服务结果
1. 周期性任务的查询
>>> PeriodicTask.objects.all() <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]> >>> PeriodicTask.objects.get(name='my_task2_crontab') <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai> >>> for task in PeriodicTask.objects.all(): ... print(task.id) ... 1 13 >>> PeriodicTask.objects.get(id=13) <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai> >>> PeriodicTask.objects.get(name='my_task2_crontab') <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
控制台实际操作记录
2.周期性任务的暂停/启动
2.1 设置my_taks2_crontab 暂停任务
>>> my_task2_crontab = PeriodicTask.objects.get(id=13) >>> my_task2_crontab.enabled True >>> my_task2_crontab.enabled=False >>> my_task2_crontab.save()
查看worker输出:
可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停
2.2 设置my_task2_crontab 开启任务
把任务的 enabled 为 True 即可:
>>> my_task2_crontab.enabled False >>> my_task2_crontab.enabled=True >>> my_task2_crontab.save()
查看worker输出:
可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动
3. 周期性任务的删除
获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了
PeriodicTask.objects.get(name='my_task2_crontab').delete() >>> PeriodicTask.objects.get(name='my_task2_crontab') Traceback (most recent call last): File "<console>", line 1, in <module> File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method return getattr(self.get_queryset(), name)(*args, **kwargs) File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get raise self.model.DoesNotExist( django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.
Das obige ist der detaillierte Inhalt vonSo verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!