首页 >后端开发 >Python教程 >如何在 FastAPI 应用程序中运行后台任务?

如何在 FastAPI 应用程序中运行后台任务?

DDD
DDD原创
2024-12-05 06:52:10631浏览

How to Run Background Tasks in a FastAPI Application?

如何在 FastAPI Python 后台运行线程

挑战:将独立的后台函数合并到 FastAPI

使用构建 Python 服务器时FastAPI,可能需要合并一个与 API 逻辑无关但需要在后台重复运行的函数。例如,这可能涉及检查外部 API 并根据响应打印信息。

选项 1:使用线程

一种方法是创建一个执行所需函数的线程。实现方法如下:

import threading

def start_worker():
    print('[main]: starting worker...')
    # Define and execute the worker function
    worker = my_worker.Worker()
    worker.working_loop()

if __name__ == '__main__':
    print('[main]: starting...')
    # Start the worker thread
    _worker_thread = Thread(target=start_worker, daemon=False)
    _worker_thread.start()

选项 2:事件调度

另一种方法涉及使用事件调度程序来执行后台任务:

import sched, time
from threading import Thread

s = sched.scheduler(time.time, time.sleep)

def print_event(sc):
    print("Hello")
    sc.enter(5, 1, print_event, (sc,))

def start_scheduler():
    s.enter(5, 1, print_event, (s,))
    s.run()

@app.on_event("startup")
async def startup_event():
    thread = Thread(target=start_scheduler)
    thread.start()

选项3:带有事件循环的异步任务

如果任务是async def函数,可以将其添加到使用 asyncio.create_task() 函数的当前事件循环:

from fastapi import FastAPI
import asyncio

async def print_task(s):
    while True:
        print('Hello')
        await asyncio.sleep(s)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Add the async task to the event loop
    asyncio.create_task(print_task(5))
    yield
    print('Shutting down...')

app = FastAPI(lifespan=lifespan)

以上是如何在 FastAPI 应用程序中运行后台任务?的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn