如何在FastAPI中實現定時任務和週期性任務
引言:
FastAPI是一個現代化的、高度效能的Python框架,專注於建立API應用程式。然而,有時我們需要在FastAPI應用程式中執行定時任務和週期性任務。本文將介紹如何在FastAPI應用程式中實作這些任務,並提供對應的程式碼範例。
一、定時任務的實作
使用APScheduler函式庫
APScheduler是一個功能強大的Python函式庫,用於排程和管理定時任務。它支援多種任務調度器,如基於日期、時間間隔和Cron表達式等。以下是在FastAPI中使用APScheduler實作定時任務的步驟:
pip install apscheduler
來安裝APScheduler函式庫。 tasks.py
的文件,用於定義定時任務。 from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() @scheduler.scheduled_job('interval', seconds=10) def job(): print("This is a scheduled job") scheduler.start()
from fastapi import FastAPI from .tasks import scheduler app = FastAPI() app.mount("/tasks", scheduler.app)
使用Celery庫
Celery是一個強大的分散式任務佇列庫,支援非同步和定時任務。以下是在FastAPI中使用Celery實作定時任務的步驟:
pip install celery
來安裝Celery函式庫。 tasks.py
的文件,用於定義定時任務。 from celery import Celery app = Celery('tasks', broker='redis://localhost:6379') @app.task def job(): print("This is a scheduled job")
from fastapi import FastAPI from .tasks import app as celery_app app = FastAPI() app.mount("/tasks", celery_app)
二、週期性任務的實作
使用APScheduler函式庫
APScheduler函式庫同樣支援週期性任務的排程。以下是在FastAPI應用程式中使用APScheduler實作週期性任務的步驟:
from apscheduler.triggers.cron import CronTrigger scheduler = BackgroundScheduler() @scheduler.scheduled_job(CronTrigger.from_crontab('* * * * *')) def job(): print("This is a periodic job") scheduler.start()
使用Celery函式庫
Celery函式庫同樣支援週期性任務的排程。以下是在FastAPI應用程式中使用Celery實作週期性任務的步驟:
from celery import Celery from celery.schedules import crontab app = Celery('tasks', broker='redis://localhost:6379') @app.task def job(): print("This is a periodic job") app.conf.beat_schedule = { 'job': { 'task': 'tasks.job', 'schedule': crontab(minute='*'), }, }
結論:
透過使用APScheduler或Celery函式庫,我們可以輕鬆地在FastAPI應用程式中實現定時任務和週期性任務。以上提供的程式碼範例可以作為參考,幫助您在FastAPI專案中快速實現這些任務功能。儘管以上是簡單的範例,但基於這些範例,您可以進一步擴展和自訂自己的任務邏輯。
參考資料:
(本文僅供參考,請依實際需求進行相應調整和修改。)
以上是如何在FastAPI中實現定時任務和週期性任務的詳細內容。更多資訊請關注PHP中文網其他相關文章!