当使用标准 Python 请求库在 FastAPI 中发出 HTTP 请求时,线程安全成为并发请求中的一个问题。要有效解决此问题,请考虑使用 httpx,这是一个既提供线程安全性又提高性能的库。
httpx 附带异步 API,使您可以轻松地HTTP 请求同时高效处理多个并发任务。以下是它在 FastAPI 端点中的使用示例:
from httpx import AsyncClient from fastapi import FastAPI, Request app = FastAPI() @app.on_event("startup") async def startup_event(): app.state.client = AsyncClient() @app.on_event('shutdown') async def shutdown_event(): await app.state.client.aclose() @app.get('/') async def home(request: Request): client = request.state.client req = client.build_request('GET', 'https://www.example.com') r = await client.send(req, stream=True) return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
在此示例中:
如果不需要使用 async def 定义端点,则有必要选择 httpx 的同步 API。这种方法维护了线程安全并简化了端点实现:
from httpx import Client from fastapi import FastAPI, Request app = FastAPI() @app.on_event("startup") def startup_event(): app.state.client = Client() @app.on_event('shutdown') async def shutdown_event(): await app.state.client.aclose() @app.get('/') def home(request: Request): client = request.state.client req = client.build_request('GET', 'https://www.example.com') try: r = client.send(req) content_type = r.headers.get('content-type') except Exception as e: content_type = 'text/plain' e = str(e) if content_type == 'application/json': return r.json() elif content_type == 'text/plain': return PlainTextResponse(content=r.text) else: return Response(content=r.content)
在此示例中,同步 API 在 try/ except 块中处理 HTTP 请求,从而允许优雅地处理请求期间可能出现的任何异常。
通过利用 httpx 及其功能,您可以可以自信地在 FastAPI 中发出下游 HTTP 请求,无缝处理多个并发任务并确保应用程序稳定性。
以上是httpx 如何增强 FastAPI 中安全高效的下游 HTTP 请求?的详细内容。更多信息请关注PHP中文网其他相关文章!