Rumah >pembangunan bahagian belakang >Tutorial Python >Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

王林
王林ke hadapan
2023-05-13 15:43:062126semak imbas

    1. Latar Belakang

    Dalam kerja sebenar, akan ada beberapa tugas tak segerak yang memakan masa yang perlu dijadualkan, seperti menghantar e-mel, menarik data , dan melaksanakan tugas berjadual

    Idea utama melaksanakan penjadualan melalui saderi adalah untuk memperkenalkan redis orang tengah, memulakan pekerja untuk melaksanakan tugas, dan celery-beat untuk penyimpanan data tugas berjadual

    2. Dokumentasi rasmi untuk Celery untuk menambahkan tugas berjadual secara dinamik

    dokumentasi saderi: https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    keterangan kelas penjadualan tersuai saderi:

    Kelas penjadual tersuai boleh ditentukan pada baris arahan (--parameter penjadual)

    dokumentasi django-celery-beat: https://pypi .org/project/django-celery-beat/

    Nota pada pemalam django-celery-beat:

    Pelanjutan ini membolehkan anda menyimpan jadual tugasan berkala dalam pangkalan data diuruskan daripada antara muka pentadbir Django, di mana anda boleh Mencipta, mengedit dan memadamkan tugasan berkala dan kekerapan tugasan itu perlu dijalankan

    3 Saderi adalah mudah dan praktikal

    3.1 Konfigurasi persekitaran asas

    <.>

    1. Pasang versi terkini Django

    pip3 install django #当前我安装的版本是 3.0.6

    2. Buat projek

    django-admin startproject typeidea
    django-admin startapp blog

    3 🎜>

    1. Buat direktori blog, buat tasks.py baharu

    Mula-mula buat folder blog dalam projek Django, dan buat modul tasks.py di bawah folder blog, seperti berikut:

    Kod tasks.py adalah seperti berikut:

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis
    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamikParameter pertama Saderi ialah menetapkan nama untuknya adalah untuk menetapkan broker orang tengah Di sini kita menggunakan Redis sebagai orang tengah. Fungsi my_task ialah fungsi tugas yang kami tulis Dengan menambahkan app.task penghias, ia didaftarkan dalam baris gilir broker.

    2. Mulakan redis dan cipta pekerja

    Sekarang kami sedang mencipta pekerja dan menunggu untuk memproses tugasan dalam baris gilir. Masukkan direktori akar projek dan laksanakan arahan: celery -A celery_tasks.tasks worker -l info

    3 tugasanCara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    Mari kita uji fungsi, buat tugasan, tambahkannya pada baris gilir tugasan dan sediakan pelaksanaan pekerja. Masukkan terminal python dan laksanakan kod berikut:

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;)
     
    # 创建任务函数
    @app.task
    def my_task():
        print(&#39;任务正在执行...&#39;)

    Panggil fungsi tugas dan objek AsyncResult akan dikembalikan mengembalikan nilai tugas.

    4 Lihat hasil

    Semak status pelaksanaan tugas pada terminal pekerja Anda boleh melihat bahawa tugasan 83484dfe-f729-417b-8e51-6c7ae32a1377. diterima dan dicetak. Dapatkan maklumat pelaksanaan tugas

    5. untuk ret, dan kemudian memanggil hasil () akan menjana ralat DisabledBackend Ia boleh dilihat bahawa maklumat status pelaksanaan tugas tidak boleh disimpan apabila storan bahagian belakang tidak dikonfigurasikan simpan keputusan pelaksanaan tugasCara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

    4 Konfigurasi bahagian belakang untuk menyimpan hasil pelaksanaan tugas

    Jika kita ingin menjejaki status tugas, Celery perlu menyimpan. hasilnya di suatu tempat. Terdapat beberapa pilihan storan yang tersedia: SQLAlchemy, Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP).

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik1. Tambahkan parameter hujung belakang

    Dalam contoh ini, kami menggunakan Redis sebagai penyelesaian untuk menyimpan hasil dan menetapkan alamat storan hasil tugasan melalui parameter hujung belakang Celery. Kami mengubah suai modul tugas seperti berikut:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()

    menambahkan parameter hujung belakang pada Celery, menetapkan redis sebagai storan hasil dan mengubah suai fungsi tugasan kepada dua parameter dan nilai pulangan.

    2. Panggil tugasan/Lihat hasil pelaksanaan tugas

    Mari kita panggil tugas itu semula dan lihat.

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery(&#39;celery_tasks.tasks&#39;,
                 broker=&#39;redis://127.0.0.1:6379/8&#39;,
                 backend=&#39;redis://127.0.0.1:6379/9&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b

    Mari kita lihat pelaksanaan pekerja, seperti berikut:

    Anda dapat melihat bahawa tugas saderi telah dilaksanakan dengan jayanya.

    Tetapi ini hanyalah permulaan, langkah seterusnya ialah melihat cara menambah tugasan yang dijadualkan.

    4. Optimumkan struktur direktori Saderi Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    Di atas secara langsung menulis semua tugasan, konfigurasi dan tugasan aplikasi Saderi dalam satu fail Ini akan menyusahkan projek untuk menjadi lebih besar dan lebih besar nanti. Mari pecahkannya dan tambah beberapa parameter yang biasa digunakan.

    Struktur asas adalah seperti berikut

    $ vim typeidea/celery.py (fail aplikasi Saderi)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name=&#39;typeidea&#39;
    # set the default django setting module for the &#39;celery&#39; program
    os.environ.setdefault(&#39;DJANGO_SETTINGS_MODULE&#39;,&#39;typeidea.settings&#39;)
    app = Celery(project_name)
     
    app.config_from_object(&#39;django.conf:settings&#39;)
     
    app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat

    2.在项目的 settings 文件配置 django-celery-beat 

    INSTALLED_APPS = [
        &#39;blog&#39;,
        &#39;django_celery_beat&#39;,
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = &#39;zh-hans&#39;  # 使用中国语言
    TIME_ZONE = &#39;Asia/Shanghai&#39;  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = &#39;Asia/Shanghai&#39;, 则使用上海的UTC时间。
    USE_TZ = False

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery(&#39;blog&#39;)
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        &#39;blog&#39;,
    ])

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;,backend=&#39;redis://127.0.0.1:6379/1&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print(&#39;任务2函数休眠10s&#39;)
        time.sleep(10)

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name=&#39;my_task2&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    <PeriodicTask: my_task2: every 10 seconds>

    beat 调度服务日志显示如下:

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

     worker 服务日志显示如下:

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name=&#39;my_task&#39;,task=&#39;blog.tasks.my_task&#39;,args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>

    beat 调度服务日志结果:

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

     worker 服务日志结果:

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。 

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute=&#39;*&#39;,
    ... hour=&#39;*&#39;,
    ... day_of_week=&#39;*&#39;,
    ... day_of_month=&#39;*&#39;,
    ... timezone=pytz.timezone(&#39;Asia/Shanghai&#39;)
    ... )

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name=&#39;my_task2_crontab&#39;,task=&#39;blog.tasks.my_task2&#39;,)

    beat 调度服务执行结果 

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

     worker 执行服务结果

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

     控制台实际操作记录

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()

    查看worker输出:

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

     可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()

    查看worker输出:

    Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik

     可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;).delete()
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    Atas ialah kandungan terperinci Cara menggunakan Python Celery untuk menambahkan tugas berjadual secara dinamik. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Kenyataan:
    Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam