Heim >Backend-Entwicklung >Python-Tutorial >Wie vervollständigt Python die Datenextraktion aus der aufrufenden Bibliothek?

Wie vervollständigt Python die Datenextraktion aus der aufrufenden Bibliothek?

王林
王林nach vorne
2023-05-16 08:46:051090Durchsuche

1. Einfache und grobe Methode – Kapseln Sie die MySQL-Bibliothek.

Um einen Ausführungsprozess zu zählen, müssen Sie die Startposition und Endposition des Ausführungsprozesses kennen. Daher ist die einfachste und einfachste Methode die Kapselung basierend auf der zu verwendenden Methode aufgerufen, Implementieren Sie eine Zwischenschicht zwischen dem Framework, das die MySQL-Bibliothek aufruft, und der MySQL-Bibliothek und vervollständigen Sie zeitaufwändige Statistiken in der Zwischenschicht, wie zum Beispiel:

# 伪代码
def my_execute(conn, sql, param):
 # 针对MySql库的统计封装组件
 with MyTracer(conn, sql, param):
     # 以下为正常使用MySql库的代码
with conn.cursor as cursor:
 cursor.execute(sql, param)
...

Es scheint sehr gut zu implementieren zu sein, und das ist auch sehr gut bequem zu ändern, aber da es sich um eine API der obersten Ebene handelt, ist das Vornehmen von Änderungen tatsächlich sehr unflexibel. Gleichzeitig werden einige Voroperationen in Cursor.execute ausgeführt, z. B. das Zusammenfügen von SQL und Param und das Aufrufen von nextset, um das zu löschen aktuelle Cursordaten usw. Auch die Daten, die wir letztendlich erhalten haben, wie Zeit und Verbrauch, waren ungenau, und es gab keine Möglichkeit, detaillierte Metadaten wie Fehlercodes usw. abzurufen.

Wenn wir die direktesten und nützlichsten Daten erhalten möchten, können wir das tun Ändern Sie nur den Quellcode und rufen Sie dann den Quellcode auf. Wenn jedoch jede Bibliothek den Quellcode ändern muss, um zu zählen, wäre dies zu mühsam. Glücklicherweise bietet Python auch einige Schnittstellen, die Sonden und dem Quellcode der Bibliothek ähneln kann durch Sonden gemessen werden.

2. Python-Sonde

In Python kann die Funktion des Import-Hooks durch sys.meta_path realisiert werden Um die Objekte in .sys.meta_path zu ändern, müssen Sie eine find_module-Methode implementieren. Diese Methode gibt „None“ oder ein Objekt zurück, das die „load_module“-Methode implementiert Methoden beim Importieren einiger Bibliotheken. Die Verwendung ist wie folgt eigenes Sondenmodul, da das Beispiel nur das aiomysql-Modul betrifft, muss nur das aiomysql-Modul in MetaPathFinder.find_module verarbeitet werden, und die anderen werden zuerst ignoriert. Dann müssen wir bestimmen, welche Funktion von aiomysql wir ersetzen möchten. Aus geschäftlicher Sicht benötigen wir im Allgemeinen nur die Hauptoperationen „cursor.execute“, „cursor.fetchone“, „cursor.fetchall“ und „cursor.execute“. Daher müssen Sie tief in den Cursor eintauchen, um zu sehen, wie Sie den Code ändern und welche Funktion letzterer überlädt

Zuerst der Quellcode von Cursor.execute (cursor.executemanay auch Ähnlich), es wird festgestellt, dass die Methode von self.nextset zuerst aufgerufen wird, um die Daten der vorherigen Anforderung abzurufen, dann die SQL-Anweisung zusammenführen und schließlich Abfrage über self._query:

import importlib
import sys
from functools import wraps
def func_wrapper(func):
    """这里通过一个装饰器来达到狸猫换太子和获取数据的效果"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 记录开始时间
        start = time.time()
        result = func(*args, **kwargs)
        # 统计消耗时间
        end = time.time()
        print(f"speed time:{end - start}")
        return result
    return wrapper
class MetaPathFinder:
    def find_module(self, fullname, path=None):
        # 执行时可以看出来在import哪些模块
        print(f'find module:{path}:{fullname}')
        return MetaPathLoader()
class MetaPathLoader:
    def load_module(self, fullname):
        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder = sys.meta_path.pop(0)
        # 导入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替换函数
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
sys.meta_path.insert(0, MetaPathFinder())
if __name__ == '__main__':
    import time
    time.sleep(1)
# 输出示例:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468

Schauen Sie sich den Quellcode von Cursor.fetchone (cursor.fetchall) an. Es wird festgestellt, dass die Daten tatsächlich aus dem Cache abgerufen werden.

Diese Daten wurden während der Ausführung abgerufen von Cursor.execute:

async def execute(self, query, args=None):
    """Executes the given operation
    Executes the given operation substituting any markers with
    the given parameters.
    For example, getting all rows where id is 5:
        cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
    :param query: ``str`` sql statement
    :param args: ``tuple`` or ``list`` of arguments for sql query
    :returns: ``int``, number of rows that has been produced of affected
    """
    conn = self._get_db()

    while (await self.nextset()):
        pass

    if args is not None:
        query = query % self._escape_args(args, conn)

    await self._query(query)
    self._executed = query
    if self._echo:
        logger.info(query)
        logger.info("%r", args)
    return self._rowcount

Basierend auf der obigen Analyse müssen wir nur die Kernmethode self._query überladen. Aus dem Quellcode können wir erkennen, dass wir self und sql erhalten können An self._query übergebene Parameter können wir auch auf Basis von self abrufen. Gleichzeitig sind alle erforderlichen Daten im Grunde verfügbar Die Idee lautet wie folgt:

def fetchone(self):
    """Fetch the next row """
    self._check_executed()
    fut = self._loop.create_future()
    if self._rows is None or self._rownumber >= len(self._rows):
        fut.set_result(None)
        return fut
    result = self._rows[self._rownumber]
    self._rownumber += 1
    fut = self._loop.create_future()
    fut.set_result(result)
    return fut

Dieses Beispiel scheint sehr gut zu sein, aber die Logik muss am Eingang des Aufrufs explizit aufgerufen werden, normalerweise für ein Projekt. Es kann mehrere Eingänge geben, die diesen Aufruf anzeigen Dies ist sehr mühsam und unsere Hook-Logik muss vor dem Import aufgerufen werden. Auf diese Weise müssen die Einführungsspezifikationen festgelegt werden, andernfalls ist der Hook möglicherweise an einigen Stellen nicht erfolgreich. Wenn dies möglich ist, kann dieses Problem perfekt gelöst werden Logik zum Einführen des Hooks, der unmittelbar nach dem Start des Parsers ausgeführt werden soll. Nachdem wir die Informationen überprüft haben, haben wir festgestellt, dass der Python-Interpreter automatisch die unter PYTHONPATH vorhandenen Module „sitecustomize“ und „usercustomize“ importiert Schreiben Sie unsere Ersetzungsfunktion in das Modul.

import importlib
import time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
    import aiomysql
def func_wrapper(func: Callable):
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        start: float = time.time()
        func_result: Any = await func(*args, **kwargs)
        end: float = time.time()
        # 根据_query可以知道, 第一格参数是self, 第二个参数是sql
        self: aiomysql.Cursor = args[0]
        sql: str = args[1]
        # 通过self,我们可以拿到其他的数据
        db: str = self._connection.db
        user: str = self._connection.user
        host: str = self._connection.host
        port: str = self._connection.port
        execute_result: Tuple[Tuple] = self._rows
        # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了, 
        # 这里只是打印一部分数据出来
        print({
            "sql": sql,
            "db": db,
            "user": user,
            "host": host,
            "port": port,
            "result": execute_result,
            "speed time": end - start
        })
        return func_result
    return cast(Callable, wrapper)
class MetaPathFinder:

    @staticmethod
    def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
        if fullname == 'aiomysql':
            # 只有aiomysql才进行hook
            return MetaPathLoader()
        else:
            return None
class MetaPathLoader:
    @staticmethod
    def load_module(fullname: str):
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder: "MetaPathFinder" = sys.meta_path.pop(0)
        # 导入 module
        module: ModuleType = importlib.import_module(fullname)
        # 针对_query进行hook
        module.Cursor._query = func_wrapper(module.Cursor._query)
        sys.meta_path.insert(0, finder)
        return module
async def test_mysql() -> None:
    import aiomysql
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

if __name__ == '__main__':
    sys.meta_path.insert(0, MetaPathFinder())
    import asyncio
    asyncio.run(test_mysql())
# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}

hook_aiomysql.py ist unser Code zum Erstellen von Sonden als Beispiel, und der in sitecustomize.py gespeicherte Code ist wie folgt. Es ist sehr einfach, führen Sie einfach unseren Sondencode ein und fügen Sie ihn in sys.meta_path ein:

.
├── __init__.py
├── hook_aiomysql.py
├── sitecustomize.py
└── test_auto_hook.py
test_auto_hook.py Dies ist der Testcode:

import sys
from hook_aiomysql import MetaPathFinder
sys.meta_path.insert(0, MetaPathFinder())

Als nächstes müssen wir nur noch PYTHONPATH festlegen und unseren Code ausführen (wenn es sich um ein Projekt handelt, wird es normalerweise vom Supervisor gestartet, und PYTHONPATH kann in der Konfigurationsdatei festgelegt werden). :

import asyncio
from hook_aiomysql import test_mysql
asyncio.run(test_mysql())

4. Direkte Ersetzungsmethode

Sie können sehen, dass die obige Methode sehr gut läuft und leicht in unser Projekt eingebettet werden kann. Es ist jedoch schwierig, sie in eine Bibliothek eines Drittanbieters zu extrahieren Wenn Sie eine Bibliothek eines Drittanbieters trennen möchten, müssen Sie überlegen, ob es andere Methoden gibt. Bei der Einführung von MetaPathLoader oben habe ich sys.module erwähnt, in dem sys.modules verwendet wird, um wiederholte Einführungen zu reduzieren:

(.venv) ➜  python_hook git:(master) ✗ export PYTHONPATH=.      
(.venv) ➜  python_hook git:(master) ✗ python test_auto_hook.py 
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}

Das Prinzip der Reduzierung wiederholter Einführungen besteht darin, dass jedes Mal, wenn ein Modul eingeführt wird, es in sys.modules gespeichert wird. Wenn es wiederholt eingeführt wird, wird es direkt auf das zuletzt eingeführte Modul aktualisiert. Der Grund, warum wir oben darüber nachdenken, wiederholte Importe zu reduzieren, liegt darin, dass wir Bibliotheksabhängigkeiten von Drittanbietern nicht aktualisieren, wenn das Programm ausgeführt wird. Unter Ausnutzung der Tatsache, dass wir nicht die wiederholte Einführung von Modulen mit demselben Namen und unterschiedlichen Implementierungen in Betracht ziehen müssen und dass sys.modules importierte Module zwischenspeichert, können wir die obige Logik beim Importieren von Modulen vereinfachen -> Ersetzen Sie die aktuelle Modulmethode durch Die Hook-Methode, die wir geändert haben.

import time
from functools import wraps
from typing import Any, Callable, Tuple, cast
import aiomysql
def func_wrapper(func: Callable):
    """和上面一样的封装函数, 这里简单略过"""
# 判断是否hook过
_IS_HOOK: bool = False
# 存放原来的_query
_query: Callable = aiomysql.Cursor._query
# hook函数
def install_hook() -> None:
    _IS_HOOK = False
    if _IS_HOOK:
        return
    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)
    _IS_HOOK = True
# 还原到原来的函数方法
def reset_hook() -> None:
    aiomysql.Cursor._query = _query
    _IS_HOOK = False

代码简单明了,接下来跑一跑刚才的测试:

import asyncio
import aiomysql
from demo import install_hook, reset_hook
async def test_mysql() -> None:
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

print("install hook")
install_hook()
asyncio.run(test_mysql())
print("reset hook")
reset_hook()
asyncio.run(test_mysql())
print("end")

通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息

install hook
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}
reset hook
end

Das obige ist der detaillierte Inhalt vonWie vervollständigt Python die Datenextraktion aus der aufrufenden Bibliothek?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen