由於Python是解釋型語言,當用於後端開發時,例如與Python Django結合時,相對於Java Spring,其回應時間會長一些。不過,只要程式碼合理,差別並不會太大。即使Django使用多進程模式,其並發處理能力仍然弱得多。 Python有一些提高並發處理能力的解決方案。例如,使用非同步框架FastAPI,憑藉其非同步能力,可以大幅增強I/O密集型任務的並發處理能力。 FastAPI 是最快的 Python 框架之一。
我們先簡單了解如何使用FastAPI。
安裝:
pip install fastapi
簡單的伺服器端程式碼:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
啟動:
uvicorn app:app --reload
我們可以看到,與其他框架相比,FastAPI的介面只多了一個async關鍵字。 async 關鍵字將介面定義為非同步。僅從回傳結果來看,我們無法看出FastAPI與其他Python框架的差異。區別在於並發訪問。 FastAPI的伺服器執行緒在處理路由請求時,如http://127.0.0.1:8000/,如果遇到網路I/O,將不再等待,而是處理其他請求。當網路 I/O 完成時,執行將恢復。這種非同步能力提高了 I/O 密集型任務的處理能力。
讓我們來看另一個例子。在業務代碼中,發起明確的非同步網路請求。對於這個網路I/O,就像路由請求一樣,FastAPI也會非同步處理。
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
如果希望資料庫I/O是非同步的,需要資料庫驅動或ORM非同步操作的支援。
FastAPI非同步的核心實作是非同步I/O。我們可以直接使用非同步I/O來啟動一個具有非同步處理能力的伺服器,而不需要使用FastAPI。
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
本範例啟動時,http://127.0.0.1:8000/的回傳結果與範例1相同。非同步I/O的底層實作原理是「協程」與「事件循環」 .
pip install fastapi
函數索引是用 async def 定義的,這表示它是一個協程。 await 關鍵字用在 I/O 操作之前,告訴執行緒不要等待本次 I/O 操作。普通函數的呼叫是透過堆疊來實現的,函數只能一個一個地呼叫和執行。然而,協程是一種特殊的函數(不是協作線程)。它允許線程在等待標記處暫停執行並切換到執行其他任務。當I/O操作完成後,會繼續執行。
我們來看看多個協程並發執行的效果。
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
輸出:
uvicorn app:app --reload
我們可以看到執行緒並沒有一一執行這三個任務。當它遇到I/O操作時,它會切換到執行其他任務。 I/O操作完成後繼續執行。也可以看出,三個協程基本上同時開始等待I/O操作,所以最終的執行完成時間基本上相同。雖然這裡沒有明確使用事件循環,但 asyncio.run 會隱式使用它。
協程是透過生成器實現的。生成器可以暫停函數的執行,也可以恢復函數的執行,這是協程的特性。
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
用next()運行生成器時,遇到yield時會暫停。當 next() 再次運行時,它將從上次暫停的地方繼續運行。在Python 3.5之前,協程也是用「註釋」寫的。從Python 3.5開始,使用async def wait。
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
產生器的暫停和恢復功能除了協程之外還可以用於許多事情。例如,它可以循環計算和儲存演算法。例如,實現一個帕斯卡三角形(每行兩端都是1,其他位置的數字是它上面兩個數字的和)。
async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json')
輸出:
import asyncio from datetime import datetime async def coroutine3(): print(f"Coroutine 3 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 3 finished at {datetime.now()}") async def coroutine2(): print(f"Coroutine 2 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 2 finished at {datetime.now()}") async def coroutine1(): print(f"Coroutine 1 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 1 finished at {datetime.now()}") async def main(): print("Main started") # Create tasks to make coroutines execute concurrently task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # Wait for all tasks to complete await task1 await task2 await task3 print("Main finished") # Run the main coroutine asyncio.run(main())
既然協程執行可以暫停,那麼協程什麼時候會恢復執行呢?這就需要使用事件循環來告訴執行緒。
Main started Coroutine 1 started at 2024-12-27 12:28:01.661251 Coroutine 2 started at 2024-12-27 12:28:01.661276 Coroutine 3 started at 2024-12-27 12:28:01.665012 Coroutine 1 finished at 2024-12-27 12:28:02.665125 Coroutine 2 finished at 2024-12-27 12:28:02.665120 Coroutine 3 finished at 2024-12-27 12:28:02.665120 Main finished
事件循環使用I/O復用技術,不斷循環監聽協程可以繼續執行的事件。當它們可以執行時,執行緒將繼續執行協程。
簡單理解I/O復用:我是一個快遞站的老闆。我不需要主動詢問每個快遞員的任務完成情況。相反,快遞員完成任務後會自行來找我。這提高了我的任務處理能力,我可以做更多的事情。
select、poll、epoll都可以實作I/O重複使用。與select和poll相比,epoll具有更好的性能。 Linux一般預設使用epoll,macOS使用kqueue,與epoll類似,效能也差不多。
pip install fastapi
啟動伺服器socket來監聽指定連接埠。如果運行在Linux系統上,選擇器預設使用epoll作為其實作。程式碼中使用epoll來註冊一個請求接收事件(accept事件)。當新的請求到來時,epoll會觸發並執行事件處理函數,同時註冊一個讀取事件(read event)來處理和回應請求資料。從Web端透過http://127.0.0.1:8000/訪問,傳回結果與範例1相同。伺服器運行日誌:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
直接使用Socket啟動伺服器。使用瀏覽器造訪http://127.0.0.1:8080/或使用curl http://127.0.0.1:8080/造訪時,會回傳{"Hello": "World"}
uvicorn app:app --reload
使用curl http://127.0.0.1:8001/造訪時,伺服器執行日誌:
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
非同步I/O在底層使用「協程」和「事件循環」實作。 「協程」確保當執行緒在執行過程中遇到標記的 I/O 操作時,不必等待 I/O 完成而是可以暫停並讓執行緒執行其他任務而不會阻塞。 「事件循環」使用I/O復用技術,不斷循環監視I/O事件。當某個I/O事件完成時,會觸發對應的回調,讓協程繼續執行。
最後介紹一下部署Flask/FastAPI的理想平台:Leapcell。
Leapcell是專為現代分散式應用程式設計的雲端運算平台。其按需付費的定價模式確保沒有閒置成本,這意味著用戶只需為他們實際使用的資源付費。
Leapcell對於WSGI/ASGI應用的獨特優點:
在文件中了解更多!
Leapcell Twitter:https://x.com/LeapcellHQ
以上是使用 FastAPI 掌握 Python 非同步 IO的詳細內容。更多資訊請關注PHP中文網其他相關文章!