>  기사  >  백엔드 개발  >  Python에서 원격 서버에 로그를 비동기적으로 보내는 방법 이해

Python에서 원격 서버에 로그를 비동기적으로 보내는 방법 이해

coldplay.xixi
coldplay.xixi앞으로
2020-10-22 18:51:552469검색

python 동영상 튜토리얼 칼럼에서는 Python에서 로그를 원격 서버에 비동기적으로 전송하는 방법을 설명합니다.

Python에서 원격 서버에 로그를 비동기적으로 보내는 방법 이해

파이썬에서 로그를 사용하는 가장 일반적인 방법은 콘솔과 파일에 로그를 출력하는 것입니다. 로깅 모듈은 사용하기 매우 편리한 해당 클래스도 제공하지만 때로는 다음과 같은 몇 가지 요구 사항이 있을 수 있습니다. 원격 엔드에 로그를 보내거나 데이터베이스에 직접 써야 하는 경우, 이 요구 사항을 충족하는 방법은 무엇입니까?

1. StreamHandler 및 FileHandler

먼저 cmd 및 파일에 간단한 출력 세트를 작성해 보겠습니다. 먼저 코드

# -*- coding: utf-8 -*-"""
-------------------------------------------------
   File Name:     loger
   Description :
   Author :       yangyanxing
   date:          2020/9/23
-------------------------------------------------
"""import loggingimport sysimport os# 初始化loggerlogger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)# 设置日志格式fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')# 添加cmd handlercmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)# 添加文件的handlerlogpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)# 将cmd和file handler添加到logger中logger.addHandler(cmd_handler)
logger.addHandler(file_handler)

logger.debug("今天天气不错")复制代码

로거를 초기화하고 로그 수준을 DEBUG로 설정한 다음 cmd_handler 및 file_handler를 초기화하고 마지막으로 이를 로거에 추가하고 스크립트를 실행하고 [2020-09-23 10:45:56] [DEBUG] 오늘 날씨가 좋다 현재 디렉터리의 debug.log 파일에 기록됩니다.[2020-09-23 10:45:56] [DEBUG] 今天天气不错 且会写入到当前目录下的debug.log文件中.

二、添加HTTPHandler

如果想要在记录时将日志发送到远程服务器上,可以添加一个 HTTPHandler , 在python标准库logging.handler中,已经为我们定义好了很多handler,有些我们可以直接用,本地使用tornado写一个接收日志的接口,将接收到的参数全都打印出来

# 添加一个httphandlerimport logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)

logger.debug("今天天气不错")复制代码

结果在服务端我们收到了很多信息

{
    'name': [b 'yyx'],
    'msg': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
    'args': [b '()'],
    'levelname': [b 'DEBUG'],
    'levelno': [b '10'],
    'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
    'filename': [b 'loger.py'],
    'module': [b 'loger'],
    'exc_info': [b 'None'],
    'exc_text': [b 'None'],
    'stack_info': [b 'None'],
    'lineno': [b '41'],
    'funcName': [b '<module>'],
    'created': [b '1600831054.8881223'],
    'msecs': [b '888.1223201751709'],
    'relativeCreated': [b '22.99976348876953'],
    'thread': [b '14876'],
    'threadName': [b 'MainThread'],
    'processName': [b 'MainProcess'],
    'process': [b '8648'],
    'message': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
    'asctime': [b '2020-09-23 11:17:34']
}复制代码

可以说是信息非常之多,但是却并不是我们想要的样子,我们只是想要类似于 [2020-09-23 10:45:56] [DEBUG] 今天天气不错 这样的日志.

logging.handlers.HTTPHandler 只是简单的将日志所有信息发送给服务端,至于服务端要怎么组织内容是由服务端来完成. 所以我们可以有两种方法,一种是改服务端代码,根据传过来的日志信息重新组织一下日志内容, 第二种是我们重新写一个类,让它在发送的时候将重新格式化日志内容发送到服务端.

我们采用第二种方法,因为这种方法比较灵活, 服务端只是用于记录,发送什么内容应该是由客户端来决定。

我们需要重新定义一个类,我们可以参考logging.handlers.HTTPHandler 这个类,重新写一个httpHandler类

每个日志类都需要重写emit方法,记录日志时真正要执行是也就是这个emit方法

class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")
        self.method = method    def emit(self, record):
        '''
        :param record:
        :return:
        '''
        msg = self.format(record)        if self.method == "GET":            if (self.url.find("?") >= 0):
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
            requests.get(url, timeout=1)        else:
            headers = {                "Content-type": "application/x-www-form-urlencoded",                "Content-length": str(len(msg))
            }
            requests.post(self.url, data={'log': msg}, headers=headers, timeout=1)复制代码

上面代码中有一行定义发送的参数 msg = self.format(record)

2. HTTPHandler를 추가하세요

로그를 보내고 싶다면 기록할 때 원격 서버에 HTTPHandler를 추가할 수 있습니다. Python 표준 라이브러리인 login.handler에는 많은 핸들러가 정의되어 있으며 그 중 일부는 직접 사용하거나 토네이도를 로컬에서 사용할 수 있습니다. 로그 수신을 위한 인터페이스를 작성하고, 수신된 모든 매개변수를 인쇄합니다

async def post(self):
    print(self.getParam('log'))    await asyncio.sleep(5)
    self.write({"msg": 'ok'})复制代码

결과적으로 서버 측에서 많은 정보를 받았습니다[2020-09-23 11:43:50] [DEBUG] 今天天气不错

logger.debug("今天天气不错")
logger.debug("是风和日丽的")复制代码
정보가 많다고 할 수 있지만, 우리가 원하는 것은 아닙니다. [2020-09-23 10:45:56] [DEBUG] 오늘 날씨가 좋습니다.

logging.handlers.HTTPHandler는 단순히 모든 로그 정보를 서버에 보냅니다. 서버가 콘텐츠를 구성하는 방법은 두 가지 방법이 있습니다. 하나는 전달된 로그 정보를 기반으로 서버 코드를 변경하는 것입니다. 두 번째 방법은 전송 시 로그 내용을 서버에 다시 포맷할 수 있도록 클래스를 다시 작성하는 것입니다. <p></p>이 방법은 서버가 기록에만 사용되기 때문에 두 번째 방법을 사용합니다. 어떤 콘텐츠를 보낼지 결정하는 것은 클라이언트의 몫입니다. <p></p>클래스를 재정의해야 합니다. <code>logging.handlers.HTTPHandler 클래스를 참조하여 httpHandler 클래스를 다시 작성할 수 있습니다.

각 로그 클래스에서 실제로 필요한 것은 무엇입니까? 기록 로그? 실행은 이 방출 방법입니다

[2020-09-23 11:47:33] [DEBUG] 今天天气不错
[2020-09-23 11:47:38] [DEBUG] 是风和日丽的复制代码
위 코드에는 전송할 매개변수를 정의하는 줄이 있습니다. msg = self.format(record)

이 코드 줄은 다음을 의미합니다. 해당 콘텐츠에 해당하는 로그 개체에 의해 설정된 형식에 따라 반환됩니다. 그 후에는 요청 라이브러리를 통해 콘텐츠가 전송됩니다. get 또는 post 방법을 사용하더라도 서버는 정상적으로 로그를 수신할 수 있습니다.

3. 원격 로그를 비동기적으로 보냅니다.

이제 문제를 생각해 보겠습니다. 원격 서버 프로세스 중 원격 서버의 처리 속도가 매우 느린 경우 일정 시간이 걸리며 이후 로그 기록이 느려지므로 서버 로그 처리 클래스를 수정하여 5분간 일시 중지하도록 합니다.

def emit(self, record):
    msg = self.format(record)    if self.method == "GET":        if (self.url.find("?") >= 0):
            sep = '&'
        else:
            sep = '?'
        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
        t = threading.Thread(target=requests.get, args=(url,))
        t.start()    else:
        headers = {            "Content-type": "application/x-www-form-urlencoded",            "Content-length": str(len(msg))
        }
        t = threading.Thread(target=requests.post, args=(self.url,), kwargs={"data":{'log': msg}, "headers":headers})
        t.start()复制代码

이제 위의 로그를 인쇄합니다.

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程exector.submit(fn, args, kwargs) # 将函数submit到线程池中复制代码

얻어진 출력은

exector = ThreadPoolExecutor(max_workers=1)def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)    if self.method == "GET":        if (self.url.find("?") >= 0):
            sep = '&'
        else:
            sep = '?'
        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
        exector.submit(requests.get, url, timeout=6)    else:
        headers = {            "Content-type": "application/x-www-form-urlencoded",            "Content-length": str(len(msg))
        }
        exector.submit(requests.post, self.url, data={'log': msg}, headers=headers, timeout=6)复制代码

그 사이의 시간 간격도 5초입니다.

이제 문제가 발생합니다. 원래 로그였던 것이 이제 전체 스크립트를 끌어내리는 부담이 되었기 때문에 원격 로그 작성을 비동기적으로 처리해야 합니다.

3.1 멀티 스레드 처리 사용

먼저 생각해야 할 것은 멀티 스레드를 사용하여 로그 전송 방법을 실행하는 것입니다

class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")
        self.method = method    async def emit(self, record):
        msg = self.format(record)
        timeout = aiohttp.ClientTimeout(total=6)        if self.method == "GET":            if (self.url.find("?") >= 0):
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))            async with aiohttp.ClientSession(timeout=timeout) as session:                async with session.get(self.url) as resp:
                    print(await resp.text())        else:
            headers = {                "Content-type": "application/x-www-form-urlencoded",                "Content-length": str(len(msg))
            }            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:                async with session.post(self.url, data={'log': msg}) as resp:
                    print(await resp.text())复制代码

이 방법은 차단하지 않는다는 주요 목적을 달성할 수 있지만 로그가 인쇄될 때마다 스레드를 열어야 하는데 이는 리소스 낭비입니다.

3.2 스레드 풀을 사용하여 처리

Python의 Concurrent.futures에는 스레드 풀과 프로세스 풀인 ThreadPoolExecutor 및 ProcessPoolExecutor 클래스가 있습니다. 이들은 초기화 중에 먼저 여러 스레드를 정의한 다음입니다. 이 스레드가 해당 기능을 처리하게 하므로 매번 새 스레드를 생성할 필요가 없습니다

🎜스레드 풀의 기본 사용법🎜
C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine 'CustomHandler.emit' was never awaited
  self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback复制代码
🎜스레드 풀에 n개의 스레드가 있는 경우 제출된 작업 수가 n보다 크면 초과 작업은 대기열에 저장됩니다.🎜🎜위의 내보내기 기능을 다시 수정하세요🎜
async def main():
    await logger.debug("今天天气不错")    await logger.debug("是风和日丽的")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())复制代码
🎜여기서 스레드 풀을 하나만 사용하여 초기화하는 이유는 무엇입니까? 풀에 여러 스레드가 있는 경우 고급 대기열이 먼저 전송됩니다. 순서가 반드시 보장되는 것은 아닙니다. 🎜🎜3.3 비동기 aiohttp 라이브러리를 사용하여 요청 보내기🎜🎜위 CustomHandler 클래스의 내보내기 메서드는 요청 자체를 실행에서 차단하고 스크립트가 중단되는 것은 바로 요청 자체를 차단하는 것입니다. .오랜 시간이 지나면 차단 요청 라이브러리를 비동기식 aiohttp로 대체하여 get 및 post 메서드를 실행하고 CustomHandler🎜
raise TypeError('An asyncio.Future, a coroutine or an awaitable is '复制代码
🎜에서 내보내기 메서드를 다시 작성할 수 있습니다. 이때 코드 실행이 중단됩니다🎜
C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine 'CustomHandler.emit' was never awaited
  self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback复制代码

服务端也没有收到发送日志的请求。

究其原因是由于emit方法中使用async with session.post 函数,它需要在一个使用async 修饰的函数里执行,所以修改emit函数,使用async来修饰,这里emit函数变成了异步的函数, 返回的是一个coroutine 对象,要想执行coroutine对象,需要使用await, 但是脚本里却没有在哪里调用 await emit() ,所以崩溃信息中显示coroutine 'CustomHandler.emit' was never awaited.

既然emit方法返回的是一个coroutine对象,那么我们将它放一个loop中执行

async def main():
    await logger.debug("今天天气不错")    await logger.debug("是风和日丽的")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())复制代码

执行依然报错

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '复制代码

意思是需要的是一个coroutine,但是传进来的对象不是。

这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方.

解决办法是有的,我们使用 asyncio.get_event_loop() 获取一个事件循环对象, 我们可以在这个对象上注册很多协程对象,这样当执行事件循环的时候,就是去执行注册在该事件循环上的协程, 我们通过一个小例子来看一下

import asyncio 

async def test(n):
    while n > 0:        await asyncio.sleep(1)
        print("test {}".format(n))
        n -= 1
    return n    
async def test2(n):
    while n >0:        await asyncio.sleep(1)
        print("test2 {}".format(n))
        n -= 1def stoploop(task):
    print("执行结束, task n is {}".format(task.result()))
    loop.stop()

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))

loop.run_forever()复制代码

我们使用loop = asyncio.get_event_loop() 创建了一个事件循环对象loop, 并且在loop上创建了两个task, 并且给task1添加了一个回调函数,在task1它执行结束以后,将loop停掉.

注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上,然后调用该循环的run_forever() 函数,从而使该循环上的协程对象得以正常的执行.

上面得到的输出为

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0复制代码

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了.

如果不执行loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)# loop.run_forever()复制代码

上面的代码将loop.run_forever() 注释掉,换成time.sleep(5) 停5秒, 这时脚本不会有任何输出,在停了5秒以后就中止了.

回到之前的日志发送远程服务器的代码,我们可以使用aiohttp封装一个发送数据的函数, 然后在emit中将这个函数注册到全局的事件循环对象loop中,最后再执行loop.run_forever() .

loop = asyncio.get_event_loop()class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")
        self.method = method    # 使用aiohttp封装发送数据函数
    async def submit(self, data):
        timeout = aiohttp.ClientTimeout(total=6)        if self.method == "GET":            if self.url.find("?") >= 0:
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": data}))            async with aiohttp.ClientSession(timeout=timeout) as session:                async with session.get(url) as resp:
                    print(await resp.text())        else:
            headers = {                "Content-type": "application/x-www-form-urlencoded",
            }            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:                async with session.post(self.url, data={'log': data}) as resp:
                    print(await resp.text())        return True

    def emit(self, record):
        msg = self.format(record)
        loop.create_task(self.submit(msg))# 添加一个httphandlerhttp_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)

logger.debug("今天天气不错")
logger.debug("是风和日丽的")

loop.run_forever()复制代码

这时脚本就可以正常的异步执行了.

loop.create_task(self.submit(msg)) 也可以使用

asyncio.ensure_future(self.submit(msg), loop=loop)

来代替,目的都是将协程对象注册到事件循环中.

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用loop.stop()方法. 可以注册到某个task的回调中.

相关免费学习推荐:python视频教程

위 내용은 Python에서 원격 서버에 로그를 비동기적으로 보내는 방법 이해의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.im에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제