首页  >  文章  >  后端开发  >  FastAPI、Pydantic、PsycopgPython Web API 的三位一体

FastAPI、Pydantic、PsycopgPython Web API 的三位一体

DDD
DDD原创
2024-10-26 00:29:28540浏览

第 1 部分:讨论

进入FastAPI

首先,对标题持保留态度。

如果我今天从头开始进行 Python Web API 开发,我可能会更仔细地关注 LiteStar,在我看来,它的架构更好,并且具有更好的项目治理结构。

但是我们有 FastAPI,而且它不会很快消失。我将它用于许多个人和专业项目,并且仍然喜欢它的简单性。

有关 FastAPI 设计模式的指南,请查看此页面。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

检索数据库数据

尽管 FastAPI 在实际的“API”部分非常出色,但对我来说一直存在一个不确定性:如何最好地访问数据库,特别是如果我们还需要处理地理空间数据类型。

让我们回顾一下我们的选项。

注1:我们在这里只对异步库感兴趣,因为FastAPI是ASGI。

注 2:我只会讨论连接到 PostgreSQL,尽管部分讨论仍然与其他数据库相关。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

编码简单 |复杂设计:ORM

处理数据库连接并将数据库表中的数据解析为 Python 对象。

  • SQLAlchemy2:Python ORM 世界中最大的竞争者。就我个人而言,我真的不喜欢这种语法,但每个都有自己的语法。

  • TortoiseORM:我个人非常喜欢这个受 Django 启发的异步 ORM;干净又好用。

  • 替代ORM:有很多,比如peewee、PonyORM等

中间立场:查询构建器

没有数据库连接。只需从基于 Python 的查询输出原始 SQL 并将其传递给数据库驱动程序。

  • SQLAlchemy Core:核心 SQL 查询生成器,没有映射到对象部分。还有一个基于此构建的更高级别的 ORM,称为数据库,看起来非常不错。不过,我确实想知道该项目的开发程度如何。

  • PyPika:我对这个不太了解。

简单的设计:数据库驱动程序

  • asyncpg:这是 Postgres 的黄金标准异步数据库驱动程序,是最早上市且性能最高的驱动程序之一。虽然所有其他驱动程序都使用 C 库 libpq 与 Postgres 交互,但 MagicStack 选择重写自己的自定义实现,并且也偏离了 Python DBAPI 规范。如果性能是您的主要标准,那么 asyncpg 可能是最好的选择。

  • psycopg3:psycopg2 显然是 Python/Postgres 的同步数据库驱动程序世界之王。 psycopg3(更名为 psycopg)是该库的下一个完全异步迭代。近年来,这个库确实发挥了自己的作用,我希望进一步讨论它。请参阅作者关于 psycopg3 早期的有趣博客。

请注意,这里显然围绕 ORM、查询构建器与原始 SQL 进行了更广泛、更概念化的讨论。我不会在这里介绍这个。

重复模型

Pydantic 与 FastAPI 捆绑在一起,非常适合建模、验证和序列化 API 响应。

如果我们决定使用 ORM 从数据库中检索数据,那么保持两组数据库模型同步是不是有点低效? (一个用于 ORM,另一个用于 Pydantic)?

如果我们可以使用 Pydantic 来建模数据库不是很好吗?

这正是 FastAPI 的创建者试图通过 SQLModel 库解决的问题。

虽然这很可能是问题的一个很好的解决方案,但我有一些担忧:

  • 这个项目会像FastAPI一样遭受单一维护者综合症吗?

  • 这仍然是一个相当年轻的项目和概念,文档并不出色。

  • 它本质上与 Pydantic 和 SQLAlchemy 紧密相关,这意味着迁移将非常困难。

  • 对于更复杂的查询,可能需要下拉到下面的 SQLAlchemy。

回到基础

这么多选择!分析瘫痪。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

当存在不确定性时,我会遵循以下原则:保持简单。

SQL 发明于 50 年前,并且仍然是任何开发人员需要学习的关键技能。对于大多数用例来说,它的语法始终易于掌握且编写起来并不复杂(对于铁杆 ORM 用户来说,尝试一下,您可能会感到惊讶)。

天啊,现在我们甚至可以使用开源 LLM 来生成(大部分有效)SQL 查询并节省您的打字时间。

虽然 ORM 和查询构建器可能会来来去去,但数据库驱动程序可能更加一致。最初的 psycopg2 库是近 20 年前编写的,并且仍在全球生产中积极使用。

将 Psycopg 与 Pydantic 模型结合使用

正如所讨论的,虽然 psycopg 的性能可能不如 asyncpg(尽管这种理论性能对现实世界的影响是有争议的),但 psycopg 专注于易用性和熟悉的 API。

对我来说杀手级功能是行工厂。

此功能允许您将返回的数据库数据映射到任何 Python 对象,包括标准 lib 数据类、来自伟大 attrs 库的模型,当然还有 Pydantic 模型!

对我来说,这是最好的折衷方法:原始 SQL 的终极灵活性,以及​​ Pydantic 的验证/类型安全功能来对数据库进行建模。 Psycopg 还处理诸如变量输入卫生之类的事情,以避免 SQL 注入。

应该指出的是,asyncpg 还可以处理到 Pydantic 模型的映射,但更多的是作为一种解决方法,而不是内置功能。有关详细信息,请参阅此问题线程。我也不知道这种方法是否能与其他建模库很好地配合。

正如我上面提到的,我通常使用地理空间数据:一个经常被 ORM 和查询构建器忽视的领域。放弃原始 SQL 使我能够解析和解解析地理空间数据,因为我需要纯 Python 中更可接受的类型。请参阅我关于此主题的相关文章。

第 2 部分:用法示例

创建数据库表

这里我们使用原始 SQL 创建一个名为 user 的简单数据库表。

我还会考虑仅使用 SQL 处理数据库创建和迁移,但这是另一篇文章的主题。

init_db.sql

CREATE TYPE public.userrole AS ENUM (
    'READ_ONLY',
    'STANDARD',
    'ADMIN'
);

CREATE TABLE public.users (
    id integer NOT NULL,
    username character varying,
    role public.userrole NOT NULL DEFAULT 'STANDARD',
    profile_img character varying,
    email_address character varying,
    is_email_verified boolean DEFAULT false,
    registered_at timestamp with time zone DEFAULT now()
);

使用 Pydantic 为您的数据库建模

这里我们创建一个名为 DbUser 的模型:

db_models.py

from typing import Optional
from enum import Enum
from datetime import datetime
from pydantic import BaseModel
from pydantic.functional_validators import field_validator
from geojson_pydantic import Feature

class UserRole(str, Enum):
    """Types of user, mapped to database enum userrole."""

    READ_ONLY = "READ_ONLY"
    STANDARD = "STANDARD"
    ADMIN = "ADMIN"

class DbUser(BaseModel):
    """Table users."""

    id: int
    username: str
    role: Optional[UserRole] = UserRole.STANDARD
    profile_img: Optional[str] = None
    email_address: Optional[str] = None
    is_email_verified: bool = False
    registered_at: Optional[datetime]
    # This is a geospatial type I will handle in the SQL
    favourite_place: Optional[dict]

    # DB computed fields (handled in the SQL)
    total_users: Optional[int] = None

    # This example isn't very realistic, but you get the idea
    @field_validator("is_email_verified", mode="before")
    @classmethod
    def i_want_my_ints_as_bools(cls, value: int) -> bool:
        """Example of a validator to convert data type."""
        return bool(value)

这里我们得到了 Pydantic 的类型安全性和验证。

当从数据库中提取数据时,我们可以向该模型添加任何形式的验证或数据转换。

使用 FastAPI 设置 Psycopg

我们使用 psycopg_pool 创建一个池化数据库连接:

db.py

from fastapi import Request
from psycopg import Connection
from psycopg_pool import AsyncConnectionPool

# You should be using environment variables in a settings file here
from app.config import settings


def get_db_connection_pool() -> AsyncConnectionPool:
    """Get the connection pool for psycopg.

    NOTE the pool connection is opened in the FastAPI server startup (lifespan).

    Also note this is also a sync `def`, as it only returns a context manager.
    """
    return AsyncConnectionPool(
        conninfo=settings.DB_URL.unicode_string(), open=False
    )


async def db_conn(request: Request) -> Connection:
    """Get a connection from the psycopg pool.

    Info on connections vs cursors:
    https://www.psycopg.org/psycopg3/docs/advanced/async.html

    Here we are getting a connection from the pool, which will be returned
    after the session ends / endpoint finishes processing.

    In summary:
    - Connection is created on endpoint call.
    - Cursors are used to execute commands throughout endpoint.
      Note it is possible to create multiple cursors from the connection,
      but all will be executed in the same db 'transaction'.
    - Connection is closed on endpoint finish.
    """
    async with request.app.state.db_pool.connection() as conn:
        yield conn

接下来我们在FastAPI生命周期事件中打开连接池:

main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI

from .db import get_db_connection_pool

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI startup/shutdown event."""
    # For this demo I use print, but please use logging!
    print("Starting up FastAPI server.")

    # Create a pooled db connection and make available in app state
    # NOTE we can access 'request.app.state.db_pool' in endpoints
    app.state.db_pool = get_db_connection_pool()
    await app.state.db_pool.open()

    yield

    # Shutdown events
    print("Shutting down FastAPI server.")
    # Here we make sure to close the connection pool
    await app.state.db_pool.close()

现在,当您的 FastAPI 应用程序启动时,您应该有一个打开的连接池,准备从内部端点获取连接。

Pydantic 模型的辅助方法

向 Pydantic 模型添加一些方法以实现常用功能会很有用:获取一个用户、所有用户、创建用户、更新用户、删除用户。

但首先我们应该创建一些 Pydantic 模型,用于输入验证(创建新用户)和输出序列化(通过 API 的 JSON 响应)。

user_schemas.py

from typing import Annotated
from pydantic import BaseModel, Field
from pydantic.functional_validators import field_validator
from geojson_pydantic import FeatureCollection, Feature, MultiPolygon, Polygon
from .db_models import DbUser

class UserIn(DbUser):
    """User details for insert into DB."""

    # Exclude fields not required for input
    id: Annotated[int, Field(exclude=True)] = None
    favourite_place: Optional[Feature]

    @field_validator("favourite_place", mode="before")
    @classmethod
    def parse_input_geojson(
        cls,
        value: FeatureCollection | Feature | MultiPolygon | Polygon,
    ) -> Optional[Polygon]:
        """Parse any format geojson into a single Polygon."""
        if value is None:
            return None
        # NOTE I don't include this helper function for brevity
        featcol = normalise_to_single_geom_featcol(value)
        return featcol.get("features")[0].get("geometry")

class UserOut(DbUser):
    """User details for insert into DB."""

    # Ensure it's parsed as a Polygon geojson from db object
    favourite_place: Polygon

    # More logic to append computed values

然后我们可以定义我们的辅助方法:one、all、create:

db_models.py

...previous imports
from typing import Self, Optional
from fastapi.exceptions import HTTPException
from psycopg import Connection
from psycopg.rows import class_row

from .user_schemas import UserIn

class DbUser(BaseModel):
    """Table users."""

    ...the fields

    @classmethod
    async def one(cls, db: Connection, user_id: int) -> Self:
        """Get a user by ID.

        NOTE how the favourite_place field is converted in the db to geojson.
        """
        async with db.cursor(row_factory=class_row(cls)) as cur:
            sql = """
                SELECT
                    u.*,
                    ST_AsGeoJSON(favourite_place)::jsonb AS favourite_place,
                    (SELECT COUNT(*) FROM users) AS total_users
                FROM users u
                WHERE
                    u.id = %(user_id)s
                GROUP BY u.id;
            """

            await cur.execute(
                sql,
                {"user_id": user_id},
            )

            db_project = await cur.fetchone()
            if not db_project:
                raise KeyError(f"User ({user_identifier}) not found.")

            return db_project

    @classmethod
    async def all(
        cls, db: Connection, skip: int = 0, limit: int = 100
    ) -> Optional[list[Self]]:
        """Fetch all users."""
        async with db.cursor(row_factory=class_row(cls)) as cur:
            await cur.execute(
                """
                SELECT
                    *,
                    ST_AsGeoJSON(favourite_place)::jsonb
                FROM users
                OFFSET %(offset)s
                LIMIT %(limit)s;
                """,
                {"offset": skip, "limit": limit},
            )
            return await cur.fetchall()

    @classmethod
    async def create(
        cls,
        db: Connection,
        user_in: UserIn,
    ) -> Optional[Self]:
        """Create a new user."""

        # Omit defaults and empty values from the model
        model_dump = user_in.model_dump(exclude_none=True, exclude_default=True)
        columns = ", ".join(model_dump.keys())
        value_placeholders = ", ".join(f"%({key})s" for key in model_dump.keys())

        sql = f"""
            INSERT INTO users
                ({columns})
            VALUES
                ({value_placeholders})
            RETURNING *;
        """


        async with db.cursor(row_factory=class_row(cls)) as cur:
            await cur.execute(sql, model_dump)
            new_user = await cur.fetchone()

            if new_user is None:
                msg = f"Unknown SQL error for data: {model_dump}"
                print(f"Failed user creation: {model_dump}")
                raise HTTPException(status_code=500, detail=msg)

        return new_user

用法

routes.py

from typing import Annotated
from fastapi import Depends, HTTPException
from psycopg import Connection

from .main import app
from .db import db_conn
from .models import DbUser
from .user_schemas import UserIn, UserOut

@app.post("/", response_model=UserOut)
async def create_user(
    user_info: UserIn,
    db: Annotated[Connection, Depends(db_conn)],
):
    """Create a new user.

    Here the input is parsed and validated by UserIn
    then the output is parsed and validated by UserOut
    returning the user json data.
    """

    new_user = await DbUser.create(db, user_info)
    if not new_user:
        raise HTTPException(
            status_code=422,
            detail="User creation failed.",
        )

    return new_user

    # NOTE within an endpoint we can also use
    # DbUser.one(db, user_id) and DbUser.all(db)

这是我在我维护的项目 FMTM 中开始使用的方法,FMTM 是一个为世界各地社区收集现场数据的工具。

在此处查看完整的代码库。
如果您觉得这有用的话 ⭐!

目前就这些!我希望这对那里的人有帮助?

以上是FastAPI、Pydantic、PsycopgPython Web API 的三位一体的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn