ホームページ  >  記事  >  バックエンド開発  >  FastAPI、Pydantic、Psycopg Python Web API の三位一体

FastAPI、Pydantic、Psycopg Python Web API の三位一体

DDD
DDDオリジナル
2024-10-26 00:29:28457ブラウズ

パート 1: ディスカッション

FastAPI を入力してください

まず、タイトルに塩をひとつまみ入れてください。

私が今、Python Web API 開発をゼロから始めていたとしたら、おそらく LiteStar をもっと詳しく検討するでしょう。LiteStar の方がより適切に設計されており、より優れたプロジェクト ガバナンス構造を備えていると思われます。

しかし、FastAPI はありますが、それがすぐになくなるわけではありません。私は多くの個人的および専門的なプロジェクトにこれを使用していますが、今でもそのシンプルさを楽しんでいます。

FastAPI 設計パターンのガイドについては、このページを参照してください。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

データベースデータの取得

FastAPI は実際の「API」部分では優れていますが、私にとって常に不確実性が 1 つありました。それは、データベースに最適にアクセスする方法、特に地理空間データ型も処理する必要がある場合です。

オプションを確認してみましょう。

注 1: FastAPI は ASGI であるため、ここでは async ライブラリのみに興味があります。

注 2: PostgreSQL への接続についてのみ説明しますが、説明の一部は他のデータベースにも関連します。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

シンプルなコーディング |複雑な設計: ORM

データベース接続と、データベース テーブルからのデータの Python オブジェクトへの解析を処理します。

  • SQLAlchemy2: Python ORM 界の最大の候補。個人的にはこの構文が本当に嫌いですが、それぞれ独自の構文があります。

  • TortoiseORM: 私は個人的に、この Django からインスピレーションを得た非同期 ORM がとても気に入っています。清潔で使い心地が良いです。

  • 代替 ORM: peewee、PonyORM など、多数あります。

中間点: クエリ ビルダー

データベース接続がありません。 Python ベースのクエリから生の SQL を出力し、それをデータベース ドライバーに渡すだけです。

  • SQLAlchemy Core: オブジェクト部分へのマッピングを含まない、コア SQL クエリ ビルダー。これにはデータベースと呼ばれる、非常に見栄えの良い高レベルの ORM が構築されています。しかし、このプロジェクトがどれほど活発に開発されているのかは疑問です。

  • PyPika: これについてはあまり知りません。

シンプルな設計: データベースドライバー

  • asyncpg: これは Postgres のゴールドスタンダードの非同期データベースドライバーであり、最初に市場に投入されたドライバーの 1 つであり、最もパフォーマンスが優れています。他のすべてのドライバーは Postgres とのインターフェースに C ライブラリ libpq を使用しますが、MagicStack は独自のカスタム実装を書き直すことを選択し、Python DBAPI 仕様からも逸脱しています。ここでパフォーマンスが主な基準である場合は、おそらく asyncpg が最良の選択肢です。

  • psycopg3: まあ、psycopg2 は明らかに Python/Postgres の 同期 データベース ドライバーの世界の王様でした。 psycopg3 (単に psycopg にブランド変更されました) は、このライブラリの次の完全に非同期の反復です。このライブラリはここ数年で真価を発揮してきたので、さらに詳しく説明したいと思います。 psycopg3 の初期の頃についての著者によるこの興味深いブログを参照してください。

ここでは、ORM、クエリ ビルダー、生の SQL に関して、より広範で概念的な議論が行われることは明らかであることに注意してください。ここではそれについては説明しません。

重複したモデル

Pydantic は FastAPI にバンドルされており、API 応答のモデリング、検証、シリアル化に優れています。

データベースからデータを取得するために ORM を使用することにした場合、2 セットのデータベース モデルの同期を維持するのは少し非効率的ではないでしょうか? (1 つは ORM 用、もう 1 つは Pydantic 用)?

Pydantic を使用してデータベースをモデル化できたら素晴らしいと思いませんか?

これはまさに、FastAPI の作成者が SQLModel ライブラリを使用して解決しようとした問題です。

これは問題に対する優れた解決策である可能性が非常に高いですが、いくつかの懸念があります:

  • このプロジェクトは FastAPI のようなシングルメンテナ症候群に悩まされますか?

  • これはまだ比較的若いプロジェクトとコンセプトであり、ドキュメントはそれほど素晴らしいものではありません。

  • 本質的に Pydantic と SQLAlchemy と結びついているため、移行は非常に困難になります。

  • より複雑なクエリの場合は、その下の SQLAlchemy へのドロップダウンが必要になる場合があります。

基本に戻る

オプションがたくさんあります!分析麻痺。

FastAPI, Pydantic, Psycopgthe holy trinity for Python web APIs

不確実性がある場合、私は次の教訓を使用します: シンプルにしてください。

SQL は 50 年前に発明され、今でも開発者にとって学ぶべき重要なスキルです。その構文は一貫して理解しやすく、ほとんどのユースケースで簡単に作成できます (熱心な ORM ユーザーは試してみてください。驚くかもしれません)。

なんと、最近ではオープンソース LLM を使用して (ほとんど機能する) SQL クエリを生成し、入力の手間を省くことができるようになりました。

ORM とクエリ ビルダーは現れたり消えたりする可能性がありますが、データベース ドライバーの方が一貫性があると考えられます。オリジナルの psycopg2 ライブラリは 20 年近く前に作成され、今でも世界中で実稼働環境で積極的に使用されています。

Psycopg を Pydantic モデルで使用する

前述したように、psycopg は asyncpg ほどパフォーマンスが高くない可能性がありますが (ただし、この理論上のパフォーマンスが実際にどのような意味を持つかについては議論の余地があります)、psycopg は使いやすさと使い慣れた API に重点を置いています。

私にとってキラー機能は Row Factory です。

この機能を使用すると、返されたデータベース データを、標準の lib データクラス、優れた attrs ライブラリのモデル、そしてもちろん Pydantic モデルなどの Python オブジェクトにマッピングできます。

私にとって、これはアプローチの最良の折衷案です。つまり、生の SQL の究極の柔軟性と、データベースをモデル化するための Pydantic の検証/型安全機能を組み合わせたものです。 Psycopg は、SQL インジェクションを回避するために変数入力のサニテーションなども処理します。

asyncpg は Pydantic モデルへのマッピングも処理できますが、これは組み込み機能というよりは回避策であることに注意してください。詳細については、この問題のスレッドを参照してください。また、このアプローチが他のモデリング ライブラリで適切に機能するかどうかもわかりません。

上で述べたように、私は通常、地理空間データを扱っています。この領域は、ORM やクエリ ビルダーによって無視されがちです。生の SQL にドロップすると、純粋な Python でより受け入れられる型が必要になるため、地理空間データを解析および解析解除できるようになります。このトピックに関する私の関連記事をご覧ください。

パート 2: 使用例

データベーステーブルの作成

ここでは、生の SQL で user という単純なデータベース テーブルを作成します。

SQL のみを使用してデータベースの作成と移行を処理することも検討しますが、これは別の記事で取り上げます。

init_db.sql

CREATE TYPE public.userrole AS ENUM (
    'READ_ONLY',
    'STANDARD',
    'ADMIN'
);

CREATE TABLE public.users (
    id integer NOT NULL,
    username character varying,
    role public.userrole NOT NULL DEFAULT 'STANDARD',
    profile_img character varying,
    email_address character varying,
    is_email_verified boolean DEFAULT false,
    registered_at timestamp with time zone DEFAULT now()
);

Pydantic でデータベースをモデル化する

ここでは、DbUser というモデルを作成します。

db_models.py

from typing import Optional
from enum import Enum
from datetime import datetime
from pydantic import BaseModel
from pydantic.functional_validators import field_validator
from geojson_pydantic import Feature

class UserRole(str, Enum):
    """Types of user, mapped to database enum userrole."""

    READ_ONLY = "READ_ONLY"
    STANDARD = "STANDARD"
    ADMIN = "ADMIN"

class DbUser(BaseModel):
    """Table users."""

    id: int
    username: str
    role: Optional[UserRole] = UserRole.STANDARD
    profile_img: Optional[str] = None
    email_address: Optional[str] = None
    is_email_verified: bool = False
    registered_at: Optional[datetime]
    # This is a geospatial type I will handle in the SQL
    favourite_place: Optional[dict]

    # DB computed fields (handled in the SQL)
    total_users: Optional[int] = None

    # This example isn't very realistic, but you get the idea
    @field_validator("is_email_verified", mode="before")
    @classmethod
    def i_want_my_ints_as_bools(cls, value: int) -> bool:
        """Example of a validator to convert data type."""
        return bool(value)

ここでは、Pydantic の型安全性と検証を取得します。

データベースからデータが抽出されるときに、このモデルに任意の形式の検証またはデータ変換を追加できます。

FastAPI を使用した Psycopg のセットアップ

psycopg_pool を使用して、プールされた データベース接続を作成します。

db.py

from fastapi import Request
from psycopg import Connection
from psycopg_pool import AsyncConnectionPool

# You should be using environment variables in a settings file here
from app.config import settings


def get_db_connection_pool() -> AsyncConnectionPool:
    """Get the connection pool for psycopg.

    NOTE the pool connection is opened in the FastAPI server startup (lifespan).

    Also note this is also a sync `def`, as it only returns a context manager.
    """
    return AsyncConnectionPool(
        conninfo=settings.DB_URL.unicode_string(), open=False
    )


async def db_conn(request: Request) -> Connection:
    """Get a connection from the psycopg pool.

    Info on connections vs cursors:
    https://www.psycopg.org/psycopg3/docs/advanced/async.html

    Here we are getting a connection from the pool, which will be returned
    after the session ends / endpoint finishes processing.

    In summary:
    - Connection is created on endpoint call.
    - Cursors are used to execute commands throughout endpoint.
      Note it is possible to create multiple cursors from the connection,
      but all will be executed in the same db 'transaction'.
    - Connection is closed on endpoint finish.
    """
    async with request.app.state.db_pool.connection() as conn:
        yield conn

次に、FastAPI ライフスパン イベントで接続プールを開きます。

main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI

from .db import get_db_connection_pool

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI startup/shutdown event."""
    # For this demo I use print, but please use logging!
    print("Starting up FastAPI server.")

    # Create a pooled db connection and make available in app state
    # NOTE we can access 'request.app.state.db_pool' in endpoints
    app.state.db_pool = get_db_connection_pool()
    await app.state.db_pool.open()

    yield

    # Shutdown events
    print("Shutting down FastAPI server.")
    # Here we make sure to close the connection pool
    await app.state.db_pool.close()

FastAPI アプリを起動すると、接続プールが開いており、エンドポイント内から接続できるようになります。

Pydantic モデルのヘルパー メソッド

一般的な機能として、Pydantic モデルにいくつかのメソッドを追加すると便利です: 1 人のユーザー、すべてのユーザーの取得、ユーザーの作成、ユーザーの更新、ユーザーの削除。

しかし、最初に、入力検証 (新しいユーザーを作成するため) と出力シリアル化 (API 経由の JSON 応答) のための Pydantic モデルをいくつか作成する必要があります。

user_schemas.py

from typing import Annotated
from pydantic import BaseModel, Field
from pydantic.functional_validators import field_validator
from geojson_pydantic import FeatureCollection, Feature, MultiPolygon, Polygon
from .db_models import DbUser

class UserIn(DbUser):
    """User details for insert into DB."""

    # Exclude fields not required for input
    id: Annotated[int, Field(exclude=True)] = None
    favourite_place: Optional[Feature]

    @field_validator("favourite_place", mode="before")
    @classmethod
    def parse_input_geojson(
        cls,
        value: FeatureCollection | Feature | MultiPolygon | Polygon,
    ) -> Optional[Polygon]:
        """Parse any format geojson into a single Polygon."""
        if value is None:
            return None
        # NOTE I don't include this helper function for brevity
        featcol = normalise_to_single_geom_featcol(value)
        return featcol.get("features")[0].get("geometry")

class UserOut(DbUser):
    """User details for insert into DB."""

    # Ensure it's parsed as a Polygon geojson from db object
    favourite_place: Polygon

    # More logic to append computed values

次に、ヘルパー メソッドを定義します: one、all、create:

db_models.py

...previous imports
from typing import Self, Optional
from fastapi.exceptions import HTTPException
from psycopg import Connection
from psycopg.rows import class_row

from .user_schemas import UserIn

class DbUser(BaseModel):
    """Table users."""

    ...the fields

    @classmethod
    async def one(cls, db: Connection, user_id: int) -> Self:
        """Get a user by ID.

        NOTE how the favourite_place field is converted in the db to geojson.
        """
        async with db.cursor(row_factory=class_row(cls)) as cur:
            sql = """
                SELECT
                    u.*,
                    ST_AsGeoJSON(favourite_place)::jsonb AS favourite_place,
                    (SELECT COUNT(*) FROM users) AS total_users
                FROM users u
                WHERE
                    u.id = %(user_id)s
                GROUP BY u.id;
            """

            await cur.execute(
                sql,
                {"user_id": user_id},
            )

            db_project = await cur.fetchone()
            if not db_project:
                raise KeyError(f"User ({user_identifier}) not found.")

            return db_project

    @classmethod
    async def all(
        cls, db: Connection, skip: int = 0, limit: int = 100
    ) -> Optional[list[Self]]:
        """Fetch all users."""
        async with db.cursor(row_factory=class_row(cls)) as cur:
            await cur.execute(
                """
                SELECT
                    *,
                    ST_AsGeoJSON(favourite_place)::jsonb
                FROM users
                OFFSET %(offset)s
                LIMIT %(limit)s;
                """,
                {"offset": skip, "limit": limit},
            )
            return await cur.fetchall()

    @classmethod
    async def create(
        cls,
        db: Connection,
        user_in: UserIn,
    ) -> Optional[Self]:
        """Create a new user."""

        # Omit defaults and empty values from the model
        model_dump = user_in.model_dump(exclude_none=True, exclude_default=True)
        columns = ", ".join(model_dump.keys())
        value_placeholders = ", ".join(f"%({key})s" for key in model_dump.keys())

        sql = f"""
            INSERT INTO users
                ({columns})
            VALUES
                ({value_placeholders})
            RETURNING *;
        """


        async with db.cursor(row_factory=class_row(cls)) as cur:
            await cur.execute(sql, model_dump)
            new_user = await cur.fetchone()

            if new_user is None:
                msg = f"Unknown SQL error for data: {model_dump}"
                print(f"Failed user creation: {model_dump}")
                raise HTTPException(status_code=500, detail=msg)

        return new_user

使用法

routes.py

from typing import Annotated
from fastapi import Depends, HTTPException
from psycopg import Connection

from .main import app
from .db import db_conn
from .models import DbUser
from .user_schemas import UserIn, UserOut

@app.post("/", response_model=UserOut)
async def create_user(
    user_info: UserIn,
    db: Annotated[Connection, Depends(db_conn)],
):
    """Create a new user.

    Here the input is parsed and validated by UserIn
    then the output is parsed and validated by UserOut
    returning the user json data.
    """

    new_user = await DbUser.create(db, user_info)
    if not new_user:
        raise HTTPException(
            status_code=422,
            detail="User creation failed.",
        )

    return new_user

    # NOTE within an endpoint we can also use
    # DbUser.one(db, user_id) and DbUser.all(db)

これは、私が管理しているプロジェクト、世界中のコミュニティのフィールド データを収集するツール FMTM で使用し始めたアプローチです。

完全なコードベースはこちらをご覧ください。
そして、⭐これが役に立ったと思ったら!

今回はここまでです!これが誰かの役に立つことを願っています?

以上がFastAPI、Pydantic、Psycopg Python Web API の三位一体の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。