首頁 >後端開發 >Python教學 >使用 FastAPI 掌握 Python 非同步 IO

使用 FastAPI 掌握 Python 非同步 IO

Barbara Streisand
Barbara Streisand原創
2025-01-04 19:07:41652瀏覽

Mastering Python Async IO with FastAPI

由於Python是解釋型語言,當用於後端開發時,例如與Python Django結合時,相對於Java Spring,其回應時間會長一些。不過,只要程式碼合理,差別並不會太大。即使Django使用多進程模式,其並發處理能力仍然弱得多。 Python有一些提高並發處理能力的解決方案。例如,使用非同步框架FastAPI,憑藉其非同步能力,可以大幅增強I/O密集型任務的並發處理能力。 FastAPI 是最快的 Python 框架之一。

FastAPI 為例

我們先簡單了解如何使用FastAPI。

範例1:預設網路異步IO

安裝

pip install fastapi

簡單的伺服器端程式碼:

# app.py
from typing import Union

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    return {"Hello": "World"}

啟動

uvicorn app:app --reload

我們可以看到,與其他框架相比,FastAPI的介面只多了一個async關鍵字。 async 關鍵字將介面定義為非同步。僅從回傳結果來看,我們無法看出FastAPI與其他Python框架的差異。區別在於並發訪問。 FastAPI的伺服器執行緒在處理路由請求時,如http://127.0.0.1:8000/,如果遇到網路I/O,將不再等待,而是處理其他請求。當網路 I/O 完成時,執行將恢復。這種非同步能力提高了 I/O 密集型任務的處理能力。

範例2:顯式網路異步IO

讓我們來看另一個例子。在業務代碼中,發起明確​​的非同步網路請求。對於這個網路I/O,就像路由請求一樣,FastAPI也會非同步處理。

# app.py
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

# Example of an asynchronous GET request
@app.get("/external-api")
async def call_external_api():
    url = "https://leapcell.io"
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        if response.status_code!= 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")
        return response.json()

如果希望資料庫I/O是非同步的,需要資料庫驅動或ORM非同步操作的支援。

異步IO

FastAPI非同步的核心實作是非同步I/O。我們可以直接使用非同步I/O來啟動一個具有非同步處理能力的伺服器,而不需要使用FastAPI。

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(1)  # Simulate I/O operation
    return web.Response(text='{"Hello": "World"}', content_type='application/json')

async def init(loop):
    # Use the event loop to monitor web requests
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    # Start the server, and the event loop monitors and processes web requests
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

# Explicitly get an event loop
loop = asyncio.get_event_loop()
# Start the event loop
loop.run_until_complete(init(loop))
loop.run_forever()

本範例啟動時,http://127.0.0.1:8000/的回傳結果與範例1相同。非同步I/O的底層實作原理是「協程」與「事件循環」 .

協程

pip install fastapi

函數索引是用 async def 定義的,這表示它是一個協程。 await 關鍵字用在 I/O 操作之前,告訴執行緒不要等待本次 I/O 操作。普通函數的呼叫是透過堆疊來實現的,函數只能一個一個地呼叫和執行。然而,協程是一種特殊的函數(不是協作線程)。它允許線程在等待標記處暫停執行並切換到執行其他任務。當I/O操作完成後,會繼續執行。

我們來看看多個協程並發執行的效果。

# app.py
from typing import Union

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    return {"Hello": "World"}

輸出:

uvicorn app:app --reload

我們可以看到執行緒並沒有一一執行這三個任務。當它遇到I/O操作時,它會切換到執行其他任務。 I/O操作完成後繼續執行。也可以看出,三個協程基本上同時開始等待I/O操作,所以最終的執行完成時間基本上相同。雖然這裡沒有明確使用事件循環,但 asyncio.run 會隱式使用它。

發電機

協程是透過生成器實現的。生成器可以暫停函數的執行,也可以恢復函數的執行,這是協程的特性。

# app.py
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

# Example of an asynchronous GET request
@app.get("/external-api")
async def call_external_api():
    url = "https://leapcell.io"
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        if response.status_code!= 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")
        return response.json()

用next()運行生成器時,遇到yield時會暫停。當 next() 再次運行時,它將從上次暫停的地方繼續運行。在Python 3.5之前,協程也是用「註釋」寫的。從Python 3.5開始,使用async def wait。

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(1)  # Simulate I/O operation
    return web.Response(text='{"Hello": "World"}', content_type='application/json')

async def init(loop):
    # Use the event loop to monitor web requests
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    # Start the server, and the event loop monitors and processes web requests
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

# Explicitly get an event loop
loop = asyncio.get_event_loop()
# Start the event loop
loop.run_until_complete(init(loop))
loop.run_forever()

產生器的暫停和恢復功能除了協程之外還可以用於許多事情。例如,它可以循環計算和儲存演算法。例如,實現一個帕斯卡三角形(每行兩端都是1,其他位置的數字是它上面兩個數字的和)。

async def index(request):
    await asyncio.sleep(1)  # Simulate I/O operation
    return web.Response(text='{"Hello": "World"}', content_type='application/json')

輸出:

import asyncio
from datetime import datetime

async def coroutine3():
    print(f"Coroutine 3 started at {datetime.now()}")
    await asyncio.sleep(1)  # Simulate I/O operation
    print(f"Coroutine 3 finished at {datetime.now()}")

async def coroutine2():
    print(f"Coroutine 2 started at {datetime.now()}")
    await asyncio.sleep(1)  # Simulate I/O operation
    print(f"Coroutine 2 finished at {datetime.now()}")

async def coroutine1():
    print(f"Coroutine 1 started at {datetime.now()}")
    await asyncio.sleep(1)  # Simulate I/O operation
    print(f"Coroutine 1 finished at {datetime.now()}")

async def main():
    print("Main started")

    # Create tasks to make coroutines execute concurrently
    task1 = asyncio.create_task(coroutine1())
    task2 = asyncio.create_task(coroutine2())
    task3 = asyncio.create_task(coroutine3())

    # Wait for all tasks to complete
    await task1
    await task2
    await task3

    print("Main finished")

# Run the main coroutine
asyncio.run(main())

事件循環

既然協程執行可以暫停,那麼協程什麼時候會恢復執行呢?這就需要使用事件循環來告訴執行緒。

Main started
Coroutine 1 started at 2024-12-27 12:28:01.661251
Coroutine 2 started at 2024-12-27 12:28:01.661276
Coroutine 3 started at 2024-12-27 12:28:01.665012
Coroutine 1 finished at 2024-12-27 12:28:02.665125
Coroutine 2 finished at 2024-12-27 12:28:02.665120
Coroutine 3 finished at 2024-12-27 12:28:02.665120
Main finished

事件循環使用I/O復用技術,不斷循環監聽協程可以繼續執行的事件。當它們可以執行時,執行緒將繼續執行協程。

I/O復用技術

簡單理解I/O復用:我是一個快遞站的老闆。我不需要主動詢問每個快遞員的任務完成情況。相反,快遞員完成任務後會自行來找我。這提高了我的任務處理能力,我可以做更多的事情。

Mastering Python Async IO with FastAPI

select、poll、epoll都可以實作I/O重複使用。與select和poll相比,epoll具有更好的性能。 Linux一般預設使用epoll,macOS使用kqueue,與epoll類似,效能也差不多。

使用事件循環的套接字伺服器

pip install fastapi

啟動伺服器socket來監聽指定連接埠。如果運行在Linux系統上,選擇器預設使用epoll作為其實作。程式碼中使用epoll來註冊一個請求接收事件(accept事件)。當新的請求到來時,epoll會觸發並執行事件處理函數,同時註冊一個讀取事件(read event)來處理和回應請求資料。從Web端透過http://127.0.0.1:8000/訪問,傳回結果與範例1相同。伺服器運行日誌:

# app.py
from typing import Union

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    return {"Hello": "World"}

套接字伺服器

直接使用Socket啟動伺服器。使用瀏覽器造訪http://127.0.0.1:8080/或使用curl http://127.0.0.1:8080/造訪時,會回傳{"Hello": "World"}

uvicorn app:app --reload

使用curl http://127.0.0.1:8001/造訪時,伺服器執行日誌:

# app.py
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

# Example of an asynchronous GET request
@app.get("/external-api")
async def call_external_api():
    url = "https://leapcell.io"
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        if response.status_code!= 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")
        return response.json()

概括

非同步I/O在底層使用「協程」和「事件循環」實作。 「協程」確保當執行緒在執行過程中遇到標記的 I/O 操作時,不必等待 I/O 完成而是可以暫停並讓執行緒執行其他任務而不會阻塞。 「事件循環」使用I/O復用技術,不斷循環監視I/O事件。當某個I/O事件完成時,會觸發對應的回調,讓協程繼續執行。


Leapcell:FastAPI 和其他 Python 應用程式的理想平台:

最後介紹一下部署Flask/FastAPI的理想平台:Leapcell。

Leapcell是專為現代分散式應用程式設計的雲端運算平台。其按需付費的定價模式確保沒有閒置成本,這意味著用戶只需為他們實際使用的資源付費。

Mastering Python Async IO with FastAPI

Leapcell對於WSGI/ASGI應用的獨特優點:

1. 多語言支持

  • 支援 JavaScript、Python、Go 或 Rust 開發。

2. 無限項目免費部署

  • 僅依使用情況收費。沒有要求時不收費。

3. 無與倫比的成本效益

  • 即用即付,無閒置費用。
  • 例如,25 美元可以支援 694 萬個請求,平均回應時間為 60 毫秒。

4. 簡化的開發者體驗

  • 直覺的使用者介面,易於設定。
  • 完全自動化的 CI/CD 管道和 GitOps 整合。
  • 即時指標和日誌,提供可操作的見解。

5. 輕鬆的可擴充性和高效能

  • 自動伸縮,輕鬆應付高併發。
  • 零營運開銷,讓開發者專注於開發。

在文件中了解更多!

Leapcell Twitter:https://x.com/LeapcellHQ

以上是使用 FastAPI 掌握 Python 非同步 IO的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn