Rumah >pembangunan bahagian belakang >Tutorial Python >Bagaimanakah Python menyiasat pengekstrakan data lengkap dari perpustakaan panggilan?

Bagaimanakah Python menyiasat pengekstrakan data lengkap dari perpustakaan panggilan?

王林
王林ke hadapan
2023-05-16 08:46:051102semak imbas

1. Kaedah ringkas dan kasar - merangkum perpustakaan mysql

Untuk mengira proses pelaksanaan, anda perlu mengetahui kedudukan mula dan akhir proses pelaksanaan, jadi kaedah paling mudah dan paling kasar adalah berdasarkan keperluan Kaedah panggilan dikapsulkan, dan lapisan perantaraan dilaksanakan antara rangka kerja yang memanggil perpustakaan MySQL dan perpustakaan MySQL, dan statistik yang memakan masa dilengkapkan dalam lapisan perantaraan, seperti:

# 伪代码
def my_execute(conn, sql, param):
 # 针对MySql库的统计封装组件
 with MyTracer(conn, sql, param):
     # 以下为正常使用MySql库的代码
with conn.cursor as cursor:
 cursor.execute(sql, param)
...

Nampaknya sangat bagus untuk dilaksanakan, Dan perubahan itu sangat mudah, tetapi kerana ia diubah suai pada API peringkat atas, ia sebenarnya sangat tidak fleksibel Pada masa yang sama, beberapa pra-operasi dilakukan dalam kursor .execute, seperti splicing sql dan param, dan memanggil nextset untuk mengosongkan kursor semasa dan banyak lagi. Data yang akhirnya kami perolehi, seperti masa dan penggunaan, juga tidak tepat dan tiada cara untuk mendapatkan beberapa metadata terperinci, seperti kod ralat, dsb.

Jika anda ingin mendapatkan yang paling langsung dan berguna data, hanya Anda boleh menukar kod sumber dan kemudian memanggil kod sumber, tetapi jika setiap perpustakaan perlu menukar kod sumber untuk membuat statistik, ia akan menjadi terlalu menyusahkan, Python juga menyediakan beberapa antara muka yang serupa dengan probe, yang boleh digunakan untuk mengira statistik. Gantikan kod sumber perpustakaan untuk melengkapkan kod kami.

2. Siasatan Python

Dalam Python, fungsi cangkuk import boleh dilaksanakan melalui sys.meta_path operasi berkaitan import, Perpustakaan berkaitan import akan diubah berdasarkan objek yang ditakrifkan oleh sys.meta_path Objek dalam sys.meta_path perlu melaksanakan kaedah find_module ini mengembalikan Tiada atau objek yang melaksanakan kaedah load_module. Kita boleh menggunakan objek ini untuk menyasarkan beberapa perpustakaan Apabila mengimport, menggantikan kaedah yang berkaitan Penggunaan mudah adalah seperti berikut

Selepas memahami proses utama, anda boleh mula membuat modul probe anda sendiri Memandangkan contoh hanya melibatkan modul aiomysql, hanya modul aiomysql perlu diproses dalam MetaPathFinder.find_module, dan yang lain akan diabaikan. Pertama. Kemudian kita perlu memastikan kita mahu Fungsi aiomysql yang manakah harus digantikan Dari perspektif perniagaan, secara amnya kita hanya memerlukan operasi utama cursor.execute, cursor.fetchone, cursor.fetchall, cursor.executemany, jadi kita perlu pergi jauh ke kursor untuk melihat cara menukar kod yang mana fungsi yang lebih dimuatkan oleh yang terakhir? kaedah self.nextset akan dipanggil dahulu untuk mendapatkan data daripada permintaan sebelumnya, dan kemudian menggabungkan pernyataan sql, dan akhirnya membuat pertanyaan melalui self._query:

