FastAPI StreamingResponse 未使用生成器函数进行流式处理
问题:
FastAPI 应用程序无法使用 StreamingResponse 从生成器函数流式传输响应,从而导致整个响应作为一个整体发送。
答案:
使用 StreamingResponse 时需要考虑几个因素使用生成器函数:
1。 HTTP请求类型:
提供的代码使用POST请求,不适合从服务器获取数据。使用 GET 请求来获取数据。
2.凭据处理:
出于安全原因,请避免通过 URL 查询字符串发送凭据(例如“auth_key”)。请改用标头或 cookie。
3.生成器函数语法:
阻塞操作不应在 StreamingResponse 的生成器函数内执行。对于生成器函数,请使用 def 而不是 async def,因为 FastAPI 使用线程池来管理阻塞操作。
4.迭代器用法:
在您的测试代码中,requests.iter_lines() 一次一行地迭代响应数据。如果响应不包含换行符,请使用 iter_content() 并指定块大小以避免潜在的缓冲问题。
5.媒体类型:
浏览器可能会缓冲 media_type='text/plain' 的响应。为了避免这种情况,请设置 media_type='text/event-stream' 或在响应标头中使用 X-Content-Type-Options: nosniff 禁用 MIME 嗅探。
工作示例:
这是 app.py 和 test.py 中的一个工作示例,它解决了上述问题:
# app.py from fastapi import FastAPI, StreamingResponse import asyncio app = FastAPI() async def fake_data_streamer(): for i in range(10): yield b'some fake data\n\n' await asyncio.sleep(0.5) @app.get('/') async def main(): headers = {'X-Content-Type-Options': 'nosniff'} # Disable MIME Sniffing return StreamingResponse(fake_data_streamer(), media_type='text/event-stream', headers=headers) # test.py (using httpx) import httpx url = 'http://localhost:8000/' with httpx.stream('GET', url) as r: for chunk in r.iter_content(1024): print(chunk)
以上是为什么我的 FastAPI StreamingResponse 不使用生成器函数进行流式传输?的详细内容。更多信息请关注PHP中文网其他相关文章!