>백엔드 개발 >파이썬 튜토리얼 >Python에서 런타임을 기반으로 비즈니스 SQL 코드를 수정하는 방법은 무엇입니까?

Python에서 런타임을 기반으로 비즈니스 SQL 코드를 수정하는 방법은 무엇입니까?

PHPz
PHPz앞으로
2023-05-08 14:22:07877검색

1. Origin

최근 프로젝트에서는 SASS의 특징 중 하나가 멀티 테넌시(Multi-tenancy)이며, 각 테넌트 간의 데이터를 격리해야 합니다. 일반적인 데이터베이스 격리 솔루션에는 데이터베이스 격리, 테이블 격리가 포함됩니다. 격리. 현재는 테이블 격리와 필드 격리만 사용합니다(데이터베이스 격리의 원리는 비슷함). 필드 격리는 쿼리 조건이 다르다는 점만 제외하면 비교적 간단합니다. 예를 들어 다음 SQL 쿼리는

SELECT * FROM t_demo WHERE tenant_id='xxx' AND is_del=0

하지만 엄격함을 위해 해당 테이블에 tenant_id가 있는지 확인해야 합니다. code> SQL 쿼리 필드를 실행하기 전. <code>tenant_id的查询字段。

对于表隔离就麻烦了一些,他需要做到在运行的时候根据对应的租户ID来处理某个数据表,举个例子,假如有下面这样的一条SQL查询:

SELECT * FROM t_demo WHERE is_del=0

在遇到租户A时,SQL查询将变为:

SELECT * FROM t_demo_a WHERE is_del=0

在遇到租户B时,SQL查询将变为:

SELECT * FROM t_demo_b WHERE is_del=0

如果商户数量固定时,一般在代码里编写if-else来判断就可以了,但是常见的SASS化应用的商户是会一直新增的,那么对于这个SQL逻辑就会变成这样:

def sql_handle(tenant_id: str):
    table_name: str = f"t_demo_{tenant_id}"
    sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"

但是这有几个问题,对于ORM来说,一开始只创建一个t_demo对应的表对象就可以了,现在却要根据多个商户创建多个表对象,这是不现实的,其次如果是裸写SQL,一般会使用IDE的检查,而对于这样的SQL:

sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"

IDE是没办法进行检查的,当然还有一个最为严重的问题,就是当前的项目已经非常庞大了,如果每个相关表的调用都进行适配更改的话,那工程量就非常庞大了,所以最好的方案就是在引擎库得到用户传过来的SQL语句后且还没发送到MySQL服务器之前自动的根据商户ID更改SQL, 而要达到这样的效果,就必须侵入到我们使用的MySQL的引擎库,修改里面的方法来兼容我们的需求。

不管是使用dbutils还是sqlalchemy,都可以指定一个引擎库,目前常用的引擎库是pymysql,所以下文都将以pymysql为例进行阐述。

2.侵入库

由于必须侵入到我们使用的引擎库,所以我们应该先判断我们需要修改引擎库的哪个方法,在经过源码阅读后,我判定只要更改pymysql.cursors.Cursormogrify方法:

def mogrify(self, query, args=None):
    """
    Returns the exact string that is sent to the database by calling the
    execute() method.

    This method follows the extension to the DB API 2.0 followed by Psycopg.
    """
    conn = self._get_db()

    if args is not None:
        query = query % self._escape_args(args, conn)
    return query

这个方法的作用就是把用户传过来的SQL和参数进行整合,生成一个最终的SQL,刚好符合我们的需求,于是可以通过继承的思路来创建一个新的属于我们自己的Cursor类:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        # 在此可以编写处理还合成的SQL逻辑
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql
class DictCursor(pymysql.cursors.DictCursorMixin, Cursor):
    """A cursor which returns results as a dictionary"""
    # 直接修改Cursor类的`mogrify`方法并不会影响到`DictCursor`类,所以我们也要创建一个新的`Cursor`类。

创建好了Cursor类后,就需要考虑如何在pymysql中应用我们自定义的Cursor类了,一般的Mysql连接库都支持我们传入自定义的Cursor类,比如pymysql:

import pymysql.cursors
# Connect to the database
connection = pymysql.connect(
    host=&#39;localhost&#39;,
    user=&#39;user&#39;,
    password=&#39;passwd&#39;,
    database=&#39;db&#39;,
    charset=&#39;utf8mb4&#39;,
    cursorclass=pymysql.cursors.DictCursor
)

我们可以通过cursorclass来指定我们的Cursor类,如果使用的库不支持或者是其它原因则需要使用猴子补丁的方法,具体的使用方法见Python探针完成调用库的数据提取。

3.获取商户ID

现在我们已经搞定了在何处修改SQL的问题了,接下来就要思考如何在mogrify方法获取到商户ID以及那些表要进行替换,一般我们在进行一段代码调用时,有两种传参数的方法, 一种是传数组类型的参数:

with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))

一种是传字典类型的参数:

with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%(is_del)s", {"is_del": 0})

目前大多数的项目都存在这两种类型的编写习惯,而引擎库在执行execute时会经过处理后才把参数sqlargs传给了mogrify,如果我们是使用字典类型的参数,那么可以在里面嵌入我们需要的参数,并在mogrify里面提取出来,但是使用了数组类型的参数或者是ORM库的话就比较难传递参数给mogrify方法了,这时可以通过context隐式的把参数传给mogrify方法,具体的分析和原理可见:python如何使用contextvars模块源码分析。

context的使用方法很简单, 首先是创建一个context封装的类:

from contextvars import ContextVar, Token
from typing import Any, Dict, Optional, Set
context: ContextVar[Dict[str, Any]] = ContextVar("context", default={})
class Context(object):
    """基础的context调用,支持Type Hints检查"""
    tenant_id: str
    replace_table_set: Set[str]
    def __getattr__(self, key: str) -> Any:
        value: Any = context.get().get(key)
        return value
    def __setattr__(self, key: str, value: Any) -> None:
        context.get()[key] = value
class WithContext(Context):
    """简单的处理reset token逻辑,和context管理,只用在业务代码"""
    def __init__(self) -> None:
        self._token: Optional[Token] = None

    def __enter__(self) -> "WithContext":
        self._token = context.set({})
        return self
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if self._token:
            context.reset(self._token)
            self._token = None

接下来在业务代码中,通过context传入当前业务对应的参数:

with WithContext as context:
    context.tenant_id = "xxx"
    context.replace_table_set = {"t_demo"}
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))

然后在mogrify中通过调用context即可获得对应的参数了:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 在此可以编写处理还合成的SQL逻辑
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

4.修改SQL

现在,万事俱备,只剩下修改SQL的逻辑,之前在做别的项目的时候,建的表都是十分的规范,它们是以t_xxx

테이블 격리의 경우 작업 중 특정 데이터 테이블을 해당 테넌트 ID에 따라 처리해야 하므로 좀 더 번거롭습니다. 예를 들어 다음과 같은 SQL 쿼리가 있는 경우 🎜
import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 简单示例,实际上正则的效率会更好
        for replace_table in replace_table_set:
            if replace_table in query:
                # 替换表名
                query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ")
                # 替换查询条件中带有表名的
                query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.")
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql
🎜🎜 테넌트 A의 경우 SQL 쿼리는 다음과 같습니다. 🎜🎜
SELECT * FROM t_demo
SELECT * FROM t_demo as demo
SELECT * FROM t_other as other LEFT JOIN t_demo demo on demo.xxx==other.xxx
🎜🎜 테넌트 B를 만날 때 SQL 쿼리는 다음과 같습니다. 🎜🎜
SELECT * FROM `case`
🎜판매자 수가 고정된 경우 일반적으로 코드에 if-else로 판단할 수 있지만, 일반 SASS 애플리케이션 판매자는 계속 추가되므로 SQL 로직은 다음과 같이 됩니다.🎜<pre class="brush:py;">from typing import Dict, Set, Tuple, Union import pymysql import sql_metadata class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -&gt; str: tenant_id: str = context.tenant_id # 生成一个解析完成的SQL对象 sql_parse: sql_metadata.Parser = sql_metadata.Parser(query) # 新加的一个属性,这里存下需要校验查询条件的表名 check_flag = False where_table_set: Set[str] = context.where_table_set # 该方法会获取到SQL对应的table,返回的是一个table的数组 for table_name in sql_parse.tables: if table_name in where_table_set: if sql_parse.columns_dict: # 该方法会返回SQL对应的字段,其中分为select, join, where等,这里只用到了where for where_column in sql_parse.columns_dict.get(&quot;where&quot;, []): # 如果连表,里面存的是类似于t_demo.tenant_id,所以要兼容这一个情况 if &quot;tenant_id&quot; in where_column.lower().split(&quot;.&quot;): check_flag = True break if not check_flag: # 检查不通过就抛错 raise RuntimeError() # 更换表名的逻辑 replace_table_set: Set[str] = context.replace_table_set new_query: str = query for table_name in sql_parse.tables: if table_name in replace_table_set: new_query = &quot;&quot; # tokens存放着解析完的数据,比如SELECT * FROM t_demo解析后是 # [SELECT, *, FROM, t_demo]四个token for token in sql_parse.tokens: # 判断token是否是表名 if token.is_potential_table_name: # 提取规范的表名 parse_table_name: str = token.stringified_token.strip() if parse_table_name in replace_table_set: new_table_name: str = f&quot; {parse_table_name}_{tenant_id}&quot; # next_token代表SQL的下一个字段 if token.next_token.normalized != &quot;AS&quot;: # 如果当前表没有设置别名 # 通过AS把替换前的表名设置为新表名的别名,这样一来后面的表名即使没进行更改,也是能读到对应商户ID的表 new_table_name += f&quot; AS {parse_table_name}&quot; query += new_table_name continue # 通过stringified_token获取的数据会自动带空格,比如`FROM`得到的会是` FROM`,这样拼接的时候就不用考虑是否加空格了 new_query += token.stringified_token mogrify_sql: str = super().mogrify(new_query, args) # 在此可以编写处理合成后的SQL逻辑 return mogrify_sql</pre>🎜근데 ORM의 경우 몇 가지 문제가 있어서 처음에는 그냥 만들어도 충분했습니다. <code>t_demo에 해당하는 테이블 객체는 단 하나뿐입니다. 이제 여러 판매자에 따라 여러 테이블 객체를 생성해야 합니다. 둘째, Naked SQL을 작성하는 경우 일반적으로 IDE 검사를 사용하게 됩니다. such SQL: 🎜
def get_user_info(uid: str) -> Dict[str, Any]:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s", {"uid": uid})
        return cursor.fetchone() or {}
🎜 IDE에서는 확인할 수 없습니다. 물론 가장 심각한 문제도 있는데, 이는 현재 프로젝트가 이미 매우 크다는 것입니다. 관련 테이블에 대한 모든 호출이 변경되면 작업량이 늘어납니다. 매우 크기 때문에 가장 좋은 해결책은 엔진 라이브러리가 사용자가 전달한 SQL 문을 받은 후 MySQL 서버로 전송되기 전에 판매자 ID에 따라 SQL을 자동으로 변경하는 것입니다. 그리고 이 효과를 얻으려면 우리가 사용하는 MySQL 엔진 라이브러리를 침범하고 내부 메소드를 우리 요구에 맞게 수정해야 합니다. 🎜
🎜dbutils를 사용하든 sqlalchemy를 사용하든 엔진 라이브러리를 지정할 수 있습니다. 현재 일반적으로 사용되는 엔진 라이브러리는 pymysql입니다. 그래서 다음은 모두 pymysql을 예로 들어 설명하겠습니다. 🎜
🎜2. 라이브러리 침입🎜🎜우리가 사용하는 엔진 라이브러리에 침입해야 하기 때문에, 먼저 엔진 라이브러리의 어떤 메소드를 수정해야 하는지 결정해야 합니다. pymysql .cursors.Cursormogrify 메소드를 변경하려면: 🎜rrreee🎜 이 메소드의 기능은 사용자가 전달한 SQL과 매개변수를 통합하여 최종 SQL을 생성하는 것입니다. , 이는 우리의 요구 사항을 충족합니다. 따라서 상속을 통해 새로운 Cursor 클래스를 만들 수 있습니다. 🎜rrreee🎜 Cursor 클래스를 만든 후에는 어떻게 해야 하는지 고려해야 합니다. 에서 사용하세요. 사용자 정의 <code>Cursor 클래스는 pymysql에서 사용됩니다. 일반 Mysql 연결 라이브러리는 사용자 정의 Cursor 클래스(예: <code>pymysql:🎜rrreee🎜사용된 라이브러리가 지원하지 않는 경우 cursorclass를 통해 Cursor 클래스를 지정할 수 있음) 또는 다른 이유로 원숭이 패치 방법을 사용해야 합니다. 구체적인 사용 방법은 Python 프로브를 참조하여 호출 라이브러리에서 데이터 추출을 완료하세요. 🎜🎜3. 판매자 ID 가져오기🎜🎜이제 SQL을 수정할 위치를 파악했으니 mogrify 메서드에서 판매자 ID를 가져오는 방법과 어떤 테이블이 필요한지 생각해 봐야 합니다. 일반적으로 코드를 호출할 때 매개변수를 전달하는 방법에는 두 가지가 있습니다. 하나는 배열 유형 매개변수를 전달하는 것입니다: 🎜rrreee🎜 다른 하나는 사전 유형 매개변수를 전달하는 것입니다: 🎜rrreee🎜 현재 대부분의 프로젝트에는 이 두 가지 방법이 있습니다. 이러한 유형의 작성 습관이 있으며, 엔진 라이브러리는 execute mogrifysql 및 args 매개변수를 에 전달합니다. /code>, 사전 유형 매개변수를 사용하는 경우 필요한 매개변수를 여기에 삽입하고 <code>mogrify에서 추출할 수 있지만 배열 유형 매개변수나 ORM 라이브러리를 사용하면 매개변수를 전달하기가 더 어려워집니다. mogrify 메소드는 context를 통해 암묵적으로 mogrify 메소드에 전달될 수 있으며 그 원리를 알 수 있습니다. : Python에서 contextvars 모듈 소스 코드 분석을 사용하는 방법. 🎜🎜context를 사용하는 방법은 매우 간단합니다. 먼저 context로 캡슐화된 클래스를 만듭니다. 🎜rrreee🎜다음으로 비즈니스 코드에서 해당 데이터를 전달합니다. 매개변수: 🎜rrreee🎜 그런 다음 mogrify에서 context를 호출하여 해당 매개변수를 얻을 수 있습니다. 🎜rrreee🎜4 이제 SQL을 수정하세요. , 모든 것이 준비되었습니다. 나머지는 SQL의 논리를 수정하는 것입니다. 이전에 다른 프로젝트를 작업할 때 제가 만든 테이블은 t_xxx 형식으로 테이블 이름을 지정했습니다. 이는 테이블 이름을 바꾸는 것이 매우 편리합니다. 두 번 대체하는 한 코드는 다음과 같습니다. 🎜
import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 简单示例,实际上正则的效率会更好
        for replace_table in replace_table_set:
            if replace_table in query:
                # 替换表名
                query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ")
                # 替换查询条件中带有表名的
                query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.")
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

但是现在项目的SQL规范并不是很好,有些表名还是MySQL的关键字,所以靠简单的替换是行不通的,同时这个需求中,一些表只需要字段隔离,需要确保有带上对应的字段查询,这就意味着必须有一个库可以来解析SQL,并返回一些数据使我们可以比较方便的知道SQL中哪些是表名,哪些是查询字段了。

目前在Python中有一个比较知名的SQL解析库--sqlparse,它可以通过解析引擎把SQL解析成一个Python对象,之后我们就可以通过一些语法来判断哪些是SQL关键字, 哪些是表名,哪些是查询条件等等。但是这个库只实现一些底层的API,我们需要对他和SQL比较了解之后才能实现一些比较完备的功能,比如下面3种常见的SQL:

SELECT * FROM t_demo
SELECT * FROM t_demo as demo
SELECT * FROM t_other as other LEFT JOIN t_demo demo on demo.xxx==other.xxx

如果我们要通过sqlparse来提取表名的话就需要处理这3种情况,而我们如果要每一个情况都编写出来的话,那将会非常费心费力,同时也可能存在遗漏的情况,这时就需要用到另外一个库--sql_metadata,这个库是基于sqlparse和正则的解析库,同时提供了大量的常见使用方法的封装,我们通过直接调用对应的函数就能知道SQL中有哪些表名,查询字段是什么了。

目前已知这个库有一个缺陷,就是会自动去掉字段的符号, 比如表名为关键字时,我们需要使用`符号把它包起来:

SELECT * FROM `case`

但在经过sql_metadata解析后得到的表名是case而不是`case`,需要人为的处理,但是我并不觉得这是一个BUG,自己不按规范创建表,能怪谁呢。

接下来就可以通过sql_metadata的方法来实现我需要的功能了,在根据需求修改后,代码长这样(说明见注释):

from typing import Dict, Set, Tuple, Union
import pymysql
import sql_metadata
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        # 生成一个解析完成的SQL对象
        sql_parse: sql_metadata.Parser = sql_metadata.Parser(query)

        # 新加的一个属性,这里存下需要校验查询条件的表名
        check_flag = False 
        where_table_set: Set[str] = context.where_table_set
        # 该方法会获取到SQL对应的table,返回的是一个table的数组
        for table_name in sql_parse.tables:
            if table_name in where_table_set:
                if sql_parse.columns_dict:
                    # 该方法会返回SQL对应的字段,其中分为select, join, where等,这里只用到了where
                    for where_column in sql_parse.columns_dict.get("where", []):
                        # 如果连表,里面存的是类似于t_demo.tenant_id,所以要兼容这一个情况
                        if "tenant_id" in where_column.lower().split("."):
                            check_flag = True
                            break
        if not check_flag:
            # 检查不通过就抛错
            raise RuntimeError()
        # 更换表名的逻辑
        replace_table_set: Set[str] = context.replace_table_set
        new_query: str = query
        for table_name in sql_parse.tables:
            if table_name in replace_table_set:
                new_query = ""
                # tokens存放着解析完的数据,比如SELECT * FROM t_demo解析后是
                # [SELECT, *, FROM, t_demo]四个token
                for token in sql_parse.tokens:
                    # 判断token是否是表名  
                    if token.is_potential_table_name:
                        # 提取规范的表名
                        parse_table_name: str = token.stringified_token.strip()
                        if parse_table_name in replace_table_set:
                            new_table_name: str = f" {parse_table_name}_{tenant_id}"
                            # next_token代表SQL的下一个字段
                            if token.next_token.normalized != "AS":
                                # 如果当前表没有设置别名
                                # 通过AS把替换前的表名设置为新表名的别名,这样一来后面的表名即使没进行更改,也是能读到对应商户ID的表
                                new_table_name += f" AS {parse_table_name}"
                            query += new_table_name
                            continue
                    # 通过stringified_token获取的数据会自动带空格,比如`FROM`得到的会是` FROM`,这样拼接的时候就不用考虑是否加空格了
                    new_query += token.stringified_token
        mogrify_sql: str = super().mogrify(new_query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

这份代码十分简单,它只做简单介绍,事实上这段逻辑会应用到所有的SQL查询中,我们应该要保证这段代码是没问题的,同时不要有太多的性能浪费,所以在使用的时候要考虑到代码拆分和优化。 比如在使用的过程中可以发现,我们的SQL转换和检查都是在父类的Cursor.mogrify之前进行的,这就意味着不管我们代码逻辑里cursor.execute传的参数是什么,对于同一个代码逻辑来说,传过来的query值是保持不变的,比如下面的代码:

def get_user_info(uid: str) -> Dict[str, Any]:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s", {"uid": uid})
        return cursor.fetchone() or {}

这段代码中传到Cursor.mogrify的query永远为SELECT * FROM t_user WHERE uid=%(uid)s,有变化的只是args中uid的不同。 有了这样的一个前提条件,那么我们就可以把query的校验结果和转换结果缓存下来,减少每次都需要解析SQL再校验造成的性能浪费。至于如何实现缓存则需要根据自己的项目来决定,比如项目中只有几百个SQL执行,那么直接用Pythondict来存放就可以了,如果项目中执行的SQL很多,同时有些执行的频率非常的高,有些执行的频率非常的低,那么可以考虑使用LRU来缓存。

위 내용은 Python에서 런타임을 기반으로 비즈니스 SQL 코드를 수정하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제