Maison  >  Article  >  développement back-end  >  Comment implémenter des tâches planifiées et des tâches périodiques dans FastAPI

Comment implémenter des tâches planifiées et des tâches périodiques dans FastAPI

WBOY
WBOYoriginal
2023-07-30 15:53:123447parcourir

Comment implémenter des tâches planifiées et des tâches périodiques dans FastAPI

Introduction :
FastAPI est un framework Python moderne et hautement performant axé sur la création d'applications API. Cependant, nous devons parfois effectuer des tâches planifiées et des tâches périodiques dans les applications FastAPI. Cet article décrit comment implémenter ces tâches dans une application FastAPI et fournit des exemples de code correspondants.

1. Implémentation de tâches planifiées

  1. Utilisation de la bibliothèque APScheduler
    APScheduler est une puissante bibliothèque Python pour planifier et gérer les tâches planifiées. Il prend en charge plusieurs planificateurs de tâches, par exemple en fonction de la date, de l'intervalle de temps et de l'expression Cron. Voici les étapes pour utiliser APScheduler pour implémenter des tâches planifiées dans FastAPI :

    1. Installer la bibliothèque APScheduler : Exécutez la commande pip install apscheduler dans le terminal pour installer la bibliothèque APScheduler. pip install apscheduler来安装APScheduler库。
    2. 创建一个定时任务模块:在FastAPI应用程序的根目录下,创建一个名为tasks.py的文件,用于定义定时任务。
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

@scheduler.scheduled_job('interval', seconds=10)
def job():
    print("This is a scheduled job")

scheduler.start()
  1. 注册定时任务模块:在FastAPI应用程序的主文件中,导入定时任务模块并注册为FastAPI应用程序的一个子应用。
from fastapi import FastAPI
from .tasks import scheduler

app = FastAPI()

app.mount("/tasks", scheduler.app)
  1. 使用Celery库
    Celery是一个强大的分布式任务队列库,支持异步和定时任务。以下是在FastAPI中使用Celery实现定时任务的步骤:

    1. 安装Celery库:在终端中运行命令pip install celery来安装Celery库。
    2. 创建一个定时任务模块:在FastAPI应用程序的根目录下,创建一个名为tasks.py
    3. Créer un module de tâches planifiées : Dans le répertoire racine de l'application FastAPI, créez un fichier nommé tasks.py pour définir les tâches planifiées.
    from celery import Celery
    
    app = Celery('tasks', broker='redis://localhost:6379')
    
    @app.task
    def job():
        print("This is a scheduled job")
  1. Enregistrez le module de tâches planifiées : Dans le fichier principal de l'application FastAPI, importez le module de tâches planifiées et enregistrez-le en tant que sous-application de l'application FastAPI.

from fastapi import FastAPI
from .tasks import app as celery_app

app = FastAPI()

app.mount("/tasks", celery_app)

  1. Utilisation de la bibliothèque Celery

    Celery est une puissante bibliothèque de files d'attente de tâches distribuées qui prend en charge les tâches asynchrones et planifiées. Voici les étapes pour utiliser Celery pour implémenter des tâches planifiées dans FastAPI :

      Installer la bibliothèque Celery : Exécutez la commande pip install celery dans le terminal pour installer la bibliothèque Celery.
    1. Créer un module de tâches planifiées : Dans le répertoire racine de l'application FastAPI, créez un fichier nommé tasks.py pour définir les tâches planifiées.
  2. from apscheduler.triggers.cron import CronTrigger
    
    scheduler = BackgroundScheduler()
    
    @scheduler.scheduled_job(CronTrigger.from_crontab('* * * * *'))
    def job():
        print("This is a periodic job")
    
    scheduler.start()
    Enregistrez le module de tâches planifiées : Dans le fichier principal de l'application FastAPI, importez le module de tâches planifiées et enregistrez-le en tant que sous-application de l'application FastAPI.
  1. from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('tasks', broker='redis://localhost:6379')
    
    @app.task
    def job():
        print("This is a periodic job")
    
    app.conf.beat_schedule = {
        'job': {
            'task': 'tasks.job',
            'schedule': crontab(minute='*'),
        },
    }

    2. Mise en œuvre de tâches périodiques

    1. Utiliser la bibliothèque APScheduler
    2. La bibliothèque APScheduler prend également en charge la planification de tâches périodiques. Voici les étapes pour utiliser APScheduler pour implémenter des tâches périodiques dans une application FastAPI :
  2. Installer la bibliothèque APScheduler : Reportez-vous à l'étape 1 de l'article précédent.

Créer un module de tâches périodiques : reportez-vous à l'étape 2 de l'article précédent.

rrreee
  • Utilisation de la bibliothèque Celery
  • La bibliothèque Celery prend également en charge la planification de tâches périodiques. Voici les étapes pour implémenter des tâches périodiques à l'aide de Celery dans une application FastAPI :

Installez la bibliothèque Celery : reportez-vous à l'étape 1 de l'article précédent.

🎜Créer un module de tâches périodiques : reportez-vous à l'étape 2 de l'article précédent. 🎜🎜🎜🎜rrreee🎜Conclusion : 🎜En utilisant APScheduler ou la bibliothèque Celery, nous pouvons facilement implémenter des tâches planifiées et des tâches périodiques dans les applications FastAPI. Les exemples de code fournis ci-dessus peuvent être utilisés comme référence pour vous aider à implémenter rapidement ces fonctions de tâche dans votre projet FastAPI. Bien que les exemples ci-dessus soient simples, vous pouvez étendre et personnaliser davantage votre propre logique de tâches en fonction de ces exemples. 🎜🎜Matériaux de référence : 🎜🎜🎜Documentation officielle APScheduler : https://apscheduler.readthedocs.io/🎜🎜Documentation officielle Celery : https://docs.celeryproject.org/🎜🎜🎜 (Cet article est à titre de référence uniquement, veuillez basez-le sur la situation réelle (Ajustez et modifiez en conséquence si nécessaire)🎜.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn