실행 프로세스를 계산하려면 실행 프로세스의 시작 위치와 끝 위치를 알아야 하므로 가장 간단하고 투박한 방법은 해당 방법을 기반으로 캡슐화하는 것입니다. 라는, MySQL 라이브러리를 호출하는 프레임워크와 MySQL 라이브러리 사이에 중간 계층을 구현하고 중간 계층에서 시간이 많이 걸리는 통계를 완료합니다. 예:
# 伪代码 def my_execute(conn, sql, param): # 针对MySql库的统计封装组件 with MyTracer(conn, sql, param): # 以下为正常使用MySql库的代码 with conn.cursor as cursor: cursor.execute(sql, param) ...
구현하기가 매우 좋은 것 같습니다. 변경하기 편리하지만 최상위 API에 있기 때문에 수정하는 것은 실제로 매우 유연하지 않습니다. 동시에 sql 및 param 연결, nextset 호출과 같은 일부 사전 작업이cursor.execute에서 수행됩니다. 현재 커서 데이터 등 우리가 최종적으로 얻은 시간, 소비량 등의 데이터도 부정확했고 오류 코드 등 일부 세부적인 메타데이터를 얻을 방법이 없었습니다.
가장 직접적으로 유용한 데이터를 얻으려면 소스 코드를 변경한 다음 소스 코드를 호출하면 되지만 각 라이브러리에서 계산하기 위해 소스 코드를 변경해야 하는 경우에는 너무 번거로울 수 있습니다. 다행히도 Python은 프로브와 유사한 일부 인터페이스를 제공하며 라이브러리의 소스 코드는
Python에서는 가져오기 관련 작업을 수행할 때 가져오기 관련 라이브러리를 구현할 수 있습니다. sys.meta_path에 정의된 개체를 기반으로 일치합니다. .sys.meta_path의 개체를 변경하려면 find_module 메서드를 구현해야 합니다. 이 find_module 메서드는 None 또는 load_module 메서드를 구현하는 개체를 반환합니다. 몇몇 라이브러리를 임포트할 때 관련 방법은 간단합니다. 자신만의 프로브 모듈을 만드는 경우, 예제는 aiomysql 모듈에만 관련되므로 MetaPathFinder.find_module에서 aiomysql 모듈만 처리하면 되고 나머지는 먼저 무시됩니다. 그런 다음 우리가 원하는 aiomysql의 기능을 결정해야 합니다. 비즈니스 관점에서 보면 일반적으로cursor.execute,cursor.fetchone,cursor.fetchall,cursor.executemany만 필요하므로 코드를 변경하는 방법과 어떤 기능을 수행하는지 확인하려면 커서에 대해 자세히 살펴봐야 합니다.
import importlib import sys from functools import wraps def func_wrapper(func): """这里通过一个装饰器来达到狸猫换太子和获取数据的效果""" @wraps(func) def wrapper(*args, **kwargs): # 记录开始时间 start = time.time() result = func(*args, **kwargs) # 统计消耗时间 end = time.time() print(f"speed time:{end - start}") return result return wrapper class MetaPathFinder: def find_module(self, fullname, path=None): # 执行时可以看出来在import哪些模块 print(f'find module:{path}:{fullname}') return MetaPathLoader() class MetaPathLoader: def load_module(self, fullname): # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder = sys.meta_path.pop(0) # 导入 module module = importlib.import_module(fullname) if fullname == 'time': # 替换函数 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module sys.meta_path.insert(0, MetaPathFinder()) if __name__ == '__main__': import time time.sleep(1) # 输出示例: # find module:datetime # find module:time # load module:time # find module:math # find module:_datetime # speed time:1.00073385238647468
cursor.fetchone(cursor.fetchall도 유사)의 소스 코드를 보면 데이터가 실제로 캐시에서 얻은 것을 알 수 있습니다.
이 데이터는 동안 얻은 것입니다. Cursor.execute 실행:async def execute(self, query, args=None): """Executes the given operation Executes the given operation substituting any markers with the given parameters. For example, getting all rows where id is 5: cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,)) :param query: ``str`` sql statement :param args: ``tuple`` or ``list`` of arguments for sql query :returns: ``int``, number of rows that has been produced of affected """ conn = self._get_db() while (await self.nextset()): pass if args is not None: query = query % self._escape_args(args, conn) await self._query(query) self._executed = query if self._echo: logger.info(query) logger.info("%r", args) return self._rowcount
위의 분석을 바탕으로 핵심 메서드 self._query만 오버로드하면 됩니다. 소스 코드에서 원하는 데이터를 얻을 수 있음을 알 수 있습니다. 그리고 self._query에 전달된 sql 매개변수를 사용하면 self를 기반으로 쿼리 결과를 얻을 수 있으며 동시에 데코레이터를 통해 실행 결과를 얻을 수 있습니다.
아이디어에 따라 수정된 코드는 다음과 같습니다.def fetchone(self): """Fetch the next row """ self._check_executed() fut = self._loop.create_future() if self._rows is None or self._rownumber >= len(self._rows): fut.set_result(None) return fut result = self._rows[self._rownumber] self._rownumber += 1 fut = self._loop.create_future() fut.set_result(result) return fut
이 예제는 매우 좋아 보이지만 호출 시작 시 로직을 명시적으로 호출해야 하며, 일반적으로 프로젝트의 경우 각 입구가 여러 개 있을 수 있습니다. 이 로직을 호출하는 것은 매우 번거롭고 가져오기 전에 먼저 후크 로직을 호출해야 함을 보여줍니다. 이런 식으로 도입 사양을 설정해야 합니다. 그렇지 않으면 후크가 일부 위치에서 성공하지 못할 수 있습니다. 파서가 시작된 후 즉시 실행되도록 후크를 도입하는 논리를 배열함으로써 완벽하게 해결되었습니다. 정보를 확인한 후 Python 인터프리터가 초기화되면 PYTHONPATH에 존재하는 sitecustomize 및 usercustomize 모듈을 자동으로 가져오는 것을 발견했습니다. 모듈을 생성하고 모듈에 대체 함수를 작성합니다. import importlib
import time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
import aiomysql
def func_wrapper(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
start: float = time.time()
func_result: Any = await func(*args, **kwargs)
end: float = time.time()
# 根据_query可以知道, 第一格参数是self, 第二个参数是sql
self: aiomysql.Cursor = args[0]
sql: str = args[1]
# 通过self,我们可以拿到其他的数据
db: str = self._connection.db
user: str = self._connection.user
host: str = self._connection.host
port: str = self._connection.port
execute_result: Tuple[Tuple] = self._rows
# 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了,
# 这里只是打印一部分数据出来
print({
"sql": sql,
"db": db,
"user": user,
"host": host,
"port": port,
"result": execute_result,
"speed time": end - start
})
return func_result
return cast(Callable, wrapper)
class MetaPathFinder:
@staticmethod
def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
if fullname == 'aiomysql':
# 只有aiomysql才进行hook
return MetaPathLoader()
else:
return None
class MetaPathLoader:
@staticmethod
def load_module(fullname: str):
if fullname in sys.modules:
return sys.modules[fullname]
# 防止递归调用
finder: "MetaPathFinder" = sys.meta_path.pop(0)
# 导入 module
module: ModuleType = importlib.import_module(fullname)
# 针对_query进行hook
module.Cursor._query = func_wrapper(module.Cursor._query)
sys.meta_path.insert(0, finder)
return module
async def test_mysql() -> None:
import aiomysql
pool: aiomysql.Pool = await aiomysql.create_pool(
host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
(r,) = await cur.fetchone()
assert r == 42
pool.close()
await pool.wait_closed()
if __name__ == '__main__':
sys.meta_path.insert(0, MetaPathFinder())
import asyncio
asyncio.run(test_mysql())
# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
hook_aiomysql.py는 프로브를 만들기 위한 코드이며 sitecustomize.py에 저장된 코드는 다음과 같습니다. 매우 간단합니다. 프로브 코드를 입력하고 sys.meta_path에 삽입하면 됩니다.
. ├── __init__.py ├── hook_aiomysql.py ├── sitecustomize.py └── test_auto_hook.pytest_auto_hook.py 테스트 코드는 다음과 같습니다.
import sys from hook_aiomysql import MetaPathFinder sys.meta_path.insert(0, MetaPathFinder())
다음으로 PYTHONPATH를 설정하고 코드를 실행하기만 하면 됩니다(프로젝트인 경우 일반적으로 감독자가 시작하며 PYTHONPATH는 구성 파일에서 설정할 수 있습니다). : import asyncio
from hook_aiomysql import test_mysql
asyncio.run(test_mysql())
4. 직접 교체 방법
위의 방법이 매우 잘 실행되고 우리 프로젝트에 쉽게 삽입될 수 있음을 알 수 있습니다. 그러나 해당 방법에 따라 타사 라이브러리로 추출하기가 어렵습니다. sitecustomize.py 파일을 참조하세요. 타사 라이브러리를 분리하려면 다른 방법이 있는지 고려해야 합니다. 위에서 MetaPathLoader를 소개할 때 sys.modules를 언급했는데, 여기서는 반복되는 소개를 줄이기 위해 sys.modules를 사용합니다.
(.venv) ➜ python_hook git:(master) ✗ export PYTHONPATH=. (.venv) ➜ python_hook git:(master) ✗ python test_auto_hook.py {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
import time from functools import wraps from typing import Any, Callable, Tuple, cast import aiomysql def func_wrapper(func: Callable): """和上面一样的封装函数, 这里简单略过""" # 判断是否hook过 _IS_HOOK: bool = False # 存放原来的_query _query: Callable = aiomysql.Cursor._query # hook函数 def install_hook() -> None: _IS_HOOK = False if _IS_HOOK: return aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query) _IS_HOOK = True # 还原到原来的函数方法 def reset_hook() -> None: aiomysql.Cursor._query = _query _IS_HOOK = False
代码简单明了,接下来跑一跑刚才的测试:
import asyncio import aiomysql from demo import install_hook, reset_hook async def test_mysql() -> None: pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() print("install hook") install_hook() asyncio.run(test_mysql()) print("reset hook") reset_hook() asyncio.run(test_mysql()) print("end")
通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息
install hook {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875} reset hook end
위 내용은 Python 프로브는 호출 라이브러리에서 데이터 추출을 어떻게 완료합니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!