首先,对标题持保留态度。
如果我今天从头开始进行 Python Web API 开发,我可能会更仔细地关注 LiteStar,在我看来,它的架构更好,并且具有更好的项目治理结构。
但是我们有 FastAPI,而且它不会很快消失。我将它用于许多个人和专业项目,并且仍然喜欢它的简单性。
有关 FastAPI 设计模式的指南,请查看此页面。
尽管 FastAPI 在实际的“API”部分非常出色,但对我来说一直存在一个不确定性:如何最好地访问数据库,特别是如果我们还需要处理地理空间数据类型。
让我们回顾一下我们的选项。
注1:我们在这里只对异步库感兴趣,因为FastAPI是ASGI。
注 2:我只会讨论连接到 PostgreSQL,尽管部分讨论仍然与其他数据库相关。
处理数据库连接并将数据库表中的数据解析为 Python 对象。
SQLAlchemy2:Python ORM 世界中最大的竞争者。就我个人而言,我真的不喜欢这种语法,但每个都有自己的语法。
TortoiseORM:我个人非常喜欢这个受 Django 启发的异步 ORM;干净又好用。
替代ORM:有很多,比如peewee、PonyORM等
没有数据库连接。只需从基于 Python 的查询输出原始 SQL 并将其传递给数据库驱动程序。
SQLAlchemy Core:核心 SQL 查询生成器,没有映射到对象部分。还有一个基于此构建的更高级别的 ORM,称为数据库,看起来非常不错。不过,我确实想知道该项目的开发程度如何。
PyPika:我对这个不太了解。
asyncpg:这是 Postgres 的黄金标准异步数据库驱动程序,是最早上市且性能最高的驱动程序之一。虽然所有其他驱动程序都使用 C 库 libpq 与 Postgres 交互,但 MagicStack 选择重写自己的自定义实现,并且也偏离了 Python DBAPI 规范。如果性能是您的主要标准,那么 asyncpg 可能是最好的选择。
psycopg3:psycopg2 显然是 Python/Postgres 的同步数据库驱动程序世界之王。 psycopg3(更名为 psycopg)是该库的下一个完全异步迭代。近年来,这个库确实发挥了自己的作用,我希望进一步讨论它。请参阅作者关于 psycopg3 早期的有趣博客。
请注意,这里显然围绕 ORM、查询构建器与原始 SQL 进行了更广泛、更概念化的讨论。我不会在这里介绍这个。
Pydantic 与 FastAPI 捆绑在一起,非常适合建模、验证和序列化 API 响应。
如果我们决定使用 ORM 从数据库中检索数据,那么保持两组数据库模型同步是不是有点低效? (一个用于 ORM,另一个用于 Pydantic)?
如果我们可以使用 Pydantic 来建模数据库不是很好吗?
这正是 FastAPI 的创建者试图通过 SQLModel 库解决的问题。
虽然这很可能是问题的一个很好的解决方案,但我有一些担忧:
这个项目会像FastAPI一样遭受单一维护者综合症吗?
这仍然是一个相当年轻的项目和概念,文档并不出色。
它本质上与 Pydantic 和 SQLAlchemy 紧密相关,这意味着迁移将非常困难。
对于更复杂的查询,可能需要下拉到下面的 SQLAlchemy。
这么多选择!分析瘫痪。
当存在不确定性时,我会遵循以下原则:保持简单。
SQL 发明于 50 年前,并且仍然是任何开发人员需要学习的关键技能。对于大多数用例来说,它的语法始终易于掌握且编写起来并不复杂(对于铁杆 ORM 用户来说,尝试一下,您可能会感到惊讶)。
天啊,现在我们甚至可以使用开源 LLM 来生成(大部分有效)SQL 查询并节省您的打字时间。
虽然 ORM 和查询构建器可能会来来去去,但数据库驱动程序可能更加一致。最初的 psycopg2 库是近 20 年前编写的,并且仍在全球生产中积极使用。
正如所讨论的,虽然 psycopg 的性能可能不如 asyncpg(尽管这种理论性能对现实世界的影响是有争议的),但 psycopg 专注于易用性和熟悉的 API。
对我来说杀手级功能是行工厂。
此功能允许您将返回的数据库数据映射到任何 Python 对象,包括标准 lib 数据类、来自伟大 attrs 库的模型,当然还有 Pydantic 模型!
对我来说,这是最好的折衷方法:原始 SQL 的终极灵活性,以及 Pydantic 的验证/类型安全功能来对数据库进行建模。 Psycopg 还处理诸如变量输入卫生之类的事情,以避免 SQL 注入。
应该指出的是,asyncpg 还可以处理到 Pydantic 模型的映射,但更多的是作为一种解决方法,而不是内置功能。有关详细信息,请参阅此问题线程。我也不知道这种方法是否能与其他建模库很好地配合。
正如我上面提到的,我通常使用地理空间数据:一个经常被 ORM 和查询构建器忽视的领域。放弃原始 SQL 使我能够解析和解解析地理空间数据,因为我需要纯 Python 中更可接受的类型。请参阅我关于此主题的相关文章。
这里我们使用原始 SQL 创建一个名为 user 的简单数据库表。
我还会考虑仅使用 SQL 处理数据库创建和迁移,但这是另一篇文章的主题。
init_db.sql
CREATE TYPE public.userrole AS ENUM ( 'READ_ONLY', 'STANDARD', 'ADMIN' ); CREATE TABLE public.users ( id integer NOT NULL, username character varying, role public.userrole NOT NULL DEFAULT 'STANDARD', profile_img character varying, email_address character varying, is_email_verified boolean DEFAULT false, registered_at timestamp with time zone DEFAULT now() );
这里我们创建一个名为 DbUser 的模型:
db_models.py
from typing import Optional from enum import Enum from datetime import datetime from pydantic import BaseModel from pydantic.functional_validators import field_validator from geojson_pydantic import Feature class UserRole(str, Enum): """Types of user, mapped to database enum userrole.""" READ_ONLY = "READ_ONLY" STANDARD = "STANDARD" ADMIN = "ADMIN" class DbUser(BaseModel): """Table users.""" id: int username: str role: Optional[UserRole] = UserRole.STANDARD profile_img: Optional[str] = None email_address: Optional[str] = None is_email_verified: bool = False registered_at: Optional[datetime] # This is a geospatial type I will handle in the SQL favourite_place: Optional[dict] # DB computed fields (handled in the SQL) total_users: Optional[int] = None # This example isn't very realistic, but you get the idea @field_validator("is_email_verified", mode="before") @classmethod def i_want_my_ints_as_bools(cls, value: int) -> bool: """Example of a validator to convert data type.""" return bool(value)
这里我们得到了 Pydantic 的类型安全性和验证。
当从数据库中提取数据时,我们可以向该模型添加任何形式的验证或数据转换。
我们使用 psycopg_pool 创建一个池化数据库连接:
db.py
from fastapi import Request from psycopg import Connection from psycopg_pool import AsyncConnectionPool # You should be using environment variables in a settings file here from app.config import settings def get_db_connection_pool() -> AsyncConnectionPool: """Get the connection pool for psycopg. NOTE the pool connection is opened in the FastAPI server startup (lifespan). Also note this is also a sync `def`, as it only returns a context manager. """ return AsyncConnectionPool( conninfo=settings.DB_URL.unicode_string(), open=False ) async def db_conn(request: Request) -> Connection: """Get a connection from the psycopg pool. Info on connections vs cursors: https://www.psycopg.org/psycopg3/docs/advanced/async.html Here we are getting a connection from the pool, which will be returned after the session ends / endpoint finishes processing. In summary: - Connection is created on endpoint call. - Cursors are used to execute commands throughout endpoint. Note it is possible to create multiple cursors from the connection, but all will be executed in the same db 'transaction'. - Connection is closed on endpoint finish. """ async with request.app.state.db_pool.connection() as conn: yield conn
接下来我们在FastAPI生命周期事件中打开连接池:
main.py
from contextlib import asynccontextmanager from fastapi import FastAPI from .db import get_db_connection_pool @asynccontextmanager async def lifespan(app: FastAPI): """FastAPI startup/shutdown event.""" # For this demo I use print, but please use logging! print("Starting up FastAPI server.") # Create a pooled db connection and make available in app state # NOTE we can access 'request.app.state.db_pool' in endpoints app.state.db_pool = get_db_connection_pool() await app.state.db_pool.open() yield # Shutdown events print("Shutting down FastAPI server.") # Here we make sure to close the connection pool await app.state.db_pool.close()
现在,当您的 FastAPI 应用程序启动时,您应该有一个打开的连接池,准备从内部端点获取连接。
向 Pydantic 模型添加一些方法以实现常用功能会很有用:获取一个用户、所有用户、创建用户、更新用户、删除用户。
但首先我们应该创建一些 Pydantic 模型,用于输入验证(创建新用户)和输出序列化(通过 API 的 JSON 响应)。
user_schemas.py
from typing import Annotated from pydantic import BaseModel, Field from pydantic.functional_validators import field_validator from geojson_pydantic import FeatureCollection, Feature, MultiPolygon, Polygon from .db_models import DbUser class UserIn(DbUser): """User details for insert into DB.""" # Exclude fields not required for input id: Annotated[int, Field(exclude=True)] = None favourite_place: Optional[Feature] @field_validator("favourite_place", mode="before") @classmethod def parse_input_geojson( cls, value: FeatureCollection | Feature | MultiPolygon | Polygon, ) -> Optional[Polygon]: """Parse any format geojson into a single Polygon.""" if value is None: return None # NOTE I don't include this helper function for brevity featcol = normalise_to_single_geom_featcol(value) return featcol.get("features")[0].get("geometry") class UserOut(DbUser): """User details for insert into DB.""" # Ensure it's parsed as a Polygon geojson from db object favourite_place: Polygon # More logic to append computed values
然后我们可以定义我们的辅助方法:one、all、create:
db_models.py
...previous imports from typing import Self, Optional from fastapi.exceptions import HTTPException from psycopg import Connection from psycopg.rows import class_row from .user_schemas import UserIn class DbUser(BaseModel): """Table users.""" ...the fields @classmethod async def one(cls, db: Connection, user_id: int) -> Self: """Get a user by ID. NOTE how the favourite_place field is converted in the db to geojson. """ async with db.cursor(row_factory=class_row(cls)) as cur: sql = """ SELECT u.*, ST_AsGeoJSON(favourite_place)::jsonb AS favourite_place, (SELECT COUNT(*) FROM users) AS total_users FROM users u WHERE u.id = %(user_id)s GROUP BY u.id; """ await cur.execute( sql, {"user_id": user_id}, ) db_project = await cur.fetchone() if not db_project: raise KeyError(f"User ({user_identifier}) not found.") return db_project @classmethod async def all( cls, db: Connection, skip: int = 0, limit: int = 100 ) -> Optional[list[Self]]: """Fetch all users.""" async with db.cursor(row_factory=class_row(cls)) as cur: await cur.execute( """ SELECT *, ST_AsGeoJSON(favourite_place)::jsonb FROM users OFFSET %(offset)s LIMIT %(limit)s; """, {"offset": skip, "limit": limit}, ) return await cur.fetchall() @classmethod async def create( cls, db: Connection, user_in: UserIn, ) -> Optional[Self]: """Create a new user.""" # Omit defaults and empty values from the model model_dump = user_in.model_dump(exclude_none=True, exclude_default=True) columns = ", ".join(model_dump.keys()) value_placeholders = ", ".join(f"%({key})s" for key in model_dump.keys()) sql = f""" INSERT INTO users ({columns}) VALUES ({value_placeholders}) RETURNING *; """ async with db.cursor(row_factory=class_row(cls)) as cur: await cur.execute(sql, model_dump) new_user = await cur.fetchone() if new_user is None: msg = f"Unknown SQL error for data: {model_dump}" print(f"Failed user creation: {model_dump}") raise HTTPException(status_code=500, detail=msg) return new_user
routes.py
from typing import Annotated from fastapi import Depends, HTTPException from psycopg import Connection from .main import app from .db import db_conn from .models import DbUser from .user_schemas import UserIn, UserOut @app.post("/", response_model=UserOut) async def create_user( user_info: UserIn, db: Annotated[Connection, Depends(db_conn)], ): """Create a new user. Here the input is parsed and validated by UserIn then the output is parsed and validated by UserOut returning the user json data. """ new_user = await DbUser.create(db, user_info) if not new_user: raise HTTPException( status_code=422, detail="User creation failed.", ) return new_user # NOTE within an endpoint we can also use # DbUser.one(db, user_id) and DbUser.all(db)
这是我在我维护的项目 FMTM 中开始使用的方法,FMTM 是一个为世界各地社区收集现场数据的工具。
在此处查看完整的代码库。
如果您觉得这有用的话 ⭐!
目前就这些!我希望这对那里的人有帮助?
以上是FastAPI、Pydantic、PsycopgPython Web API 的三位一体的详细内容。更多信息请关注PHP中文网其他相关文章!