>백엔드 개발 >파이썬 튜토리얼 >Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

王林
王林앞으로
2023-05-13 15:43:062127검색

    1. 배경

    실제 작업에서는 이메일 보내기, 데이터 가져오기, 예약된 스크립트 실행 등 예약해야 하는 시간이 많이 걸리는 비동기 작업이 있을 것입니다

    구현의 주요 아이디어 celery를 통한 예약은 중간자 redis를 도입하고 작업 실행을 위해 작업자를 시작하며 celery-beat는 예약된 작업 데이터 저장을 수행합니다

    2. 예약된 작업을 동적으로 추가하기 위한 Celery 공식 문서

    celery 문서: https://docs.celeryproject. org/en/latest/userguide/ periodic-tasks.html#beat-custom-schedulers

    celery 사용자 정의 스케줄링 클래스 설명:

    사용자 정의 스케줄러 클래스는 명령줄에서 지정할 수 있습니다(--scheduler 매개변수)

    django-celery -beat 문서: https:// pypi.org/project/django-celery-beat/

    django-celery-beat 플러그인에 대한 참고 사항:

    이 확장 기능을 사용하면 주기적인 작업 일정을 데이터베이스에 저장할 수 있습니다. Django 관리 인터페이스에서 관리할 수 있습니다. 정기적인 작업과 실행 빈도를 생성, 편집 및 삭제할 수 있습니다

    3. Celery는 간단하고 실용적입니다.

    3.1 기본 환경 구성

    1 최신 버전의 Django를 설치합니다.

    pip3 install django #当前我安装的版本是 3.0.6

    2. 프로젝트 만들기

    django-admin startproject typeidea
    django-admin startapp blog

    3. celery 설치

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis

    3.2 Celery 애플리케이션을 사용하여 테스트

    1. 블로그 디렉터리를 만들고 task.py

    먼저 블로그 폴더를 만듭니다. Django 프로젝트를 실행하고 blog 폴더 아래에 작업을 생성합니다.

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    task.py 코드는 다음과 같습니다.

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 创建任务函数
    @app.task
    def my_task():
        print('任务正在执行...')

    Celery 첫 번째 매개변수는 이름을 설정하는 것입니다. 중개인 브로커를 설정합니다. 여기서는 Redis를 중개자로 사용합니다. my_task 함수는 우리가 작성한 task 함수로 app.task 데코레이터를 추가하면 브로커의 대기열에 등록됩니다.

    2. Redis 시작 및 작업자 생성

    이제 작업자를 생성하고 대기열에서 작업을 처리하기를 기다리고 있습니다.

    프로젝트의 루트 디렉터리에 들어가서 다음 명령을 실행합니다: celery -A celery_tasks.tasks 작업자 -l info

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    3 작업 호출

    함수를 테스트하고 작업을 생성하고 추가해 보겠습니다. 작업 대기열을 생성하고 작업자 실행을 제공합니다.

    파이썬 터미널에 들어가서 다음 코드를 실행하세요:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

    작업 함수를 호출하면 AsyncResult 개체가 반환됩니다. 이 개체는 작업 상태를 확인하거나 작업의 반환 값을 얻는 데 사용할 수 있습니다.

    4. 결과 확인

    작업자 터미널에서 작업 실행 상태를 확인하면 83484dfe-f729-417b-8e51-6c7ae32a1377 작업이 수신되고 작업 실행 정보가 인쇄된 것을 확인할 수 있습니다

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    5. 작업 실행 상태 저장 및 보기

    작업 실행 결과를 ret에 할당한 후 result()를 호출하면 비활성화된Backend 오류가 발생하는 경우를 볼 수 있습니다. 백엔드 저장소가 구성되지 않았습니다. 다음 섹션에서 설명하겠습니다. 작업 실행 결과를 저장하도록 백엔드를 구성하는 방법으로 이동하세요

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    4. 작업 실행 결과를 저장하도록 백엔드를 구성하세요

    작업 상태에 따라 Celery는 결과를 어딘가에 저장해야 합니다. SQLAlchemy, Django ORM, Memcached, Redis, RPC(RabbitMQ/AMQP) 등 여러 가지 스토리지 옵션을 사용할 수 있습니다.

    1. 백엔드 매개변수 추가

    이 예에서는 Redis를 결과 저장 솔루션으로 사용하고 Celery의 백엔드 매개변수를 통해 작업 결과 저장 주소를 설정합니다. 작업 모듈을 다음과 같이 수정했습니다.

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery(&#39;celery_tasks.tasks&#39;,
                 broker=&#39;redis://127.0.0.1:6379/8&#39;,
                 backend=&#39;redis://127.0.0.1:6379/9&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b

    Celery에 백엔드 매개변수를 추가하고, 결과 저장소로 redis를 지정하고, 작업 함수를 두 개의 매개변수와 반환 값으로 수정했습니다.

    2. 태스크 호출/태스크 실행 결과 보기

    태스크를 다시 호출해서 살펴보겠습니다.

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False

    다음과 같이 워커의 실행을 살펴보겠습니다.

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    셀러리 작업이 성공적으로 실행된 것을 확인할 수 있습니다.

    하지만 이것은 시작일 뿐이며 다음 단계는 예약된 작업을 추가하는 방법을 살펴보는 것입니다.

    4. Celery 디렉토리 구조 최적화

    위에서는 모든 Celery 애플리케이션 생성, 구성 및 작업 작업을 하나의 파일에 직접 작성하므로 나중에 프로젝트가 점점 커지는 데 불편을 겪습니다. 이를 분석하고 일반적으로 사용되는 몇 가지 매개변수를 추가해 보겠습니다.

    기본 구조는 다음과 같습니다

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    $ vim typeidea/celery.py (셀러리 애플리케이션 파일)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name=&#39;typeidea&#39;
    # set the default django setting module for the &#39;celery&#39; program
    os.environ.setdefault(&#39;DJANGO_SETTINGS_MODULE&#39;,&#39;typeidea.settings&#39;)
    app = Celery(project_name)
     
    app.config_from_object(&#39;django.conf:settings&#39;)
     
    app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat

    2.在项目的 settings 文件配置 django-celery-beat 

    INSTALLED_APPS = [
        &#39;blog&#39;,
        &#39;django_celery_beat&#39;,
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = &#39;zh-hans&#39;  # 使用中国语言
    TIME_ZONE = &#39;Asia/Shanghai&#39;  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = &#39;Asia/Shanghai&#39;, 则使用上海的UTC时间。
    USE_TZ = False

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery(&#39;blog&#39;)
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        &#39;blog&#39;,
    ])

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;,backend=&#39;redis://127.0.0.1:6379/1&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print(&#39;任务2函数休眠10s&#39;)
        time.sleep(10)

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name=&#39;my_task2&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    <PeriodicTask: my_task2: every 10 seconds>

    beat 调度服务日志显示如下:

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

     worker 服务日志显示如下:

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name=&#39;my_task&#39;,task=&#39;blog.tasks.my_task&#39;,args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>

    beat 调度服务日志结果:

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

     worker 服务日志结果:

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。 

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute=&#39;*&#39;,
    ... hour=&#39;*&#39;,
    ... day_of_week=&#39;*&#39;,
    ... day_of_month=&#39;*&#39;,
    ... timezone=pytz.timezone(&#39;Asia/Shanghai&#39;)
    ... )

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name=&#39;my_task2_crontab&#39;,task=&#39;blog.tasks.my_task2&#39;,)

    beat 调度服务执行结果 

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

     worker 执行服务结果

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

     控制台实际操作记录

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()

    查看worker输出:

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

     可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()

    查看worker输出:

    Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법

     可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;).delete()
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    위 내용은 Python Celery를 사용하여 예약된 작업을 동적으로 추가하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제