import importlib
import sys
from functools import wraps
def func_wrapper(func):
    """这里通过一个装饰器来达到狸猫换太子和获取数据的效果"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 记录开始时间
        start = time.time()
        result = func(*args, **kwargs)
        # 统计消耗时间
        end = time.time()
        print(f"speed time:{end - start}")
        return result
    return wrapper
class MetaPathFinder:
    def find_module(self, fullname, path=None):
        # 执行时可以看出来在import哪些模块
        print(f'find module:{path}:{fullname}')
        return MetaPathLoader()
class MetaPathLoader:
    def load_module(self, fullname):
        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder = sys.meta_path.pop(0)
        # 导入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替换函数
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
sys.meta_path.insert(0, MetaPathFinder())
if __name__ == '__main__':
    import time
    time.sleep(1)
# 输出示例:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468

Melihat kod sumber cursor.fetchone (kursor .fetchall adalah serupa), kami mendapati bahawa data sebenarnya diperoleh daripada cache,

Data ini telah diperolehi semasa pelaksanaan cursor.execute:

async def execute(self, query, args=None):
    """Executes the given operation
    Executes the given operation substituting any markers with
    the given parameters.
    For example, getting all rows where id is 5:
        cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
    :param query: ``str`` sql statement
    :param args: ``tuple`` or ``list`` of arguments for sql query
    :returns: ``int``, number of rows that has been produced of affected
    """
    conn = self._get_db()

    while (await self.nextset()):
        pass

    if args is not None:
        query = query % self._escape_args(args, conn)

    await self._query(query)
    self._executed = query
    if self._echo:
        logger.info(query)
        logger.info("%r", args)
    return self._rowcount

Berdasarkan analisis di atas, kita hanya perlu membebankan kaedah teras self._query untuk mendapatkan data yang kita inginkan, daripada Kita boleh tahu daripada kod sumber bahawa kita boleh mendapatkan parameter diri dan sql yang dihantar ke diri._query, dan kita boleh dapatkan hasil pertanyaan berdasarkan diri Pada masa yang sama, kita boleh mendapatkan masa berjalan melalui penghias, dan semua data yang diperlukan pada asasnya tersedia,

Kod diubah suai mengikut ideanya adalah seperti berikut:

def fetchone(self):
    """Fetch the next row """
    self._check_executed()
    fut = self._loop.create_future()
    if self._rows is None or self._rownumber >= len(self._rows):
        fut.set_result(None)
        return fut
    result = self._rows[self._rownumber]
    self._rownumber += 1
    fut = self._loop.create_future()
    fut.set_result(result)
    return fut

Contoh ini kelihatan sangat bagus, tetapi logiknya perlu dipanggil secara jelas di pintu masuk panggilan Biasanya Projek mungkin mempunyai beberapa entri logik ini akan menjadi sangat menyusahkan, dan logik cangkuk kami mesti dipanggil terlebih dahulu sebelum mengimport Dengan cara ini, spesifikasi pengenalan mesti ditetapkan, jika tidak cangkuk mungkin tidak berjaya di beberapa tempat Jika logik memperkenalkan cangkuk boleh diatur untuk dilaksanakan serta-merta selepas parser dimulakan, masalah ini boleh diselesaikan dengan sempurna Selepas menyemak maklumat, saya mendapati bahawa apabila penterjemah python dimulakan, ia secara automatik akan mengimport modul sitecustomize dan usercustomize yang wujud di bawah PYTHONPATH , kami hanya perlukan untuk mencipta modul dan menulis fungsi gantian kami dalam modul.

import importlib
import time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
    import aiomysql
def func_wrapper(func: Callable):
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        start: float = time.time()
        func_result: Any = await func(*args, **kwargs)
        end: float = time.time()
        # 根据_query可以知道, 第一格参数是self, 第二个参数是sql
        self: aiomysql.Cursor = args[0]
        sql: str = args[1]
        # 通过self,我们可以拿到其他的数据
        db: str = self._connection.db
        user: str = self._connection.user
        host: str = self._connection.host
        port: str = self._connection.port
        execute_result: Tuple[Tuple] = self._rows
        # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了, 
        # 这里只是打印一部分数据出来
        print({
            "sql": sql,
            "db": db,
            "user": user,
            "host": host,
            "port": port,
            "result": execute_result,
            "speed time": end - start
        })
        return func_result
    return cast(Callable, wrapper)
class MetaPathFinder:

    @staticmethod
    def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
        if fullname == 'aiomysql':
            # 只有aiomysql才进行hook
            return MetaPathLoader()
        else:
            return None
class MetaPathLoader:
    @staticmethod
    def load_module(fullname: str):
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder: "MetaPathFinder" = sys.meta_path.pop(0)
        # 导入 module
        module: ModuleType = importlib.import_module(fullname)
        # 针对_query进行hook
        module.Cursor._query = func_wrapper(module.Cursor._query)
        sys.meta_path.insert(0, finder)
        return module
async def test_mysql() -> None:
    import aiomysql
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

if __name__ == '__main__':
    sys.meta_path.insert(0, MetaPathFinder())
    import asyncio
    asyncio.run(test_mysql())
# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
hook_aiomysql.py ialah kod kami untuk membuat probe sebagai contoh, dan kod yang disimpan dalam sitecustomize.py adalah seperti berikut, cuma perkenalkan kod probe kami dan masukkan ke dalam sys.meta_path:

.
├── __init__.py
├── hook_aiomysql.py
├── sitecustomize.py
└── test_auto_hook.py

test_auto_hook.py ialah kod ujian:

import sys
from hook_aiomysql import MetaPathFinder
sys.meta_path.insert(0, MetaPathFinder())

Seterusnya, cuma tetapkan PYTHONPATH dan jalankan kod kami (jika ia projek, ia biasanya dimulakan oleh penyelia, maka anda boleh Tetapkan PYTHONPATH dalam fail konfigurasi):

import asyncio
from hook_aiomysql import test_mysql
asyncio.run(test_mysql())
4 Kaedah penggantian langsung

Anda dapat melihat bahawa kaedah di atas berjalan dengan baik, dan boleh disematkan dengan mudah ke dalam kami. project , tetapi ia bergantung pada fail sitecustomize.py dan sukar untuk mengekstraknya ke pustaka pihak ketiga Jika anda ingin mengekstraknya ke pustaka pihak ketiga, anda perlu mempertimbangkan sama ada terdapat kaedah lain. Apabila memperkenalkan MetaPathLoader di atas, saya menyebut sys.module, di mana sys.modules digunakan untuk mengurangkan pengenalan berulang:

(.venv) ➜  python_hook git:(master) ✗ export PYTHONPATH=.      
(.venv) ➜  python_hook git:(master) ✗ python test_auto_hook.py 
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}

Prinsip mengurangkan pengenalan berulang ialah setiap kali modul diperkenalkan, ia akan disimpan dalam sys .modules, jika ia diperkenalkan berulang kali, ia akan terus dimuat semula ke modul yang diimport terkini. Sebab mengapa kami mempertimbangkan untuk mengurangkan import berulang di atas adalah kerana kami tidak akan menaik taraf kebergantungan perpustakaan pihak ketiga apabila program dijalankan. Mengambil kesempatan daripada fakta bahawa kami tidak perlu mempertimbangkan berulang kali memperkenalkan modul dengan nama yang sama dan pelaksanaan yang berbeza, dan sys.modules akan cache modul yang diimport, kami boleh memudahkan logik di atas ke dalam mengimport modul -> Gantikan kaedah modul semasa dengan kaedah cangkuk yang kami ubah suai.

import time
from functools import wraps
from typing import Any, Callable, Tuple, cast
import aiomysql
def func_wrapper(func: Callable):
    """和上面一样的封装函数, 这里简单略过"""
# 判断是否hook过
_IS_HOOK: bool = False
# 存放原来的_query
_query: Callable = aiomysql.Cursor._query
# hook函数
def install_hook() -> None:
    _IS_HOOK = False
    if _IS_HOOK:
        return
    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)
    _IS_HOOK = True
# 还原到原来的函数方法
def reset_hook() -> None:
    aiomysql.Cursor._query = _query
    _IS_HOOK = False

代码简单明了,接下来跑一跑刚才的测试:

import asyncio
import aiomysql
from demo import install_hook, reset_hook
async def test_mysql() -> None:
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

print("install hook")
install_hook()
asyncio.run(test_mysql())
print("reset hook")
reset_hook()
asyncio.run(test_mysql())
print("end")

通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息

install hook
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}
reset hook
end

Atas ialah kandungan terperinci Bagaimanakah Python menyiasat pengekstrakan data lengkap dari perpustakaan panggilan?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam