Maison  >  Article  >  développement back-end  >  Jetons un coup d'œil à l'envoi asynchrone de journaux à un serveur distant en python

Jetons un coup d'œil à l'envoi asynchrone de journaux à un serveur distant en python

coldplay.xixi
coldplay.xixiavant
2020-10-14 17:33:402866parcourir

Jetons un coup d'œil à l'envoi asynchrone de journaux à un serveur distant en python

Autres apprentissages gratuits connexes : tutoriel vidéo Python

La façon la plus courante d'utiliser les journaux en python est de générer des journaux dans la console et dans les fichiers. Le module de journalisation fournit également les classes correspondantes, qui sont très pratiques à utiliser, mais parfois nous pouvons avoir certains besoins, tels que Nous devons envoyer des journaux à. l'extrémité distante, ou écrire directement dans la base de données. Comment répondre à cette exigence ?

1 StreamHandler et FileHandler

Écrivons d'abord un ensemble simple de sorties dans cmd et le code dans le file

# -*- coding: utf-8 -*-"""
-------------------------------------------------
   File Name:     loger
   Description :
   Author :       yangyanxing
   date:          2020/9/23
-------------------------------------------------
"""import loggingimport sysimport os# 初始化loggerlogger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)# 设置日志格式fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')# 添加cmd handlercmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)# 添加文件的handlerlogpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)# 将cmd和file handler添加到logger中logger.addHandler(cmd_handler)
logger.addHandler(file_handler)

logger.debug("今天天气不错")复制代码

initialise d'abord un enregistreur et définit son niveau de journalisation sur DEBUG, puis initialise cmd_handler et file_handler, et enfin les ajoute à l'enregistreur, exécute le script et il sera imprimé dans cmd Out [2020-09-23 10:45:56] [DEBUG] 今天天气不错 et sera écrit dans le fichier debug.log dans le répertoire actuel

2. Ajoutez HTTPHandler

Si vous souhaitez envoyer le journal au serveur distant lors de l'enregistrement, vous pouvez ajouter un <.>. Dans la bibliothèque standard python logging.handler, de nombreux gestionnaires ont été définis pour nous, dont certains que nous pouvons utiliser directement. Nous utilisons tornado localement pour écrire une interface pour recevoir les journaux et imprimer tous les paramètres reçus HTTPHandler

# 添加一个httphandlerimport logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)

logger.debug("今天天气不错")复制代码
.

En conséquence, nous avons reçu beaucoup d'informations côté serveur

{
    'name': [b 'yyx'],
    'msg': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
    'args': [b '()'],
    'levelname': [b 'DEBUG'],
    'levelno': [b '10'],
    'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
    'filename': [b 'loger.py'],
    'module': [b 'loger'],
    'exc_info': [b 'None'],
    'exc_text': [b 'None'],
    'stack_info': [b 'None'],
    'lineno': [b '41'],
    'funcName': [b '<module>'],
    'created': [b '1600831054.8881223'],
    'msecs': [b '888.1223201751709'],
    'relativeCreated': [b '22.99976348876953'],
    'thread': [b '14876'],
    'threadName': [b 'MainThread'],
    'processName': [b 'MainProcess'],
    'process': [b '8648'],
    'message': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
    'asctime': [b '2020-09-23 11:17:34']
}复制代码
On peut dire qu'il y a beaucoup d'informations, mais ce n'est pas ce que nous voulons, nous voulons juste quelque chose comme <.> Log.

[2020-09-23 10:45:56] [DEBUG] 今天天气不错

envoie simplement toutes les informations du journal au serveur. Quant à la façon dont le serveur organise le contenu, cela est fait par le serveur. Nous pouvons donc avoir deux méthodes, l'une consiste à changer. le code de fin du service, réorganisez le contenu du journal en fonction des informations du journal transmises. La seconde consiste à réécrire une classe afin qu'elle puisse reformater le contenu du journal et l'envoyer au serveur lors de l'envoi.

logging.handlers.HTTPHandlerNous utilisons la seconde. méthode, car cette méthode est plus flexible, le serveur n'est utilisé que pour l'enregistrement et le client doit décider quel contenu envoyer.

Nous devons redéfinir une classe. Nous pouvons nous référer à

cette classe et réécrire une classe httpHandler

logging.handlers.HTTPHandlerChaque classe de journal doit remplacer la méthode d'émission, qui est réellement exécutée lors de l'enregistrement. logs. Il s'agit de la méthode d'émission

class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")
        self.method = method    def emit(self, record):
        '''
        :param record:
        :return:
        '''
        msg = self.format(record)        if self.method == "GET":            if (self.url.find("?") >= 0):
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
            requests.get(url, timeout=1)        else:
            headers = {                "Content-type": "application/x-www-form-urlencoded",                "Content-length": str(len(msg))
            }
            requests.post(self.url, data={'log': msg}, headers=headers, timeout=1)复制代码

Il y a une ligne dans le code ci-dessus qui définit les paramètres à envoyer

msg = self.format(record) Cette ligne de code signifie que le contenu correspondant sera renvoyé. selon le format défini par l'objet journal. Après cela, le contenu est envoyé via la bibliothèque de requêtes. Quelle que soit l'utilisation de la méthode get ou post, le serveur peut recevoir le journal normalement

[2020-09-23 11:43:50] [DEBUG] 今天天气不错Envoi de journaux distants de manière asynchrone

3. 🎜>

Nous examinons maintenant un problème. Lorsque le journal est envoyé au serveur distant, si le serveur distant le traite très lentement, cela prendra un certain temps, puis l'enregistrement du journal ralentira

Modifiez la classe de traitement des journaux du serveur, laissez-la faire une pause de 5 secondes pour simuler un long processus de traitement

async def post(self):
    print(self.getParam('log'))    await asyncio.sleep(5)
    self.write({"msg": 'ok'})复制代码
À ce moment, nous imprimons le journal ci-dessus

logger.debug("今天天气不错")
logger.debug("是风和日丽的")复制代码
et le résultat obtenu est

[2020-09-23 11:47:33] [DEBUG] 今天天气不错
[2020-09-23 11:47:38] [DEBUG] 是风和日丽的复制代码
Nous avons remarqué que leur intervalle de temps est également de 5 secondes.

Le problème vient maintenant. À l'origine, ce n'était qu'un journal, mais c'est maintenant devenu un fardeau qui ralentit l'ensemble du script, nous devons donc gérer l'écriture du journal à distance de manière asynchrone.

3.1 Utiliser le multi-threading

La première chose à penser est d'utiliser plusieurs threads pour exécuter la méthode d'envoi de journaux

def emit(self, record):
    msg = self.format(record)    if self.method == "GET":        if (self.url.find("?") >= 0):
            sep = '&'
        else:
            sep = '?'
        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
        t = threading.Thread(target=requests.get, args=(url,))
        t.start()    else:
        headers = {            "Content-type": "application/x-www-form-urlencoded",            "Content-length": str(len(msg))
        }
        t = threading.Thread(target=requests.post, args=(self.url,), kwargs={"data":{'log': msg}, "headers":headers})
        t.start()复制代码
Cette méthode peut atteindre l'objectif principal de ne pas blocage, mais chaque fois qu'un journal est imprimé, un fil de discussion doit être ouvert, ce qui est également un gaspillage de ressources. Nous pouvons également utiliser des pools de threads pour traiter

3.2 Utiliser des pools de threads pour traiter

Il existe des classes ThreadPoolExecutor et ProcessPoolExecutor dans concurrent.futures de python, qui sont des pools de threads et des pools de processus. Ils sont utilisés en premier. lors de l'initialisation. Définissez plusieurs threads, puis laissez ces threads gérer les fonctions correspondantes, afin que vous n'ayez pas besoin de créer de nouveaux threads à chaque fois

Utilisation de base du pool de threads

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程exector.submit(fn, args, kwargs) # 将函数submit到线程池中复制代码
S'il y a y a-t-il n threads dans le pool de threads, lorsque le nombre de tâches soumises est supérieur à n, les tâches en excès seront placées dans la file d'attente.

Modifiez à nouveau la fonction d'émission ci-dessus

exector = ThreadPoolExecutor(max_workers=1)def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)    if self.method == "GET":        if (self.url.find("?") >= 0):
            sep = '&'
        else:
            sep = '?'
        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
        exector.submit(requests.get, url, timeout=6)    else:
        headers = {            "Content-type": "application/x-www-form-urlencoded",            "Content-length": str(len(msg))
        }
        exector.submit(requests.post, self.url, data={'log': msg}, headers=headers, timeout=6)复制代码
Pourquoi seulement initialiser un thread avec un seul thread ici ? Parce que de cette façon, on peut garantir que les journaux de la file d'attente avancée seront envoyés en premier. S'il y a plusieurs threads dans le pool, l'ordre n'est pas nécessairement garanti.

3.3 Utilisez la bibliothèque asynchrone aiohttp pour envoyer des requêtes

La méthode submit de la classe CustomHandler ci-dessus utilise request.post pour envoyer des journaux. Les requêtes elles-mêmes sont bloquées et exécutées, c'est pourquoi son existence. rend le script bloqué pendant une longue période, nous pouvons donc remplacer la bibliothèque de requêtes de blocage par aiohttp asynchrone pour exécuter les méthodes get et post, et réécrire la méthode d'émission dans un CustomHandler

class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")
        self.method = method    async def emit(self, record):
        msg = self.format(record)
        timeout = aiohttp.ClientTimeout(total=6)        if self.method == "GET":            if (self.url.find("?") >= 0):
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))            async with aiohttp.ClientSession(timeout=timeout) as session:                async with session.get(self.url) as resp:
                    print(await resp.text())        else:
            headers = {                "Content-type": "application/x-www-form-urlencoded",                "Content-length": str(len(msg))
            }            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:                async with session.post(self.url, data={'log': msg}) as resp:
                    print(await resp.text())复制代码
À ce stade, l'exécution du code crashé

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine 'CustomHandler.emit' was never awaited
  self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback复制代码
Le serveur n'a pas reçu la demande d'envoi du journal.

究其原因是由于emit方法中使用async with session.post 函数,它需要在一个使用async 修饰的函数里执行,所以修改emit函数,使用async来修饰,这里emit函数变成了异步的函数, 返回的是一个coroutine 对象,要想执行coroutine对象,需要使用await, 但是脚本里却没有在哪里调用 await emit() ,所以崩溃信息中显示coroutine 'CustomHandler.emit' was never awaited.

既然emit方法返回的是一个coroutine对象,那么我们将它放一个loop中执行

async def main():
    await logger.debug("今天天气不错")    await logger.debug("是风和日丽的")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())复制代码

执行依然报错

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '复制代码

意思是需要的是一个coroutine,但是传进来的对象不是。

这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方.

解决办法是有的,我们使用 asyncio.get_event_loop() 获取一个事件循环对象, 我们可以在这个对象上注册很多协程对象,这样当执行事件循环的时候,就是去执行注册在该事件循环上的协程, 我们通过一个小例子来看一下

import asyncio 

async def test(n):
    while n > 0:        await asyncio.sleep(1)
        print("test {}".format(n))
        n -= 1
    return n    
async def test2(n):
    while n >0:        await asyncio.sleep(1)
        print("test2 {}".format(n))
        n -= 1def stoploop(task):
    print("执行结束, task n is {}".format(task.result()))
    loop.stop()

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))

loop.run_forever()复制代码

我们使用loop = asyncio.get_event_loop() 创建了一个事件循环对象loop, 并且在loop上创建了两个task, 并且给task1添加了一个回调函数,在task1它执行结束以后,将loop停掉.

注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上,然后调用该循环的run_forever() 函数,从而使该循环上的协程对象得以正常的执行.

上面得到的输出为

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0复制代码

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了.

如果不执行loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)# loop.run_forever()复制代码

上面的代码将loop.run_forever() 注释掉,换成time.sleep(5) 停5秒, 这时脚本不会有任何输出,在停了5秒以后就中止了.

回到之前的日志发送远程服务器的代码,我们可以使用aiohttp封装一个发送数据的函数, 然后在emit中将这个函数注册到全局的事件循环对象loop中,最后再执行loop.run_forever() .

loop = asyncio.get_event_loop()class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")
        self.method = method    # 使用aiohttp封装发送数据函数
    async def submit(self, data):
        timeout = aiohttp.ClientTimeout(total=6)        if self.method == "GET":            if self.url.find("?") >= 0:
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": data}))            async with aiohttp.ClientSession(timeout=timeout) as session:                async with session.get(url) as resp:
                    print(await resp.text())        else:
            headers = {                "Content-type": "application/x-www-form-urlencoded",
            }            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:                async with session.post(self.url, data={'log': data}) as resp:
                    print(await resp.text())        return True

    def emit(self, record):
        msg = self.format(record)
        loop.create_task(self.submit(msg))# 添加一个httphandlerhttp_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)

logger.debug("今天天气不错")
logger.debug("是风和日丽的")

loop.run_forever()复制代码

这时脚本就可以正常的异步执行了.

loop.create_task(self.submit(msg)) 也可以使用

asyncio.ensure_future(self.submit(msg), loop=loop)

来代替,目的都是将协程对象注册到事件循环中.

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用loop.stop()方法. 可以注册到某个task的回调中.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer