ホームページ  >  記事  >  バックエンド開発  >  Python Celery を使用してスケジュールされたタスクを動的に追加する方法

Python Celery を使用してスケジュールされたタスクを動的に追加する方法

王林
王林転載
2023-05-13 15:43:062033ブラウズ

    1. 背景

    実際の作業では、電子メールの送信やデータの取得など、スケジュールを設定する必要がある時間のかかる非同期タスクがいくつかあります。スクリプト

    セロリを介してスケジューリングを実装する主なアイデアは、仲介者 Redis を導入し、タスク実行のためにワーカーを開始し、スケジュールされたタスク データ ストレージのためにセロリビートを導入することです

    2. スケジュールされたタスクを動的に追加する Celery の公式ドキュメント

    celery ドキュメント: https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    celery カスタム スケジューリング クラスの説明:

    カスタム スケジューラ クラスはコマンド ライン (--scheduler パラメーター) で指定できます。

    django-celery-beat ドキュメント: https://pypi. org/project/django-celery-beat/

    django-celery-beat プラグインに関する説明:

    この拡張機能を使用すると、定期的なタスクのスケジュールをデータベースに保存できます。定期的なタスクを管理できます。 Django 管理インターフェイスから、定期的なタスクを作成、編集、削除したり、タスクを実行する頻度を設定したりできます

    3. Celery はシンプルで実用的です

    3.1 基本的な環境構成

    1. 最新バージョンの Django をインストールします

    pip3 install django #当前我安装的版本是 3.0.6

    2. プロジェクトを作成します

    django-admin startproject typeidea
    django-admin startapp blog

    3. celery をインストールします

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis

    3.2 Celery アプリケーションを使用してテストします

    1. ブログ ディレクトリを作成し、新しいタスクを作成します。py

    まず、Django プロジェクトにブログ フォルダーを作成し、そのブログ フォルダーの下に task.py モジュールを作成します。

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    task.py コードは次のとおりです:

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 创建任务函数
    @app.task
    def my_task():
        print('任务正在执行...')

    Celery の最初のパラメータは、名前を設定することです。2 番目のパラメータは次のとおりです。ここでは仲介者として Redis を使用します。 my_task関数は私たちが作成したタスク関数で、デコレータapp.taskを追加することでブローカーのキューに登録されます。

    2. Redis を起動してワーカーを作成します

    今、ワーカーを作成し、キュー内のタスクの処理を待機しています。

    プロジェクトのルート ディレクトリを入力し、コマンド celery -A celery_tasks.tasks worker -l info

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    #3. タスクを呼び出します。

    関数をテストし、タスクを作成してタスク キューに追加し、ワーカーを実行してみましょう。

    入力して Python ターミナルを実行し、次のコードを実行します:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

    タスク関数を呼び出すと、AsyncResult オブジェクトが返されます。このオブジェクトは、タスクのステータスを確認したり、戻り値を取得したりするために使用できます。タスクの。

    4. 結果の確認

    ワーカーの端末でタスクの実行ステータスを確認すると、83484dfe-f729-417b-8e51-6c7ae32a1377 タスクが実行されたことがわかります。タスク実行情報の取得

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    5. タスク実行ステータスの保存と表示

    タスク実行結果の割り当てto ret, and then call result() will generated a DisabledBackend error. バックエンド ストレージが構成されていない場合、タスク実行のステータス情報を保存できないことがわかります。次のセクションでは、バックエンドを構成する方法について説明します。タスクの実行結果を保存する

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    4. タスクの実行結果を保存するようにバックエンドを構成する

    タスクのステータスを追跡したい場合、Celery は保存する必要があります。結果はどこかで。使用可能なストレージ オプションがいくつかあります: SQLAlchemy、Django ORM、Memcached、Redis、RPC (RabbitMQ/AMQP)。

    1. バックエンド パラメーターを追加します

    この例では、結果を保存するソリューションとして Redis を使用し、Celery のバックエンド パラメーターを通じてタスク結果の保存アドレスを設定します。タスク モジュールを次のように変更しました。

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery(&#39;celery_tasks.tasks&#39;,
                 broker=&#39;redis://127.0.0.1:6379/8&#39;,
                 backend=&#39;redis://127.0.0.1:6379/9&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b

    バックエンド パラメータを Celery に追加し、結果ストレージとして redis を指定し、タスク関数を 2 つのパラメータと戻り値に変更しました。

    2. タスクの呼び出し/タスクの実行結果の表示

    もう一度タスクを呼び出して確認してみましょう。

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False

    次のように、ワーカーの実行ステータスを見てみましょう:

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    セロリ タスクが正常に実行されたことがわかります。

    しかし、これは単なる始まりにすぎません。次のステップでは、スケジュールされたタスクを追加する方法を確認します。

    4. Celery ディレクトリ構造の最適化

    上記のコードでは、Celery アプリケーションの作成、構成、およびタスクのタスクがすべて 1 つのファイルに直接書き込まれています。これにより、将来のプロジェクトが大きくなり不便になります。大きめです。これを分解して、一般的に使用されるパラメーターをいくつか追加してみましょう。

    基本的な構造は次のとおりです

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    ##$ vim typeidea/celery.py (Celery アプリケーション ファイル)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name=&#39;typeidea&#39;
    # set the default django setting module for the &#39;celery&#39; program
    os.environ.setdefault(&#39;DJANGO_SETTINGS_MODULE&#39;,&#39;typeidea.settings&#39;)
    app = Celery(project_name)
     
    app.config_from_object(&#39;django.conf:settings&#39;)
     
    app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat

    2.在项目的 settings 文件配置 django-celery-beat 

    INSTALLED_APPS = [
        &#39;blog&#39;,
        &#39;django_celery_beat&#39;,
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = &#39;zh-hans&#39;  # 使用中国语言
    TIME_ZONE = &#39;Asia/Shanghai&#39;  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = &#39;Asia/Shanghai&#39;, 则使用上海的UTC时间。
    USE_TZ = False

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery(&#39;blog&#39;)
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        &#39;blog&#39;,
    ])

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;,backend=&#39;redis://127.0.0.1:6379/1&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print(&#39;任务2函数休眠10s&#39;)
        time.sleep(10)

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name=&#39;my_task2&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    <PeriodicTask: my_task2: every 10 seconds>

    beat 调度服务日志显示如下:

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

     worker 服务日志显示如下:

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name=&#39;my_task&#39;,task=&#39;blog.tasks.my_task&#39;,args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>

    beat 调度服务日志结果:

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

     worker 服务日志结果:

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。 

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute=&#39;*&#39;,
    ... hour=&#39;*&#39;,
    ... day_of_week=&#39;*&#39;,
    ... day_of_month=&#39;*&#39;,
    ... timezone=pytz.timezone(&#39;Asia/Shanghai&#39;)
    ... )

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name=&#39;my_task2_crontab&#39;,task=&#39;blog.tasks.my_task2&#39;,)

    beat 调度服务执行结果 

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

     worker 执行服务结果

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

     控制台实际操作记录

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()

    查看worker输出:

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

     可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()

    查看worker输出:

    Python Celery を使用してスケジュールされたタスクを動的に追加する方法

     可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;).delete()
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    以上がPython Celery を使用してスケジュールされたタスクを動的に追加する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。