How does Python probe complete data extraction from the calling library?
To time an execution you need to know where it starts and where it ends, so the simplest and crudest approach is to wrap the calls you care about: add an intermediate layer between the framework and the MySQL library it calls, and measure the elapsed time in that layer. For example:
# Pseudocode
def my_execute(conn, sql, param):
    # Statistics wrapper around the MySQL library
    with MyTracer(conn, sql, param):
        # Below is the normal code that uses the MySQL library
        with conn.cursor() as cursor:
            cursor.execute(sql, param)
            ...
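The pseudocode above does not define MyTracer. As a minimal sketch, assuming it is nothing more than a timing context manager, it could look like the following. The class body, its attributes (elapsed, error) and the printed dictionary are illustrative assumptions, not part of any MySQL library.

```python
import time


class MyTracer:
    """Hypothetical timing context manager (an assumption, not a library API)."""

    def __init__(self, conn, sql, param=None):
        self.conn = conn
        self.sql = sql
        self.param = param
        self.elapsed = None  # seconds, filled in on exit
        self.error = None    # exception raised inside the block, if any

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.elapsed = time.perf_counter() - self._start
        self.error = exc
        print({
            "sql": self.sql,
            "param": self.param,
            "elapsed": self.elapsed,
            "error": repr(exc) if exc else None,
        })
        return False  # never swallow the exception


def my_execute(conn, sql, param=None):
    # conn is any object whose cursor() returns a context manager
    # with an execute(sql, param) method.
    with MyTracer(conn, sql, param) as tracer:
        with conn.cursor() as cursor:
            cursor.execute(sql, param)
    return tracer
```

Note that the measured interval covers everything cursor.execute does, including argument escaping and clearing earlier result sets, not only the round trip to the server.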
This looks easy to implement and easy to change, but because the wrapping happens at the top-level API it is actually quite inflexible. In addition, cursor.execute performs some preparatory work of its own, such as merging sql with param and calling nextset to clear the data left in the current cursor. As a result the data we collect, such as the elapsed time, is inaccurate, and there is no way to obtain detailed metadata such as error codes.
To get the most direct and useful data, you would have to modify the library's source code and call the modified version, but changing the source of every library you want statistics for is far too much trouble. Fortunately, Python provides probe-like interfaces that let us replace parts of a library's code with our own at runtime.
In Python, import hooks can be implemented through sys.meta_path. When an import statement runs, the import machinery consults the objects registered in sys.meta_path to decide how to find and load the module. In the legacy protocol used in this article, each object in sys.meta_path implements a find_module method that returns either None or an object implementing load_module. (This protocol has been deprecated since Python 3.4 in favor of find_spec, and Python 3.12 removed the find_module fallback for sys.meta_path entries, so the hooks below only take effect on older interpreters.) We can use such an object to replace selected functions of a library at import time. A simple usage follows: it hooks time.sleep and prints how long each sleep takes.
import importlib
import sys
from functools import wraps


def func_wrapper(func):
    """A decorator is used here to swap in our own function and collect data"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Record the start time
        start = time.time()
        result = func(*args, **kwargs)
        # Measure the elapsed time
        end = time.time()
        print(f"speed time:{end - start}")
        return result
    return wrapper


class MetaPathFinder:

    def find_module(self, fullname, path=None):
        # Running this shows which modules are being imported
        print(f'find module:{path}:{fullname}')
        return MetaPathLoader()


class MetaPathLoader:

    def load_module(self, fullname):
        # Imported modules are stored in sys.modules; checking it avoids repeated imports
        if fullname in sys.modules:
            return sys.modules[fullname]
        # Prevent recursive calls
        finder = sys.meta_path.pop(0)
        # Import the module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # Replace the function
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module


sys.meta_path.insert(0, MetaPathFinder())


if __name__ == '__main__':
    import time
    time.sleep(1)


# Example output:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468
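On Python 3.12 and later the import system ignores finders that only define find_module, so the same idea has to be expressed with find_spec. The following is a minimal sketch of that variant, not the author's code: PatchingFinder, PatchingLoader and timed are hypothetical names introduced for illustration. The finder asks the remaining sys.meta_path entries for the real module spec and wraps its loader so that a patch callback runs right after the module body has executed.

```python
import importlib.abc
import sys
import time
from functools import wraps


def timed(func):
    """Hypothetical decorator: print how long each call to func takes."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            print(f"{func.__name__} took {time.perf_counter() - start:.6f}s")
    return wrapper


class PatchingLoader(importlib.abc.Loader):
    """Delegates to the real loader, then applies a patch to the module."""

    def __init__(self, real_loader, patch):
        self._real_loader = real_loader
        self._patch = patch

    def create_module(self, spec):
        return self._real_loader.create_module(spec)

    def exec_module(self, module):
        self._real_loader.exec_module(module)
        self._patch(module)  # the module is fully initialised at this point


class PatchingFinder(importlib.abc.MetaPathFinder):
    """patches maps a module name to a callable that receives the module."""

    def __init__(self, patches):
        self._patches = patches

    def find_spec(self, fullname, path=None, target=None):
        if fullname not in self._patches:
            return None  # let the normal finders handle everything else
        for finder in sys.meta_path:
            if finder is self or not hasattr(finder, "find_spec"):
                continue
            spec = finder.find_spec(fullname, path, target)
            if spec is not None and spec.loader is not None:
                spec.loader = PatchingLoader(spec.loader, self._patches[fullname])
                return spec
        return None
```

Registering it with sys.meta_path.insert(0, PatchingFinder({...})) before the first import of the target module is enough. Modules that are already in sys.modules are never looked up again, so they do not reach the finder. Because the module's loader is replaced by the wrapper, code that relies on loader-specific methods would need those delegated as well; this sketch leaves that out.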
With the main flow understood, we can start building our own probe module. Since the example only involves aiomysql, MetaPathFinder.find_module only needs to handle the aiomysql module and can ignore everything else for now. Next we need to decide which aiomysql functions to replace. From a business perspective we generally only care about the main operations cursor.execute, cursor.fetchone, cursor.fetchall and cursor.executemany, so we need to look inside the cursor to see how the code should be changed and which function is the right one to wrap.
First, look at the source code of cursor.execute (cursor.executemany is similar). It first calls self.nextset to consume any data left over from the previous request, then merges the arguments into the SQL statement, and finally runs the query through self._query:
async def execute(self, query, args=None):
    """Executes the given operation

    Executes the given operation substituting any markers with
    the given parameters.

    For example, getting all rows where id is 5:
      cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))

    :param query: ``str`` sql statement
    :param args: ``tuple`` or ``list`` of arguments for sql query
    :returns: ``int``, number of rows that has been produced of affected
    """
    conn = self._get_db()

    while (await self.nextset()):
        pass

    if args is not None:
        query = query % self._escape_args(args, conn)

    await self._query(query)
    self._executed = query
    if self._echo:
        logger.info(query)
        logger.info("%r", args)
    return self._rowcount
Looking at the source code of cursor.fetchone (cursor.fetchall is similar), we find that the data is actually read from a cache; it was already fetched while cursor.execute was running:
def fetchone(self):
    """Fetch the next row """
    self._check_executed()
    fut = self._loop.create_future()
    if self._rows is None or self._rownumber >= len(self._rows):
        fut.set_result(None)
        return fut
    result = self._rows[self._rownumber]
    self._rownumber += 1
    fut = self._loop.create_future()
    fut.set_result(result)
    return fut
Based on this analysis, we only need to wrap the core method self._query to get the data we want. From the source code we know that the wrapper receives self and the SQL string passed to self._query, that the query results can be read from self, and that the elapsed time can be measured in the decorator, so essentially all the required data is available. The code modified along these lines is as follows:
import importlib
import time
import sys
from functools import wraps

from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
    import aiomysql


def func_wrapper(func: Callable):
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        start: float = time.time()
        func_result: Any = await func(*args, **kwargs)
        end: float = time.time()

        # From _query we know the first argument is self and the second is the sql
        self: aiomysql.Cursor = args[0]
        sql: str = args[1]
        # Through self we can reach the other data
        db: str = self._connection.db
        user: str = self._connection.user
        host: str = self._connection.host
        port: int = self._connection.port
        execute_result: Tuple[Tuple, ...] = self._rows
        # A custom agent could send this data to a platform of your choice,
        # where it can be viewed or monitored.
        # Here we only print part of it.
        print({
            "sql": sql,
            "db": db,
            "user": user,
            "host": host,
            "port": port,
            "result": execute_result,
            "speed time": end - start
        })
        return func_result
    return cast(Callable, wrapper)


class MetaPathFinder:

    @staticmethod
    def find_module(fullname: str, path: Optional[Any] = None) -> Optional["MetaPathLoader"]:
        if fullname == 'aiomysql':
            # Only aiomysql is hooked
            return MetaPathLoader()
        else:
            return None


class MetaPathLoader:

    @staticmethod
    def load_module(fullname: str):
        if fullname in sys.modules:
            return sys.modules[fullname]
        # Prevent recursive calls
        finder: "MetaPathFinder" = sys.meta_path.pop(0)
        # Import the module
        module: ModuleType = importlib.import_module(fullname)
        # Hook _query
        module.Cursor._query = func_wrapper(module.Cursor._query)
        sys.meta_path.insert(0, finder)
        return module


async def test_mysql() -> None:
    import aiomysql
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

if __name__ == '__main__':
    sys.meta_path.insert(0, MetaPathFinder())
    import asyncio

    asyncio.run(test_mysql())

# Example output:
# The sql statement is exactly what we passed in, as are db, user, host and port,
# and we also get the query result and the elapsed time
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
This example works well, but the hook has to be installed explicitly at the program's entry point. A project usually has several entry points, and calling this logic explicitly in each of them is tedious. The hook must also be installed before the library is imported, so import conventions have to be enforced, otherwise the hook may silently fail in some places. If the hook could be installed as soon as the interpreter starts, the problem would be solved neatly. After some research I found that when the Python interpreter initializes, it automatically imports the sitecustomize and usercustomize modules if they can be found on PYTHONPATH, so we only need to create such a module and install our replacement inside it.
.
├── __init__.py
├── hook_aiomysql.py
├── sitecustomize.py
└── test_auto_hook.py
hook_aiomysql.py contains the probe code from the example above. The code in sitecustomize.py is shown below; it is very simple, it just imports our probe code and inserts the finder into sys.meta_path:
import sys
from hook_aiomysql import MetaPathFinder

sys.meta_path.insert(0, MetaPathFinder())
test_auto_hook.py is the test code:
import asyncio
from hook_aiomysql import test_mysql

asyncio.run(test_mysql())
Next, just set PYTHONPATH and run the code (a real project is usually started by supervisor, in which case PYTHONPATH can be set in its configuration file):
(.venv) ➜  python_hook git:(master) ✗ export PYTHONPATH=.
(.venv) ➜  python_hook git:(master) ✗ python test_auto_hook.py
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
As you can see, this approach works well and is easy to embed into a project, but it depends on the sitecustomize.py file, which makes it hard to package as a third-party library. To package it as a library we need to look for another approach. When introducing MetaPathLoader above, sys.modules was mentioned: it is used there to avoid repeated imports:
class MetaPathLoader:

    def load_module(self, fullname):
        # Imported modules are stored in sys.modules; checking it avoids repeated imports
        if fullname in sys.modules:
            return sys.modules[fullname]
        # Prevent recursive calls
        finder = sys.meta_path.pop(0)
        # Import the module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # Replace the function
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
Repeated imports are avoided because every module is stored in sys.modules once it has been imported, and a later import of the same name simply returns the cached module. We can rely on this cache because third-party dependencies are not upgraded while the program is running, so we never have to handle a module with the same name but a different implementation being imported again. Given that, the logic above can be simplified to: import the module -> replace the target method on it with our hook.
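The caching behaviour described above can be checked with a standard-library module. This is only a small illustration: json stands in for the third-party library, and traced_dumps and calls are hypothetical names.

```python
import importlib
import json
import sys

calls = []                   # arguments seen by the patched function
original_dumps = json.dumps  # keep the original so it can be restored


def traced_dumps(obj, **kwargs):
    """Hypothetical replacement: record the argument, then delegate."""
    calls.append(obj)
    return original_dumps(obj, **kwargs)


json.dumps = traced_dumps
try:
    # A later import returns the cached object from sys.modules,
    # so it sees the patched attribute as well.
    later = importlib.import_module("json")
    same_module = later is json and sys.modules["json"] is json
    text = later.dumps({"a": 1})
finally:
    json.dumps = original_dumps  # undo the patch
```

One caveat of this design: code that ran `from json import dumps` before the patch keeps its own reference to the original function and is not affected. Patching a method on a class, as done with aiomysql.Cursor._query, avoids this because the method is looked up on the class at call time.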
import time
from functools import wraps
from typing import Any, Callable, Tuple, cast

import aiomysql


def func_wrapper(func: Callable):
    """Same wrapper function as above, omitted here"""


# Whether the hook has been installed
_IS_HOOK: bool = False
# Keep a reference to the original _query
_query: Callable = aiomysql.Cursor._query


# Install the hook
def install_hook() -> None:
    global _IS_HOOK
    if _IS_HOOK:
        # Already installed; avoid wrapping twice
        return
    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)
    _IS_HOOK = True


# Restore the original method
def reset_hook() -> None:
    global _IS_HOOK
    aiomysql.Cursor._query = _query
    _IS_HOOK = False
The code is short and clear. Next, run the earlier test again:
import asyncio
import aiomysql

from demo import install_hook, reset_hook


async def test_mysql() -> None:
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

print("install hook")
install_hook()
asyncio.run(test_mysql())
print("reset hook")
reset_hook()
asyncio.run(test_mysql())
print("end")
The test output shows that the logic is correct: after install hook the extracted metadata is printed, and after reset hook nothing is printed:
install hook
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}
reset hook
end
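The install/reset pair can also be packaged as a context manager, which guarantees the original method is restored even if the code in between raises. The sketch below is an assumption-laden illustration rather than the author's implementation: FakeCursor stands in for aiomysql.Cursor so that it runs without a database, and hooked, timed_async and records are hypothetical names.

```python
import asyncio
import time
from contextlib import contextmanager
from functools import wraps


class FakeCursor:
    """Stand-in for aiomysql.Cursor so the sketch runs without a database."""

    async def _query(self, sql):
        await asyncio.sleep(0)
        return sql.upper()


records = []  # collected metadata; a real agent would ship this elsewhere


def timed_async(func):
    @wraps(func)
    async def wrapper(self, sql, *args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(self, sql, *args, **kwargs)
        finally:
            records.append({"sql": sql, "elapsed": time.perf_counter() - start})
    return wrapper


@contextmanager
def hooked(cls, name, wrap):
    """Replace cls.<name> with wrap(original) and restore it on exit."""
    original = getattr(cls, name)
    setattr(cls, name, wrap(original))
    try:
        yield
    finally:
        setattr(cls, name, original)


async def run_once():
    return await FakeCursor()._query("select 42")


with hooked(FakeCursor, "_query", timed_async):
    inside = asyncio.run(run_once())   # recorded
outside = asyncio.run(run_once())      # hook removed, not recorded
```

With the real library, the same helper would presumably be used as `with hooked(aiomysql.Cursor, "_query", func_wrapper): ...`, which is equivalent to calling install_hook and reset_hook around the block.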