Maison  >  Article  >  développement back-end  >  Comment envoyer des journaux à un serveur distant de manière asynchrone en Python

Comment envoyer des journaux à un serveur distant de manière asynchrone en Python

WBOY
WBOYavant
2023-05-11 10:31:051231parcourir

StreamHandler et FileHandler

Tout d'abord, écrivons un ensemble simple de code à afficher dans cmd et dans les fichiers :

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 设置日志格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 添加cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 添加文件的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 将cmd和file handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天气不错")

Initialisez d'abord un enregistreur et définissez son niveau de journalisation sur DEBUG, puis initialisez cmd_handler et file_handler. Enfin, ajoutez au logger, exécutez le script, et il sera imprimé en cmd

[2020-09-23 10:45:56] [DEBUG] Il fait beau aujourd'hui et il sera écrit au fichier actuel Ajoutez HTTPHandler au fichier debug.log dans le répertoire Si vous souhaitez envoyer le journal au serveur distant lors de l'enregistrement, vous pouvez ajouter un HTTPHandler Dans la bibliothèque standard python logging.handler, de nombreux gestionnaires ont été définis pour. nous. , certains que nous pouvons utiliser directement, utiliser tornado localement pour écrire une interface de réception des logs, et imprimer tous les paramètres reçus[2020-09-23 10:45:56] [DEBUG] 今天天气不错且会写入到当前目录下的debug.log文件中

添加HTTPHandler

如果想要在记录时将日志发送到远程服务器上,可以添加一个 HTTPHandler , 在python标准库logging.handler中,已经为我们定义好了很多handler,有些我们可以直接用,本地使用tornado写一个接收 日志的接口,将接收到的参数全都打印出来

# 添加一个httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
结果在服务端我们收到了很多信息

{
'name': [b 'yyx'],
'msg': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
&#39;funcName&#39;: [b &#39;<module>&#39;],
&#39;created&#39;: [b &#39;1600831054.8881223&#39;],
&#39;msecs&#39;: [b &#39;888.1223201751709&#39;],
&#39;relativeCreated&#39;: [b &#39;22.99976348876953&#39;],
&#39;thread&#39;: [b &#39;14876&#39;],
&#39;threadName&#39;: [b &#39;MainThread&#39;],
&#39;processName&#39;: [b &#39;MainProcess&#39;],
&#39;process&#39;: [b &#39;8648&#39;],
&#39;message&#39;: [b
&#39;\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99&#39;],
&#39;asctime&#39;: [b &#39;2020-09-23 11:17:34&#39;]
}

可以说是信息非常之多,但是却并不是我们想要的样子,我们只是想要类似于

[2020-09-23 10:45:56][DEBUG] 今天天气不错

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    &#39;&#39;&#39;
   重写emit方法,这里主要是为了把初始化时的baseParam添加进来
   :param record:
   :return:
   &#39;&#39;&#39;
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = &#39;&&#39;
      else:
        sep = &#39;?&#39;
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={&#39;log&#39;: msg}, headers=headers,
timeout=1)

On peut dire qu'il y a beaucoup d'informations, mais ce n'est pas ce que nous voulons, nous voulons juste Un journal similaire à

[2020-09-23 10:45:56][DEBUG] Il fait beau aujourd'hui

logging.handlers.HTTPHandler envoie simplement toutes les informations du journal au serveur, Quant à la façon dont le serveur organise le contenu, cela est fait par le serveur. Nous pouvons donc avoir deux méthodes. La première consiste à modifier le code du serveur et à réorganiser le contenu du journal en fonction des informations transmises. . classe, lui permettant de reformater le contenu du journal sur le serveur lors de l'envoi.

Nous utilisons la deuxième méthode car cette méthode est plus flexible. Le serveur n'est utilisé que pour l'enregistrement et le client doit décider quel contenu envoyer.

Nous devons redéfinir une classe. Nous pouvons nous référer à la classe logging.handlers.HTTPHandler et réécrire une classe httpHandler

Chaque classe de journal doit remplacer la méthode d'émission lors de l'enregistrement des journaux, c'est cette émission qui est en fait. exécuté. Méthode :

{&#39;log&#39;: [b&#39;[2020-09-23 11:39:45] [DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99&#39;]}

Il y a une ligne dans le code ci-dessus qui définit le paramètre envoyé msg = self.format(record). Cette ligne de code signifie que le contenu correspondant sera renvoyé selon le format défini par le log. objet.

Envoyez ensuite le contenu via la bibliothèque de requêtes. Quelle que soit l'utilisation de la méthode get ou post, le serveur peut recevoir le journal normalement

async def post(self):
  print(self.getParam(&#39;log&#39;))
  await asyncio.sleep(5)
  self.write({"msg": &#39;ok&#39;})

Convertissez le type d'octets pour obtenir :

[2020-09- 23 11 : 43:50] [DEBUG] Il fait beau aujourd'hui

Envoi asynchrone des journaux distants

Maintenant, nous considérons un problème lorsque le journal est envoyé au serveur distant, si le serveur distant le traite très lentement, il le fera. prend beaucoup de temps. Après un certain temps, l'enregistrement du journal ralentira. Modifiez la classe de traitement des journaux du serveur et laissez-la s'arrêter pendant 5 secondes pour simuler un long processus de traitement

logger.debug("今天天气不错")
logger.debug("是风和日丽的")

À ce moment, nous imprimerons. le journal ci-dessus :

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = &#39;&&#39;
    else:
      sep = &#39;?&#39;
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{&#39;log&#39;: msg},

Le résultat obtenu est :


[2020-09-23 11:47:33] [DEBUG] Il fait beau aujourd'hui

[2020-09-23 11:47:38 ] [DEBUG] Il fait beau et ensoleillé

Nous avons remarqué que leur intervalle de temps est également de 5 secondes.

Vient maintenant le problème. À l'origine, ce n'était qu'un journal, mais c'est maintenant devenu un fardeau qui ralentit l'ensemble du script, nous devons donc gérer l'écriture du journal à distance de manière asynchrone.

1 Utiliser le traitement multi-thread

La première chose à penser est d'utiliser multi-threads pour exécuter la méthode d'envoi de journal ;

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程
exector.submit(fn, args, kwargs) # 将函数submit到线程池中

Cette méthode peut atteindre l'objectif principal de ne pas bloquer, mais à chaque fois qu'un journal est imprimé, un fil de discussion doit être ouvert, ce qui est également un gaspillage de ressources. Nous pouvons également utiliser des pools de threads pour traiter

2 Utiliser des pools de threads pour traiter

Le concurrent.futures de Python a les classes ThreadPoolExecutor et ProcessPoolExecutor, qui sont des pools de threads et des pools de processus. Ils définissent d'abord plusieurs threads lors de l'initialisation, puis laissent ces threads fonctionner. traiter les fonctions correspondantes, afin que vous n'ayez pas besoin de créer un nouveau thread à chaque fois

Utilisation de base du pool de threads :

exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = &#39;&&#39;
    else:
      sep = &#39;?&#39;
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={&#39;log&#39;: msg},
headers=headers, timeout=6)

S'il y a n threads dans le pool de threads, lorsque le nombre de tâches soumises est supérieur à n, la tâche redondante sera placée dans la file d'attente.

Modifiez à nouveau la fonction d'émission ci-dessus

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = &#39;&&#39;
      else:
        sep = &#39;?&#39;
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={&#39;log&#39;: msg}) as resp:
          print(await resp.text())

Pourquoi initialisons-nous uniquement un pool de threads avec un seul thread ici Parce que de cette façon, il peut être garanti que les journaux de la file d'attente avancée seront envoyés en premier s'il y a plusieurs threads ? dans la piscine, ce n'est pas forcément garanti.

3 Utilisez la bibliothèque asynchrone aiohttp pour envoyer des requêtes

La méthode d'émission de la classe CustomHandler ci-dessus utilise request.post pour envoyer des journaux. Cette requête elle-même est bloquante et en cours d'exécution, et c'est précisément à cause de son existence que le script reste bloqué. . Après un long moment, nous pouvons remplacer la bibliothèque de requêtes de blocage par aiohttp asynchrone pour exécuter les méthodes get et post, et réécrire la méthode émet dans un CustomHandler

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine
&#39;CustomHandler.emit&#39; was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

À ce moment, l'exécution du code plante :

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
🎜 Le Le serveur n'a pas reçu de demande d'envoi de journaux. 🎜La raison est que parce que la fonction async avec session.post est utilisée dans la méthode d'émission, elle doit être exécutée dans une fonction modifiée avec async, donc la fonction d'émission est modifiée et modifiée avec async. La fonction d'émission devient ici asynchrone. La fonction et renvoie est un objet coroutine. Pour exécuter l'objet coroutine, vous devez utiliser wait, mais wait submit() n'est appelé nulle part dans le script, donc les informations de crash montrent que la coroutine 'CustomHandler.emit' n'a jamais été attendue. 🎜

既然emit方法返回的是一个coroutine对象,那么我们将它放一个loop中执行

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

执行依然报错:

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

意思是需要的是一个coroutine,但是传进来的对象不是。
这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方。

解决办法是有的,我们使用 asyncio.get_event_loop() 获取一个事件循环对象, 我们可以在这个对象上注册很多协程对象,这样当执行事件循环的时候,就是去执行注册在该事件循环上的协程,

我们通过一个小例子来看一下:

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("执行结束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

我们使用 loop = asyncio.get_event_loop() 创建了一个事件循环对象loop, 并且在loop上创建了两个task, 并且给task1添加了一个回调函数,在task1它执行结束以后,将loop停掉。
注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上, 然后调用该循环的 run_forever() 函数,从而使该循环上的协程对象得以正常的执行。

上面得到的输出为:

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了如果不执行 loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()

上面的代码将loop.run_forever() 注释掉,换成time.sleep(5) 停5秒, 这时脚本不会有任何输出,在停了5秒 以后就中止了,
回到之前的日志发送远程服务器的代码,我们可以使用aiohttp封装一个发送数据的函数, 然后在emit中将 这个函数注册到全局的事件循环对象loop中,最后再执行loop.run_forever()

loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封装发送数据函数
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 添加一个httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
logger.debug("是风和日丽的")
loop.run_forever()

这时脚本就可以正常的异步执行了:

loop.create_task(self.submit(msg)) 也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop) 来代替,目的都是将协程对象注册到事件循环中。

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用 loop.stop() 方法. 可以注册到某个task的回调中。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer