
How to send logs to a remote server asynchronously in Python

WBOY
2023-05-11 10:31:05

StreamHandler and FileHandler

First, let's write a simple script that logs to both the console (cmd) and a file:

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# Initialize the logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# Set the log format
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
# Add the cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# Add the file handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# Add the cmd and file handlers to the logger
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天气不错")  # "The weather is good today"

First we initialize a logger and set its level to DEBUG, then create cmd_handler and file_handler and add both to the logger. Running the script prints [2020-09-23 10:45:56] [DEBUG] The weather is good today to the console and writes the same line to the debug.log file in the current directory.

Add HTTPHandler

If you also want to send each log to a remote server as it is recorded, you can add an HTTPHandler. The standard library module logging.handlers already defines many handlers, and some of them can be used directly. Locally, we use tornado to write an endpoint that receives logs and prints all the parameters it receives.
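The article does not show its tornado endpoint, so here is a minimal, dependency-free stand-in built on the standard library's http.server. This is an assumption made for illustration, not the author's server: the names LogReceiver and received and the use of a random free port are hypothetical. It receives one POST from an HTTPHandler and records the decoded parameters.

```python
import logging
import logging.handlers
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs

received = []  # parameters of every request the server has seen


class LogReceiver(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the form-encoded body and decode it into a dict of lists.
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length).decode("utf-8")
        received.append(parse_qs(body))
        self.send_response(200)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format, *args):
        pass  # silence the default per-request output


server = HTTPServer(("127.0.0.1", 0), LogReceiver)  # port 0: pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()

host = "127.0.0.1:{}".format(server.server_port)
handler = logging.handlers.HTTPHandler(host, "/api/log/get", method="POST")
logger = logging.getLogger("receiver_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(handler)

logger.debug("hello")  # returns only after the server has answered
server.shutdown()
server.server_close()
print(sorted(received[0]))  # names of all the fields that were sent
```

Because parse_qs is applied to a decoded string, the values here are str rather than the bytes that tornado reports.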

# Add an HTTPHandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")  # "The weather is good today"

As a result, the server receives a lot of information:

{
'name': [b'yyx'],
'msg': [b'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b'()'],
'levelname': [b'DEBUG'],
'levelno': [b'10'],
'pathname': [b'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b'loger.py'],
'module': [b'loger'],
'exc_info': [b'None'],
'exc_text': [b'None'],
'stack_info': [b'None'],
'lineno': [b'41'],
'funcName': [b'<module>'],
'created': [b'1600831054.8881223'],
'msecs': [b'888.1223201751709'],
'relativeCreated': [b'22.99976348876953'],
'thread': [b'14876'],
'threadName': [b'MainThread'],
'processName': [b'MainProcess'],
'process': [b'8648'],
'message': [b'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'asctime': [b'2020-09-23 11:17:34']
}

That is a lot of information, but it is not what we want. We only want a log line like this:

[2020-09-23 10:45:56] [DEBUG] The weather is good today

logging.handlers.HTTPHandler simply sends all of the record's attributes to the server and leaves it to the server to organize them. So we have two options. One is to change the server code and rebuild the log line from the attributes it receives. The other is to write a new handler class that formats the log content before sending it to the server.

We use the second method because this method is more flexible. The server is only used for recording, and the client should decide what content to send.
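To see where the long list of fields comes from, the following minimal sketch inspects what HTTPHandler would send for a record, without sending anything. It relies on HTTPHandler.mapLogRecord, which by default returns the record's attribute dictionary. The hand-built LogRecord and its values (path, line number) are illustrative assumptions.

```python
import logging
import logging.handlers

# Constructing the handler does not open a connection.
http_handler = logging.handlers.HTTPHandler("127.0.0.1:1987", "/api/log/get")

# A hand-made record (illustrative values) standing in for logger.debug(...).
record = logging.LogRecord(
    name="yyx", level=logging.DEBUG, pathname="loger.py", lineno=41,
    msg="今天天气不错", args=(), exc_info=None,
)

# mapLogRecord decides which fields get URL-encoded and sent to the server.
fields = http_handler.mapLogRecord(record)
print(sorted(fields))

# The single formatted line we actually want is produced by a Formatter.
fmt = logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S")
line = fmt.format(record)
print(line)
```

The formatter set on an HTTPHandler is never applied to the data it sends, which is why a custom handler that calls self.format(record) is needed.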

We need to define a new class. Using logging.handlers.HTTPHandler as a reference, we write our own HTTP handler class.

Every handler class needs to override the emit method, because emit is what actually runs when a record is logged:

import urllib.parse

import requests  # third-party HTTP library


class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()
        if method not in ["GET", "POST"]:
            raise ValueError("method must be GET or POST")
        self.method = method

    def emit(self, record):
        '''
        Override emit so that the formatted log line is sent
        instead of every attribute of the record.
        :param record: the LogRecord being handled
        :return:
        '''
        msg = self.format(record)
        if self.method == "GET":
            if self.url.find("?") >= 0:
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
            requests.get(url, timeout=1)
        else:
            # requests computes Content-Length from the encoded body itself
            headers = {"Content-type": "application/x-www-form-urlencoded"}
            requests.post(self.url, data={'log': msg}, headers=headers, timeout=1)

One line in the code above defines the content to be sent: msg = self.format(record). It returns the record rendered according to the formatter that was set on the handler.

The content is then sent with the requests library. Whether GET or POST is used, the server receives the log correctly:

{'log': [b'[2020-09-23 11:39:45] [DEBUG] \xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}

Decoding the bytes gives:

[2020-09-23 11:43:50] [DEBUG] The weather is good today
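The decoding step can be sketched as follows, assuming the received value is held in a variable named raw (a hypothetical name) and that the client sent UTF-8:

```python
# The value received for the 'log' parameter, copied from the dump above.
raw = (
    b"[2020-09-23 11:39:45] [DEBUG] "
    b"\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99"
)

# The client encoded the text as UTF-8, so decode with the same codec.
text = raw.decode("utf-8")
print(text)
```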

Asynchronously sending remote logs

Now consider a problem. If the remote server is slow to process a log request, every logging call is slowed down by the same amount. To simulate a long processing time, modify the server's log handler class so that it pauses for 5 seconds:

async def post(self):
    print(self.getParam('log'))
    await asyncio.sleep(5)
    self.write({"msg": 'ok'})

Now we log two messages:

logger.debug("今天天气不错")  # "The weather is good today"
logger.debug("是风和日丽的")  # "It's windy and sunny"

The output is:

[2020-09-23 11:47:33] [DEBUG] The weather is good today
[2020-09-23 11:47:38] [DEBUG] It's windy and sunny

Notice that the two lines are 5 seconds apart.
This is the problem: logging was supposed to be a side task, but now it drags down the entire script, so we need to handle the remote log writes asynchronously.
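The blocking behavior can be reproduced without a server. In this minimal sketch, SlowHandler is a hypothetical handler whose emit sleeps for a short time in place of a slow HTTP request; the delay of 0.2 seconds is an arbitrary choice.

```python
import logging
import time


class SlowHandler(logging.Handler):
    """Hypothetical handler that stands in for a slow HTTP request."""

    def __init__(self, delay):
        super().__init__()
        self.delay = delay

    def emit(self, record):
        # emit runs inside the logger.debug() call, so this sleep blocks the caller.
        time.sleep(self.delay)


logger = logging.getLogger("blocking_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(SlowHandler(delay=0.2))

start = time.perf_counter()
logger.debug("first")
logger.debug("second")
elapsed = time.perf_counter() - start
print("two log calls took {:.2f}s".format(elapsed))
```

Each logging call costs at least as long as the handler's emit, which is exactly the delay seen in the timestamps above.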

1 Use multi-thread processing

The first idea is to run the log-sending call in a separate thread:

# Requires: import threading
def emit(self, record):
    msg = self.format(record)
    if self.method == "GET":
        if self.url.find("?") >= 0:
            sep = '&'
        else:
            sep = '?'
        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
        # Run the request in a new thread so emit returns immediately
        t = threading.Thread(target=requests.get, args=(url,))
        t.start()
    else:
        headers = {"Content-type": "application/x-www-form-urlencoded"}
        t = threading.Thread(target=requests.post, args=(self.url,),
                             kwargs={"data": {'log': msg}, "headers": headers})
        t.start()

This approach achieves the main goal of not blocking, but it starts a new thread for every log message, which wastes resources. We can use a thread pool instead.

2 Use the thread pool to process

Python's concurrent.futures module provides the ThreadPoolExecutor and ProcessPoolExecutor classes, a thread pool and a process pool. A pool creates a fixed number of workers when it is initialized and lets them run the submitted functions, so there is no need to create a new thread every time.

Basic use of thread pool:

executor = ThreadPoolExecutor(max_workers=1)  # Initialize a thread pool with a single thread
executor.submit(fn, *args, **kwargs)  # Submit the function to the thread pool

If the thread pool has n threads and more than n tasks are submitted, the extra tasks wait in a queue.
Modify the emit function above once more:

# Requires: from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
    msg = self.format(record)
    if self.method == "GET":
        if self.url.find("?") >= 0:
            sep = '&'
        else:
            sep = '?'
        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
        executor.submit(requests.get, url, timeout=6)
    else:
        headers = {"Content-type": "application/x-www-form-urlencoded"}
        executor.submit(requests.post, self.url, data={'log': msg},
                        headers=headers, timeout=6)

Why initialize the pool with only one thread? Because this guarantees that the logs queued first are sent first. With multiple threads in the pool, the order is not guaranteed.
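The ordering guarantee can be checked with a minimal sketch. Here fake_send is a hypothetical stand-in for requests.post that is deliberately slowest for the first message, and PoolHandler is an illustrative handler, not the article's CustomHandler.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor

received = []  # stands in for the remote server


def fake_send(msg, delay):
    # Hypothetical stand-in for requests.post.
    time.sleep(delay)
    received.append(msg)


class PoolHandler(logging.Handler):
    def __init__(self, max_workers):
        super().__init__()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.count = 0

    def emit(self, record):
        # submit() returns immediately; the worker thread does the sending.
        self.count += 1
        delay = 0.3 / self.count  # the first message is the slowest
        self.executor.submit(fake_send, self.format(record), delay)


logger = logging.getLogger("pool_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False
handler = PoolHandler(max_workers=1)
logger.addHandler(handler)

for i in range(3):
    logger.debug("message %d", i)

handler.executor.shutdown(wait=True)  # wait for the queued sends to finish
print(received)
```

With max_workers=1 the messages arrive in submission order even though the first one takes longest. With a larger pool the faster, later messages could overtake it.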

3 Use the asynchronous aiohttp library to send requests

The emit method in the CustomHandler class above uses requests.post to send logs. requests itself is blocking, and that is what makes the script hang for so long. So we can replace the blocking requests library with the asynchronous aiohttp library for the GET and POST calls and rewrite the emit method of CustomHandler:

class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()
        if method not in ["GET", "POST"]:
            raise ValueError("method must be GET or POST")
        self.method = method

    async def emit(self, record):
        msg = self.format(record)
        timeout = aiohttp.ClientTimeout(total=6)
        if self.method == "GET":
            if self.url.find("?") >= 0:
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as resp:
                    print(await resp.text())
        else:
            headers = {"Content-type": "application/x-www-form-urlencoded"}
            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
                async with session.post(self.url, data={'log': msg}) as resp:
                    print(await resp.text())

Running the code now fails with a warning:

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine 'CustomHandler.emit' was never awaited
  self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

The server did not receive the log request.
The reason is that emit uses async with session.post, which must run inside a function declared with async, so emit itself was declared async. That turns emit into a coroutine function: calling it only returns a coroutine object. To run that coroutine object it has to be awaited, but nothing in the script calls await emit(), so the warning reports that coroutine 'CustomHandler.emit' was never awaited.
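The difference between calling a coroutine function and running it can be shown with a minimal sketch. The module-level emit function and the calls list are illustrative stand-ins, not part of the logging API.

```python
import asyncio

calls = []


async def emit(record):
    # Stand-in for the async emit method; the body runs only when awaited.
    calls.append(record)


coro = emit("log line")      # only creates a coroutine object
print(type(coro).__name__, calls)
coro.close()                 # discard it explicitly to avoid the warning

asyncio.run(emit("log line"))  # an event loop actually runs the body
print(calls)
```

The logging module calls self.emit(record) as a plain function, so with an async emit it is always left holding an unused coroutine object, as in the first call here.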

Since the emit method returns a coroutine object, let's try running it in an event loop:

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

It still fails:

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

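This means that a coroutine was expected, but the object passed in is not one.
It looks like we are stuck: we want to send with an asynchronous library, but there is nowhere to call await.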

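There is a solution. We use asyncio.get_event_loop() to get an event loop object, on which we can register many coroutine objects. When the event loop runs, it executes the coroutines registered on it.

Let's look at a small example: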

import asyncio

async def test(n):
    while n > 0:
        await asyncio.sleep(1)
        print("test {}".format(n))
        n -= 1
    return n

async def test2(n):
    while n > 0:
        await asyncio.sleep(1)
        print("test2 {}".format(n))
        n -= 1

def stoploop(task):
    print("Finished, task n is {}".format(task.result()))
    loop.stop()

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
loop.run_forever()

We use loop = asyncio.get_event_loop() to get an event loop object loop, create two tasks on it, and add a callback to the first task that stops the loop once that task has finished.
Note that nowhere in the code above do we use await to run the coroutines. Instead, we register them on an event loop and then call that loop's run_forever() function, which lets the coroutines registered on the loop run normally.

The output is:

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
Finished, task n is 0

As you can see, tasks created on the event loop object run once the loop's run_forever() is called. If loop.run_forever() is not called, the coroutines registered on the loop do not run:

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()

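The code above comments out loop.run_forever() and replaces it with time.sleep(5) to pause for 5 seconds. The script now produces no output and exits after the 5-second pause.
Back to the code that sends logs to the remote server: we can wrap the sending logic in a function built on aiohttp, register that function on the global event loop object loop inside emit, and finally call loop.run_forever():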

import asyncio
import urllib.parse

import aiohttp  # third-party asynchronous HTTP client

# logger and fmt are the objects created in the first script
loop = asyncio.get_event_loop()

class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()
        if method not in ["GET", "POST"]:
            raise ValueError("method must be GET or POST")
        self.method = method

    # Sending function built on aiohttp
    async def submit(self, data):
        timeout = aiohttp.ClientTimeout(total=6)
        if self.method == "GET":
            if self.url.find("?") >= 0:
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": data}))
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as resp:
                    print(await resp.text())
        else:
            headers = {
                "Content-type": "application/x-www-form-urlencoded",
            }
            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
                async with session.post(self.url, data={'log': data}) as resp:
                    print(await resp.text())
        return True

    def emit(self, record):
        msg = self.format(record)
        # Register the coroutine on the loop instead of awaiting it
        loop.create_task(self.submit(msg))

# Add the HTTP handler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")  # "The weather is good today"
logger.debug("是风和日丽的")  # "It's windy and sunny"
loop.run_forever()

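Now the script runs asynchronously as expected.

loop.create_task(self.submit(msg)) can also be replaced with
asyncio.ensure_future(self.submit(msg), loop=loop); both register the coroutine object on the event loop.

One thing to note about this approach: loop.run_forever() blocks indefinitely, so loop.stop() has to be called somewhere, for example in a callback registered on one of the tasks.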
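One possible way to arrange that stop is sketched below. It assumes the handler keeps a reference to every task it creates (the tasks list is a hypothetical addition), and it uses asyncio.sleep as a stand-in for the aiohttp request so that the example runs without a server.

```python
import asyncio
import logging

sent = []  # stands in for the remote server


class AsyncHandler(logging.Handler):
    def __init__(self, loop):
        super().__init__()
        self.loop = loop
        self.tasks = []

    async def submit(self, msg):
        # Hypothetical stand-in for the aiohttp request.
        await asyncio.sleep(0.1)
        sent.append(msg)

    def emit(self, record):
        # Keep a reference to every task so we can wait for all of them later.
        task = self.loop.create_task(self.submit(self.format(record)))
        self.tasks.append(task)


loop = asyncio.new_event_loop()
handler = AsyncHandler(loop)
logger = logging.getLogger("stop_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(handler)

logger.debug("first")
logger.debug("second")

# Stop the loop from a callback once every pending send has finished.
all_done = asyncio.gather(*handler.tasks)
all_done.add_done_callback(lambda fut: loop.stop())
loop.run_forever()
loop.close()
print(sorted(sent))
```

Attaching the callback to a gather over all tasks, rather than to a single task, avoids stopping the loop while other sends are still in flight.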


Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete