ホームページ  >  記事  >  バックエンド開発  >  Python のランタイムに基づいてビジネス SQL コードを変更するにはどうすればよいですか?

Python のランタイムに基づいてビジネス SQL コードを変更するにはどうすればよいですか?

PHPz
PHPz転載
2023-05-08 14:22:07769ブラウズ

1. 原点

最近、プロジェクトは SASS の実装を準備しています。SASS の特徴の 1 つはマルチテナントであり、各テナント間のデータを分離する必要があります。一般的なデータベース分離ソリューションにはデータベースが含まれます。分離、テーブル分離、フィールド分離。現在、私はテーブル分離とフィールド分離のみを使用しています (データベース分離の原理は似ています)。フィールドの分離は比較的単純です。つまり、クエリ条件が異なります。たとえば、次の SQL クエリ:

SELECT * FROM t_demo WHERE tenant_id='xxx' AND is_del=0

ただし、厳密にするために、クエリ条件が異なるかどうかを確認する必要があります。 SQL を実行する前に、対応するテーブルが組み込まれます。 tenant_id のクエリ フィールド。

テーブル分離の場合は少し面倒で、運用中に特定のデータテーブルを対応するテナントIDに応じて処理する必要があります。例えば、次のようなSQLクエリがあるとします。

SELECT * FROM t_demo WHERE is_del=0

テナント A に遭遇した場合、SQL クエリは次のようになります。

SELECT * FROM t_demo_a WHERE is_del=0

テナント B に遭遇した場合、SQL クエリは次のようになります。

SELECT * FROM t_demo_b WHERE is_del=0

マーチャントの数が固定されている場合、通常はコード内に

if-else を記述して判断するだけで十分ですが、一般的な SASS アプリケーションのマーチャントは随時追加されるため、これについては SQLロジックは次のようになります:

def sql_handle(tenant_id: str):
    table_name: str = f"t_demo_{tenant_id}"
    sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"

しかし、これにはいくつかの問題があり、ORM の場合、最初に

t_demo に対応するテーブル オブジェクトを作成するだけで十分ですが、現在は複数のマーチャントに基づいて複数のテーブル オブジェクトを作成することは非現実的です。第二に、裸の SQL を作成する場合、通常は IDE を使用してチェックします。そのような SQL の場合:

sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"

IDE ではチェックできません。もう 1 つの最も深刻な問題があります。それは、現在のプロジェクトがすでに非常に大きいということです。関連する各テーブル呼び出しを調整して変更すると、作業量が非常に膨大になるため、最良の解決策は、エンジン ライブラリを取得した後に使用することです。ユーザーによって渡された SQL ステートメントを

MySQL サーバーに送信する前に、SQL はマーチャント ID に従って自動的に変更されます。この効果を実現するには、使用する MySQL## に侵入する必要があります. # エンジン ライブラリ、ニーズに合わせて内部のメソッドを変更します。

dbutils

を使用するか sqlalchemy を使用するかに関係なく、エンジン ライブラリを指定できます。現在一般的に使用されているエンジン ライブラリは pymysql です。以下、pymysqlを例にして説明します。

2. ライブラリに侵入する

使用するエンジン ライブラリに侵入する必要があるため、最初にエンジン ライブラリのどのメソッドを変更する必要があるかを決定する必要があります。 , I

pymysql.cursors.Cursor

mogrify メソッドのみを変更する必要があることが判明しました: <pre class="brush:py;">def mogrify(self, query, args=None): &quot;&quot;&quot; Returns the exact string that is sent to the database by calling the execute() method. This method follows the extension to the DB API 2.0 followed by Psycopg. &quot;&quot;&quot; conn = self._get_db() if args is not None: query = query % self._escape_args(args, conn) return query</pre>このメソッドの機能は、SQL を統合することです。 SQL は私たちのニーズを満たしているため、継承を通じて独自の新しい

Cursor

クラスを作成できます。 <pre class="brush:py;">import pymysql class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -&gt; str: # 在此可以编写处理还合成的SQL逻辑 mogrify_sql: str = super().mogrify(query, args) # 在此可以编写处理合成后的SQL逻辑 return mogrify_sql class DictCursor(pymysql.cursors.DictCursorMixin, Cursor): &quot;&quot;&quot;A cursor which returns results as a dictionary&quot;&quot;&quot; # 直接修改Cursor类的`mogrify`方法并不会影响到`DictCursor`类,所以我们也要创建一个新的`Cursor`类。</pre> Created

Cursor

クラスの後に、pymysql でカスタム Cursor クラスを適用する方法を検討する必要があります。一般的な Mysql 接続ライブラリは、カスタム ## を渡すことをサポートしています。 #Cursor クラス (pymysql:

import pymysql.cursors
# Connect to the database
connection = pymysql.connect(
    host=&#39;localhost&#39;,
    user=&#39;user&#39;,
    password=&#39;passwd&#39;,
    database=&#39;db&#39;,
    charset=&#39;utf8mb4&#39;,
    cursorclass=pymysql.cursors.DictCursor
)
など) If ライブラリを使用する場合は、cursorclass

を通じて

Cursor クラスを指定できます。はサポートしていないか、その他の理由により、モンキー パッチ メソッドを使用する必要があります。具体的な使用方法については、「Python プローブを参照して呼び出しライブラリのデータ抽出を完了する」を参照してください。 3. 販売者 ID の取得

SQL を変更する場所がわかったので、

mogrify

で販売者 ID を取得する方法を考える必要があります。メソッド そしてそれらのテーブルは置き換える必要があります。一般に、コードを呼び出すとき、パラメータを渡す方法は 2 つあります。1 つは配列型パラメータを渡すことです:

with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))
もう 1 つは辞書型パラメータを渡すことです:
with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%(is_del)s", {"is_del": 0})

現在、ほとんどのプロジェクトにはこれら 2 種類の書き方があり、エンジン ライブラリは

execute

##args## の実行時にパラメータ

sql と # を処理します。 # は mogrify に渡されます。辞書型パラメータを使用する場合は、その中に必要なパラメータを埋め込んで mogrify で抽出できますが、配列型パラメータや ORM を使用する場合は、ライブラリを使用すると、mogrify メソッドにパラメータを渡すことがより困難になります。この場合、context を通じて暗黙的にパラメータを mogrify# に渡すことができます。具体的な分析と原則については、「Python で contextvars モジュールのソース コード分析を使用する方法」を参照してください。 context の使用法は非常に簡単です。まず、context カプセル化クラスを作成します:

from contextvars import ContextVar, Token
from typing import Any, Dict, Optional, Set
context: ContextVar[Dict[str, Any]] = ContextVar("context", default={})
class Context(object):
    """基础的context调用,支持Type Hints检查"""
    tenant_id: str
    replace_table_set: Set[str]
    def __getattr__(self, key: str) -> Any:
        value: Any = context.get().get(key)
        return value
    def __setattr__(self, key: str, value: Any) -> None:
        context.get()[key] = value
class WithContext(Context):
    """简单的处理reset token逻辑,和context管理,只用在业务代码"""
    def __init__(self) -> None:
        self._token: Optional[Token] = None

    def __enter__(self) -> "WithContext":
        self._token = context.set({})
        return self
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if self._token:
            context.reset(self._token)
            self._token = None

次に、ビジネス コードで context を渡します。現在のビジネスに対応するパラメーター <pre class="brush:py;">with WithContext as context: context.tenant_id = &quot;xxx&quot; context.replace_table_set = {&quot;t_demo&quot;} with conn.cursor() as cursor: cursor.execute(&quot;SELECT * FROM t_demo WHERE is_del=%s&quot;, (0, ))</pre> を渡し、 mogrify

context

を呼び出して、対応するパラメーター

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 在此可以编写处理还合成的SQL逻辑
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

4 を取得します。 SQL の変更これですべての準備が整い、SQL を変更するロジックだけが残っています。以前に他のプロジェクトで作業していたとき、構築されたテーブルは非常に標準化されており、t_xxx## で始まりました。テーブルに名前を付けます。 # の形式なので、テーブル名を置き換えるのに非常に便利です。ほとんどの状況と互換性を持たせるには、2 回置き換えるだけで済みます。コードは次のとおりです:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 简单示例,实际上正则的效率会更好
        for replace_table in replace_table_set:
            if replace_table in query:
                # 替换表名
                query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ")
                # 替换查询条件中带有表名的
                query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.")
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

但是现在项目的SQL规范并不是很好,有些表名还是MySQL的关键字,所以靠简单的替换是行不通的,同时这个需求中,一些表只需要字段隔离,需要确保有带上对应的字段查询,这就意味着必须有一个库可以来解析SQL,并返回一些数据使我们可以比较方便的知道SQL中哪些是表名,哪些是查询字段了。

目前在Python中有一个比较知名的SQL解析库--sqlparse,它可以通过解析引擎把SQL解析成一个Python对象,之后我们就可以通过一些语法来判断哪些是SQL关键字, 哪些是表名,哪些是查询条件等等。但是这个库只实现一些底层的API,我们需要对他和SQL比较了解之后才能实现一些比较完备的功能,比如下面3种常见的SQL:

SELECT * FROM t_demo
SELECT * FROM t_demo as demo
SELECT * FROM t_other as other LEFT JOIN t_demo demo on demo.xxx==other.xxx

如果我们要通过sqlparse来提取表名的话就需要处理这3种情况,而我们如果要每一个情况都编写出来的话,那将会非常费心费力,同时也可能存在遗漏的情况,这时就需要用到另外一个库--sql_metadata,这个库是基于sqlparse和正则的解析库,同时提供了大量的常见使用方法的封装,我们通过直接调用对应的函数就能知道SQL中有哪些表名,查询字段是什么了。

目前已知这个库有一个缺陷,就是会自动去掉字段的符号, 比如表名为关键字时,我们需要使用`符号把它包起来:

SELECT * FROM `case`

但在经过sql_metadata解析后得到的表名是case而不是`case`,需要人为的处理,但是我并不觉得这是一个BUG,自己不按规范创建表,能怪谁呢。

接下来就可以通过sql_metadata的方法来实现我需要的功能了,在根据需求修改后,代码长这样(说明见注释):

from typing import Dict, Set, Tuple, Union
import pymysql
import sql_metadata
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        # 生成一个解析完成的SQL对象
        sql_parse: sql_metadata.Parser = sql_metadata.Parser(query)

        # 新加的一个属性,这里存下需要校验查询条件的表名
        check_flag = False 
        where_table_set: Set[str] = context.where_table_set
        # 该方法会获取到SQL对应的table,返回的是一个table的数组
        for table_name in sql_parse.tables:
            if table_name in where_table_set:
                if sql_parse.columns_dict:
                    # 该方法会返回SQL对应的字段,其中分为select, join, where等,这里只用到了where
                    for where_column in sql_parse.columns_dict.get("where", []):
                        # 如果连表,里面存的是类似于t_demo.tenant_id,所以要兼容这一个情况
                        if "tenant_id" in where_column.lower().split("."):
                            check_flag = True
                            break
        if not check_flag:
            # 检查不通过就抛错
            raise RuntimeError()
        # 更换表名的逻辑
        replace_table_set: Set[str] = context.replace_table_set
        new_query: str = query
        for table_name in sql_parse.tables:
            if table_name in replace_table_set:
                new_query = ""
                # tokens存放着解析完的数据,比如SELECT * FROM t_demo解析后是
                # [SELECT, *, FROM, t_demo]四个token
                for token in sql_parse.tokens:
                    # 判断token是否是表名  
                    if token.is_potential_table_name:
                        # 提取规范的表名
                        parse_table_name: str = token.stringified_token.strip()
                        if parse_table_name in replace_table_set:
                            new_table_name: str = f" {parse_table_name}_{tenant_id}"
                            # next_token代表SQL的下一个字段
                            if token.next_token.normalized != "AS":
                                # 如果当前表没有设置别名
                                # 通过AS把替换前的表名设置为新表名的别名,这样一来后面的表名即使没进行更改,也是能读到对应商户ID的表
                                new_table_name += f" AS {parse_table_name}"
                            query += new_table_name
                            continue
                    # 通过stringified_token获取的数据会自动带空格,比如`FROM`得到的会是` FROM`,这样拼接的时候就不用考虑是否加空格了
                    new_query += token.stringified_token
        mogrify_sql: str = super().mogrify(new_query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

这份代码十分简单,它只做简单介绍,事实上这段逻辑会应用到所有的SQL查询中,我们应该要保证这段代码是没问题的,同时不要有太多的性能浪费,所以在使用的时候要考虑到代码拆分和优化。 比如在使用的过程中可以发现,我们的SQL转换和检查都是在父类的Cursor.mogrify之前进行的,这就意味着不管我们代码逻辑里cursor.execute传的参数是什么,对于同一个代码逻辑来说,传过来的query值是保持不变的,比如下面的代码:

def get_user_info(uid: str) -> Dict[str, Any]:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s", {"uid": uid})
        return cursor.fetchone() or {}

这段代码中传到Cursor.mogrify的query永远为SELECT * FROM t_user WHERE uid=%(uid)s,有变化的只是args中uid的不同。 有了这样的一个前提条件,那么我们就可以把query的校验结果和转换结果缓存下来,减少每次都需要解析SQL再校验造成的性能浪费。至于如何实现缓存则需要根据自己的项目来决定,比如项目中只有几百个SQL执行,那么直接用Pythondict来存放就可以了,如果项目中执行的SQL很多,同时有些执行的频率非常的高,有些执行的频率非常的低,那么可以考虑使用LRU来缓存。

以上がPython のランタイムに基づいてビジネス SQL コードを変更するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。