ホームページ >バックエンド開発 >Python チュートリアル >FastAPI アプリケーションでバックグラウンド タスクを実行するにはどうすればよいですか?
を使用して Python サーバーを構築する場合FastAPI では、API ロジックとは無関係だがバックグラウンドで繰り返し実行する必要がある関数を組み込むことが必要になる場合があります。たとえば、これには、外部 API のチェックと、応答に基づく情報の出力が含まれる可能性があります。
1 つのアプローチは、目的の関数を実行するスレッドを作成することです。これを実現する方法は次のとおりです:
import threading def start_worker(): print('[main]: starting worker...') # Define and execute the worker function worker = my_worker.Worker() worker.working_loop() if __name__ == '__main__': print('[main]: starting...') # Start the worker thread _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
別の方法では、バックグラウンド タスクにイベント スケジューラを使用します:
import sched, time from threading import Thread s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start()
タスクが async def 関数の場合、 asyncio.create_task() 関数を使用して現在のイベント ループに追加できます:
from fastapi import FastAPI import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Add the async task to the event loop asyncio.create_task(print_task(5)) yield print('Shutting down...') app = FastAPI(lifespan=lifespan)
以上がFastAPI アプリケーションでバックグラウンド タスクを実行するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。