FastAPI StreamingResponse가 생성기 기능으로 스트리밍되지 않음
문제:
FastAPI 애플리케이션이 실패함 StreamingResponse를 사용하여 생성기 함수에서 응답을 스트리밍하면 전체 응답이 전체적으로 전송됩니다.
답변:
StreamingResponse를 사용할 때 고려해야 할 몇 가지 요소가 있습니다. 생성기 기능 포함:
1. HTTP 요청 유형:
제공된 코드는 서버에서 데이터를 얻는 데 적합하지 않은 POST 요청을 사용합니다. 데이터를 가져오려면 대신 GET 요청을 사용하세요.
2. 자격 증명 처리:
보안상의 이유로 URL 쿼리 문자열을 통해 자격 증명(예: 'auth_key')을 보내지 마십시오. 대신 헤더나 쿠키를 사용하세요.
3. 생성기 함수 구문:
StreamingResponse의 생성기 함수 내에서 차단 작업을 실행하면 안 됩니다. FastAPI는 스레드 풀을 사용하여 차단 작업을 관리하므로 생성기 기능에 async def 대신 def를 사용합니다.
4. 반복자 사용법:
테스트 코드에서 요청.iter_lines()는 한 번에 한 줄씩 응답 데이터를 반복합니다. 응답에 줄바꿈이 포함되지 않은 경우 iter_content()를 사용하고 청크 크기를 지정하여 잠재적인 버퍼링 문제를 방지하세요.
5. 미디어 유형:
브라우저는 media_type='text/plain'으로 응답을 버퍼링할 수 있습니다. 이를 방지하려면 media_type='text/event-stream'을 설정하거나 응답 헤더에서 X-Content-Type-Options: nosniff를 사용하여 MIME 스니핑을 비활성화하세요.
작업 예:
다음은 위에서 언급한 문제를 해결하는 app.py 및 test.py의 실제 예제입니다.
# app.py from fastapi import FastAPI, StreamingResponse import asyncio app = FastAPI() async def fake_data_streamer(): for i in range(10): yield b'some fake data\n\n' await asyncio.sleep(0.5) @app.get('/') async def main(): headers = {'X-Content-Type-Options': 'nosniff'} # Disable MIME Sniffing return StreamingResponse(fake_data_streamer(), media_type='text/event-stream', headers=headers) # test.py (using httpx) import httpx url = 'http://localhost:8000/' with httpx.stream('GET', url) as r: for chunk in r.iter_content(1024): print(chunk)
위 내용은 FastAPI StreamingResponse가 생성기 기능으로 스트리밍되지 않는 이유는 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!