首頁  >  文章  >  後端開發  >  FastAPI、Pydantic、PsycopgPython Web API 的三位一體

FastAPI、Pydantic、PsycopgPython Web API 的三位一體

DDD
DDD原創
2024-10-26 00:29:28457瀏覽

第 1 部分:討論

進入FastAPI

首先,對標題持保留態度。

如果我今天從頭開始進行 Python Web API 開發,我可能會更仔細地關注 LiteStar,在我看來,它的架構更好,並且具有更好的專案治理結構。

但是我們有 FastAPI,而且它不會很快消失。我將它用於許多個人和專業項目,並且仍然喜歡它的簡單性。

有關 FastAPI 設計模式的指南,請查看此頁面。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

檢索資料庫數據

儘管 FastAPI 在實際的「API」部分中非常出色,但對我來說一直存在一個不確定性:如何最好地存取資料庫,特別是如果我們還需要處理地理空間資料類型。

讓我們回顧一下我們的選項。

註1:我們在這裡只對異步庫感興趣,因為FastAPI是ASGI。

註 2:我只會討論連接到 PostgreSQL,儘管部分討論仍然與其他資料庫相關。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

編碼簡單 |複雜設計:ORM

處理資料庫連線並將資料庫表中的資料解析為 Python 物件。

  • SQLAlchemy2:Python ORM 世界中最大的競爭者。就我個人而言,我真的不喜歡這種語法,但每個都有自己的語法。

  • TortoiseORM:我個人很喜歡這個受 Django 啟發的非同步 ORM;乾淨又好用。

  • 替代ORM:有很多,如peewee、PonyORM等

中間立場:查詢建構器

沒有資料庫連線。只需從基於 Python 的查詢輸出原始 SQL 並將其傳遞給資料庫驅動程式。

  • SQLAlchemy Core:核心 SQL 查詢產生器,沒有對應到物件部分。還有一個基於此構建的更高級別的 ORM,稱為資料庫,看起來非常不錯。不過,我確實想知道該專案的開發程度如何。

  • PyPika:我對這個不太了解。

簡單的設計:資料庫驅動程式

  • asyncpg:這是 Postgres 的黃金標準非同步資料庫驅動程序,是最早上市且效能最高的驅動程式之一。雖然所有其他驅動程式都使用 C 庫 libpq 與 Postgres 交互,但 MagicStack 選擇重寫自己的自訂實現,也偏離了 Python DBAPI 規範。如果性能是您的主要標準,那麼 asyncpg 可能是最好的選擇。

  • psycopg3:psycopg2 顯然是 Python/Postgres 的同步資料庫驅動程式世界之王。 psycopg3(更名為 psycopg)是該函式庫的下一個完全非同步迭代。近年來,這個庫確實發揮了自己的作用,我希望進一步討論它。請參閱作者關於 psycopg3 早期的有趣部落格。

請注意,這裡顯然圍繞 ORM、查詢建構器與原始 SQL 進行了更廣泛、更概念化的討論。我不會在這裡介紹這個。

重複模型

Pydantic 與 FastAPI 捆綁在一起,非常適合建模、驗證和序列化 API 回應。

如果我們決定使用 ORM 從資料庫中檢索數據,那麼保持兩組資料庫模型同步是不是有點低效? (一個用於 ORM,另一個用於 Pydantic)?

如果我們可以使用 Pydantic 來建模資料庫不是很好嗎?

這正是 FastAPI 的創建者試圖透過 SQLModel 函式庫解決的問題。

雖然這很可能是問題的一個很好的解決方案,但我有一些擔憂:

  • 這個計畫會像FastAPI一樣遭受單一維護者症候群嗎?

  • 這仍然是一個相當年輕的專案和概念,文件並不出色。

  • 它本質上與 Pydantic 和 SQLAlchemy 緊密相關,這意味著遷移將非常困難。

  • 對於更複雜的查詢,可能需要下拉到下面的 SQLAlchemy。

回到基礎

這麼多選擇!分析癱瘓。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

當存在不確定性時,我會遵循以下原則:保持簡單。

SQL 發明於 50 年前,並且仍然是任何開發人員需要學習的關鍵技能。對於大多數用例來說,它的語法始終易於掌握且編寫起來並不復雜(對於鐵桿 ORM 用戶來說,嘗試一下,您可能會感到驚訝)。

天啊,現在我們甚至可以使用開源 LLM 來產生(大部分有效)SQL 查詢並節省您的打字時間。

雖然 ORM 和查詢建構器可能會來來去去,但資料庫驅動程式可能更一致。最初的 psycopg2 函式庫是近 20 年前編寫的,並且仍在全球生產中積極使用。

將 Psycopg 與 Pydantic 模型結合使用

正如所討論的,雖然 psycopg 的性能可能不如 asyncpg(儘管這種理論性能對現實世界的影響是有爭議的),但 psycopg 專注於易用性和熟悉的 API。

對我來說殺手級功能是行工廠。

此功能允許您將返回的資料庫資料映射到任何 Python 對象,包括標準 lib 資料類、來自偉大 attrs 庫的模型,當然還有 Pydantic 模型!

對我來說,這是最好的折衷方法:原始 SQL 的終極靈活性,以及​​ Pydantic 的驗證/類型安全功能來對資料庫進行建模。 Psycopg 也處理諸如變數輸入衛生之類的事情,以避免 SQL 注入。

應該指出的是,asyncpg 還可以處理到 Pydantic 模型的映射,但更多的是作為一種解決方法,而不是內建功能。有關詳細信息,請參閱此問題線程。我也不知道這種方法是否能與其他建模庫很好地配合。

正如我上面提到的,我通常使用地理空間資料:一個經常被 ORM 和查詢建構器忽略的領域。放棄原始 SQL 使我能夠解析和解解析地理空間數據,因為我需要純 Python 中更可接受的類型。請參閱我關於此主題的相關文章。

第 2 部分:用法範例

建立資料庫表

這裡我們使用原始 SQL 建立一個名為 user 的簡單資料庫表。

我也會考慮只使用 SQL 處理資料庫建立和遷移,但這是另一篇文章的主題。

init_db.sql

CREATE TYPE public.userrole AS ENUM (
    'READ_ONLY',
    'STANDARD',
    'ADMIN'
);

CREATE TABLE public.users (
    id integer NOT NULL,
    username character varying,
    role public.userrole NOT NULL DEFAULT 'STANDARD',
    profile_img character varying,
    email_address character varying,
    is_email_verified boolean DEFAULT false,
    registered_at timestamp with time zone DEFAULT now()
);

使用 Pydantic 為您的資料庫建模

這裡我們建立一個名為 DbUser 的模型:

db_models.py

from typing import Optional
from enum import Enum
from datetime import datetime
from pydantic import BaseModel
from pydantic.functional_validators import field_validator
from geojson_pydantic import Feature

class UserRole(str, Enum):
    """Types of user, mapped to database enum userrole."""

    READ_ONLY = "READ_ONLY"
    STANDARD = "STANDARD"
    ADMIN = "ADMIN"

class DbUser(BaseModel):
    """Table users."""

    id: int
    username: str
    role: Optional[UserRole] = UserRole.STANDARD
    profile_img: Optional[str] = None
    email_address: Optional[str] = None
    is_email_verified: bool = False
    registered_at: Optional[datetime]
    # This is a geospatial type I will handle in the SQL
    favourite_place: Optional[dict]

    # DB computed fields (handled in the SQL)
    total_users: Optional[int] = None

    # This example isn't very realistic, but you get the idea
    @field_validator("is_email_verified", mode="before")
    @classmethod
    def i_want_my_ints_as_bools(cls, value: int) -> bool:
        """Example of a validator to convert data type."""
        return bool(value)

這裡我們得到了 Pydantic 的類型安全性和驗證。

當從資料庫中提取資料時,我們可以向該模型添加任何形式的驗證或資料轉換。

使用 FastAPI 設定 Psycopg

我們使用 psycopg_pool 建立一個池化資料庫連接:

db.py

from fastapi import Request
from psycopg import Connection
from psycopg_pool import AsyncConnectionPool

# You should be using environment variables in a settings file here
from app.config import settings


def get_db_connection_pool() -> AsyncConnectionPool:
    """Get the connection pool for psycopg.

    NOTE the pool connection is opened in the FastAPI server startup (lifespan).

    Also note this is also a sync `def`, as it only returns a context manager.
    """
    return AsyncConnectionPool(
        conninfo=settings.DB_URL.unicode_string(), open=False
    )


async def db_conn(request: Request) -> Connection:
    """Get a connection from the psycopg pool.

    Info on connections vs cursors:
    https://www.psycopg.org/psycopg3/docs/advanced/async.html

    Here we are getting a connection from the pool, which will be returned
    after the session ends / endpoint finishes processing.

    In summary:
    - Connection is created on endpoint call.
    - Cursors are used to execute commands throughout endpoint.
      Note it is possible to create multiple cursors from the connection,
      but all will be executed in the same db 'transaction'.
    - Connection is closed on endpoint finish.
    """
    async with request.app.state.db_pool.connection() as conn:
        yield conn

接下來我們在FastAPI生命週期事件中開啟連線池:

main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI

from .db import get_db_connection_pool

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI startup/shutdown event."""
    # For this demo I use print, but please use logging!
    print("Starting up FastAPI server.")

    # Create a pooled db connection and make available in app state
    # NOTE we can access 'request.app.state.db_pool' in endpoints
    app.state.db_pool = get_db_connection_pool()
    await app.state.db_pool.open()

    yield

    # Shutdown events
    print("Shutting down FastAPI server.")
    # Here we make sure to close the connection pool
    await app.state.db_pool.close()

現在,當您的 FastAPI 應用程式啟動時,您應該有一個開啟的連線池,準備從內部端點取得連線。

Pydantic 模型的輔助方法

為 Pydantic 模型添加一些方法以實現常用功能會很有用:取得一個使用者、所有使用者、建立使用者、更新使用者、刪除使用者。

但首先我們應該建立一些 Pydantic 模型,用於輸入驗證(建立新使用者)和輸出序列化(透過 API 的 JSON 回應)。

user_schemas.py

from typing import Annotated
from pydantic import BaseModel, Field
from pydantic.functional_validators import field_validator
from geojson_pydantic import FeatureCollection, Feature, MultiPolygon, Polygon
from .db_models import DbUser

class UserIn(DbUser):
    """User details for insert into DB."""

    # Exclude fields not required for input
    id: Annotated[int, Field(exclude=True)] = None
    favourite_place: Optional[Feature]

    @field_validator("favourite_place", mode="before")
    @classmethod
    def parse_input_geojson(
        cls,
        value: FeatureCollection | Feature | MultiPolygon | Polygon,
    ) -> Optional[Polygon]:
        """Parse any format geojson into a single Polygon."""
        if value is None:
            return None
        # NOTE I don't include this helper function for brevity
        featcol = normalise_to_single_geom_featcol(value)
        return featcol.get("features")[0].get("geometry")

class UserOut(DbUser):
    """User details for insert into DB."""

    # Ensure it's parsed as a Polygon geojson from db object
    favourite_place: Polygon

    # More logic to append computed values

然後我們可以定義我們的輔助方法:one、all、create:

db_models.py

...previous imports
from typing import Self, Optional
from fastapi.exceptions import HTTPException
from psycopg import Connection
from psycopg.rows import class_row

from .user_schemas import UserIn

class DbUser(BaseModel):
    """Table users."""

    ...the fields

    @classmethod
    async def one(cls, db: Connection, user_id: int) -> Self:
        """Get a user by ID.

        NOTE how the favourite_place field is converted in the db to geojson.
        """
        async with db.cursor(row_factory=class_row(cls)) as cur:
            sql = """
                SELECT
                    u.*,
                    ST_AsGeoJSON(favourite_place)::jsonb AS favourite_place,
                    (SELECT COUNT(*) FROM users) AS total_users
                FROM users u
                WHERE
                    u.id = %(user_id)s
                GROUP BY u.id;
            """

            await cur.execute(
                sql,
                {"user_id": user_id},
            )

            db_project = await cur.fetchone()
            if not db_project:
                raise KeyError(f"User ({user_identifier}) not found.")

            return db_project

    @classmethod
    async def all(
        cls, db: Connection, skip: int = 0, limit: int = 100
    ) -> Optional[list[Self]]:
        """Fetch all users."""
        async with db.cursor(row_factory=class_row(cls)) as cur:
            await cur.execute(
                """
                SELECT
                    *,
                    ST_AsGeoJSON(favourite_place)::jsonb
                FROM users
                OFFSET %(offset)s
                LIMIT %(limit)s;
                """,
                {"offset": skip, "limit": limit},
            )
            return await cur.fetchall()

    @classmethod
    async def create(
        cls,
        db: Connection,
        user_in: UserIn,
    ) -> Optional[Self]:
        """Create a new user."""

        # Omit defaults and empty values from the model
        model_dump = user_in.model_dump(exclude_none=True, exclude_default=True)
        columns = ", ".join(model_dump.keys())
        value_placeholders = ", ".join(f"%({key})s" for key in model_dump.keys())

        sql = f"""
            INSERT INTO users
                ({columns})
            VALUES
                ({value_placeholders})
            RETURNING *;
        """


        async with db.cursor(row_factory=class_row(cls)) as cur:
            await cur.execute(sql, model_dump)
            new_user = await cur.fetchone()

            if new_user is None:
                msg = f"Unknown SQL error for data: {model_dump}"
                print(f"Failed user creation: {model_dump}")
                raise HTTPException(status_code=500, detail=msg)

        return new_user

用法

routes.py

from typing import Annotated
from fastapi import Depends, HTTPException
from psycopg import Connection

from .main import app
from .db import db_conn
from .models import DbUser
from .user_schemas import UserIn, UserOut

@app.post("/", response_model=UserOut)
async def create_user(
    user_info: UserIn,
    db: Annotated[Connection, Depends(db_conn)],
):
    """Create a new user.

    Here the input is parsed and validated by UserIn
    then the output is parsed and validated by UserOut
    returning the user json data.
    """

    new_user = await DbUser.create(db, user_info)
    if not new_user:
        raise HTTPException(
            status_code=422,
            detail="User creation failed.",
        )

    return new_user

    # NOTE within an endpoint we can also use
    # DbUser.one(db, user_id) and DbUser.all(db)

這是我在我維護的專案 FMTM 中開始使用的方法,FMTM 是為世界各地社區收集現場資料的工具。

在此處查看完整的程式碼庫。
如果您覺得這有用的話 ⭐!

目前就這些!我希望這對那裡的人有幫助?

以上是FastAPI、Pydantic、PsycopgPython Web API 的三位一體的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn