首頁  >  文章  >  後端開發  >  如何在FastAPI中實現定時任務和週期性任務

如何在FastAPI中實現定時任務和週期性任務

WBOY
WBOY原創
2023-07-30 15:53:123550瀏覽

如何在FastAPI中實現定時任務和週期性任務

引言:
FastAPI是一個現代化的、高度效能的Python框架,專注於建立API應用程式。然而,有時我們需要在FastAPI應用程式中執行定時任務和週期性任務。本文將介紹如何在FastAPI應用程式中實作這些任務,並提供對應的程式碼範例。

一、定時任務的實作

  1. 使用APScheduler函式庫
    APScheduler是一個功能強大的Python函式庫,用於排程和管理定時任務。它支援多種任務調度器,如基於日期、時間間隔和Cron表達式等。以下是在FastAPI中使用APScheduler實作定時任務的步驟:

    1. 安裝APScheduler函式庫:在終端機中執行指令pip install apscheduler來安裝APScheduler函式庫。
    2. 建立一個定時任務模組:在FastAPI應用程式的根目錄下,建立一個名為tasks.py的文件,用於定義定時任務。
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

@scheduler.scheduled_job('interval', seconds=10)
def job():
    print("This is a scheduled job")

scheduler.start()
  1. 註冊定時任務模組:在FastAPI應用程式的主檔案中,匯入定時任務模組並註冊為FastAPI應用程式的子應用程式。
from fastapi import FastAPI
from .tasks import scheduler

app = FastAPI()

app.mount("/tasks", scheduler.app)
  1. 使用Celery庫
    Celery是一個強大的分散式任務佇列庫,支援非同步和定時任務。以下是在FastAPI中使用Celery實作定時任務的步驟:

    1. 安裝Celery函式庫:在終端機中執行指令pip install celery來安裝Celery函式庫。
    2. 建立一個定時任務模組:在FastAPI應用程式的根目錄下,建立一個名為tasks.py的文件,用於定義定時任務。
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def job():
    print("This is a scheduled job")
  1. 註冊定時任務模組:在FastAPI應用程式的主檔案中,匯入定時任務模組並註冊為FastAPI應用程式的子應用程式。
from fastapi import FastAPI
from .tasks import app as celery_app

app = FastAPI()

app.mount("/tasks", celery_app)

二、週期性任務的實作

  1. 使用APScheduler函式庫
    APScheduler函式庫同樣支援週期性任務的排程。以下是在FastAPI應用程式中使用APScheduler實作週期性任務的步驟:

    1. 安裝APScheduler函式庫:參考前文中的步驟1。
    2. 建立一個週期性任務模組:參考前文中的步驟2。
from apscheduler.triggers.cron import CronTrigger

scheduler = BackgroundScheduler()

@scheduler.scheduled_job(CronTrigger.from_crontab('* * * * *'))
def job():
    print("This is a periodic job")

scheduler.start()
  1. 使用Celery函式庫
    Celery函式庫同樣支援週期性任務的排程。以下是在FastAPI應用程式中使用Celery實作週期性任務的步驟:

    1. 安裝Celery函式庫:參考前文中的步驟1。
    2. 建立一個週期性任務模組:參考前文中的步驟2。
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def job():
    print("This is a periodic job")

app.conf.beat_schedule = {
    'job': {
        'task': 'tasks.job',
        'schedule': crontab(minute='*'),
    },
}

結論:
透過使用APScheduler或Celery函式庫,我們可以輕鬆地在FastAPI應用程式中實現定時任務和週期性任務。以上提供的程式碼範例可以作為參考,幫助您在FastAPI專案中快速實現這些任務功能。儘管以上是簡單的範例,但基於這些範例,您可以進一步擴展和自訂自己的任務邏輯。

參考資料:

  • APScheduler官方文件:https://apscheduler.readthedocs.io/
  • Celery官方文件:https://docs.celeryproject. org/

(本文僅供參考,請依實際需求進行相應調整和修改。)

以上是如何在FastAPI中實現定時任務和週期性任務的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn