Home >Backend Development >Python Tutorial >How to Run Background Tasks in a FastAPI Application?
When constructing a Python server using FastAPI, it can be necessary to incorporate a function that is unrelated to the API logic but needs to run in the background on a recurring basis. For instance, this could involve checking external APIs and printing information based on the responses.
One approach is to create a thread that executes the desired function. Here's how this can be achieved:
import threading def start_worker(): print('[main]: starting worker...') # Define and execute the worker function worker = my_worker.Worker() worker.working_loop() if __name__ == '__main__': print('[main]: starting...') # Start the worker thread _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
An alternative method involves using an event scheduler for the background task:
import sched, time from threading import Thread s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start()
If the task is an async def function, it can be added to the current event loop using the asyncio.create_task() function:
from fastapi import FastAPI import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Add the async task to the event loop asyncio.create_task(print_task(5)) yield print('Shutting down...') app = FastAPI(lifespan=lifespan)
The above is the detailed content of How to Run Background Tasks in a FastAPI Application?. For more information, please follow other related articles on the PHP Chinese website!