首頁 >後端開發 >Python教學 >如何在 FastAPI 應用程式中執行後台任務?

如何在 FastAPI 應用程式中執行後台任務?

DDD
DDD原創
2024-12-05 06:52:10622瀏覽

How to Run Background Tasks in a FastAPI Application?

如何在FastAPI Python 後台運行線程

挑戰:將獨立的後台函數合併到FastAPI

使用構建FastAPI,可能需要合併一個與API 邏輯無關但需要在背景重複執行的函數。例如,這可能涉及檢查外部 API 並根據回應列印訊息。

選項 1:使用執行緒

一種方法是建立一個執行所需函數的執行緒。實作方法如下:

import threading

def start_worker():
    print('[main]: starting worker...')
    # Define and execute the worker function
    worker = my_worker.Worker()
    worker.working_loop()

if __name__ == '__main__':
    print('[main]: starting...')
    # Start the worker thread
    _worker_thread = Thread(target=start_worker, daemon=False)
    _worker_thread.start()

選項2:事件調度

另一種方法涉及使用事件調度程序來執行後台任務:

import sched, time
from threading import Thread

s = sched.scheduler(time.time, time.sleep)

def print_event(sc):
    print("Hello")
    sc.enter(5, 1, print_event, (sc,))

def start_scheduler():
    s.enter(5, 1, print_event, (s,))
    s.run()

@app.on_event("startup")
async def startup_event():
    thread = Thread(target=start_scheduler)
    thread.start()

選項3:帶有事件循環的非同步任務

如果任務是async def函數,可以將其加入使用asyncio.create_task() 函數的目前事件循環:

from fastapi import FastAPI
import asyncio

async def print_task(s):
    while True:
        print('Hello')
        await asyncio.sleep(s)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Add the async task to the event loop
    asyncio.create_task(print_task(5))
    yield
    print('Shutting down...')

app = FastAPI(lifespan=lifespan)

以上是如何在 FastAPI 應用程式中執行後台任務?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn