Heim  >  Artikel  >  Backend-Entwicklung  >  Werfen wir einen Blick auf das asynchrone Senden von Protokollen an einen Remote-Server in Python

Werfen wir einen Blick auf das asynchrone Senden von Protokollen an einen Remote-Server in Python

coldplay.xixi
coldplay.xixinach vorne
2020-10-14 17:33:402876Durchsuche

Werfen wir einen Blick auf das asynchrone Senden von Protokollen an einen Remote-Server in Python

Weitere verwandte kostenlose Lerninhalte: Python-Video-Tutorial

Die häufigste Art, Protokolle in Python zu verwenden, besteht darin, Protokolle in der Konsole und in Dateien auszugeben, und das Protokollierungsmodul ist ebenfalls gut ausgestattet Die Verwendung entsprechender Klassen ist ebenfalls sehr praktisch, aber manchmal müssen wir Protokolle an das Remote-Ende senden oder direkt in die Datenbank schreiben

1. StreamHandler und FileHandler

Schreiben wir zunächst eine Reihe von Codes, die einfach an cmd und Dateien ausgegeben werden out [2020-09-23 10:45:56] [DEBUG] Das Wetter ist heute gut in cmd und wird in die Datei debug.log im aktuellen Verzeichnis geschrieben .

2. HTTPHandler hinzufügen

[2020-09-23 10:45:56] [DEBUG] 今天天气不错 且会写入到当前目录下的debug.log文件中.

二、添加HTTPHandler

如果想要在记录时将日志发送到远程服务器上,可以添加一个 HTTPHandler , 在python标准库logging.handler中,已经为我们定义好了很多handler,有些我们可以直接用,本地使用tornado写一个接收日志的接口,将接收到的参数全都打印出来

# -*- coding: utf-8 -*-"""
-------------------------------------------------
   File Name:     loger
   Description :
   Author :       yangyanxing
   date:          2020/9/23
-------------------------------------------------
"""import loggingimport sysimport os# 初始化loggerlogger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)# 设置日志格式fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')# 添加cmd handlercmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)# 添加文件的handlerlogpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)# 将cmd和file handler添加到logger中logger.addHandler(cmd_handler)
logger.addHandler(file_handler)

logger.debug("今天天气不错")复制代码

结果在服务端我们收到了很多信息

# 添加一个httphandlerimport logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)

logger.debug("今天天气不错")复制代码

可以说是信息非常之多,但是却并不是我们想要的样子,我们只是想要类似于 [2020-09-23 10:45:56] [DEBUG] 今天天气不错 这样的日志.

logging.handlers.HTTPHandler 只是简单的将日志所有信息发送给服务端,至于服务端要怎么组织内容是由服务端来完成. 所以我们可以有两种方法,一种是改服务端代码,根据传过来的日志信息重新组织一下日志内容, 第二种是我们重新写一个类,让它在发送的时候将重新格式化日志内容发送到服务端.

我们采用第二种方法,因为这种方法比较灵活, 服务端只是用于记录,发送什么内容应该是由客户端来决定。

我们需要重新定义一个类,我们可以参考logging.handlers.HTTPHandler 这个类,重新写一个httpHandler类

每个日志类都需要重写emit方法,记录日志时真正要执行是也就是这个emit方法

{
    'name': [b 'yyx'],
    'msg': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
    'args': [b '()'],
    'levelname': [b 'DEBUG'],
    'levelno': [b '10'],
    'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
    'filename': [b 'loger.py'],
    'module': [b 'loger'],
    'exc_info': [b 'None'],
    'exc_text': [b 'None'],
    'stack_info': [b 'None'],
    'lineno': [b '41'],
    'funcName': [b '<module>'],
    'created': [b '1600831054.8881223'],
    'msecs': [b '888.1223201751709'],
    'relativeCreated': [b '22.99976348876953'],
    'thread': [b '14876'],
    'threadName': [b 'MainThread'],
    'processName': [b 'MainProcess'],
    'process': [b '8648'],
    'message': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
    'asctime': [b '2020-09-23 11:17:34']
}复制代码

上面代码中有一行定义发送的参数 msg = self.format(record)

这行代码表示,将会根据日志对象设置的格式返回对应的内容. 之后再将内容通过requests库进行发送,无论使用get 还是post方式,服务端都可以正常的接收到日志

[2020-09-23 11:43:50] [DEBUG] 今天天气不错Wenn Sie das Protokoll beim Aufzeichnen an den Remote-Server senden möchten, können Sie in logging.handler einen HTTPHandler hinzufügen Für uns wurden viele Handler definiert, von denen wir einige direkt verwenden können. Wir verwenden Tornado lokal, um eine Schnittstelle zum Empfangen von Protokollen zu schreiben und alle empfangenen Parameter auszudrucken serverseitig

class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")
        self.method = method    def emit(self, record):
        '''
        :param record:
        :return:
        '''
        msg = self.format(record)        if self.method == "GET":            if (self.url.find("?") >= 0):
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
            requests.get(url, timeout=1)        else:
            headers = {                "Content-type": "application/x-www-form-urlencoded",                "Content-length": str(len(msg))
            }
            requests.post(self.url, data={'log': msg}, headers=headers, timeout=1)复制代码

Man kann sagen, dass es viele Informationen gibt, aber es ist nicht das, was wir wollen. Wir wollen nur etwas Ähnliches wie [2020-09-23 10:45:56] [DEBUG] Das Das Wetter ist heute gut code> Ein solches Protokoll.

logging.handlers.HTTPHandler sendet einfach alle Protokollinformationen an den Server. Die Art und Weise, wie der Server den Inhalt organisiert, erfolgt durch Wir können also den Servercode ändern und den Protokollinhalt basierend auf den übergebenen Protokollinformationen neu organisieren. Die zweite besteht darin, eine Klasse neu zu schreiben, damit sie den Protokollinhalt beim Senden neu formatieren kann .

Wir verwenden die zweite Methode, da diese Methode flexibler ist. Der Server wird nur für die Aufzeichnung verwendet und der Client sollte entscheiden, welche Inhalte gesendet werden sollen.

Wir müssen eine Klasse neu definieren. Wir können auf die Klasse logging.handlers.HTTPHandler verweisen und eine httpHandler-Klasse neu schreiben.

Jede Protokollklasse muss die Emit-Methode neu schreiben Protokolle aufzeichnen? Die Ausführung ist diese Emit-Methode

async def post(self):
    print(self.getParam('log'))    await asyncio.sleep(5)
    self.write({"msg": 'ok'})复制代码

Der obige Code enthält eine Zeile, die die zu sendenden Parameter definiert msg = self.format(record)

Diese Codezeile bedeutet das Es wird gemäß dem Format zurückgegeben, das durch den entsprechenden Inhalt des Protokollobjekts festgelegt wird. Danach wird der Inhalt über die Anforderungsbibliothek gesendet. Unabhängig von der Verwendung der Get- oder Post-Methode kann der Server das Protokoll normal empfangen

[2020-09-23 11:43:50] [DEBUG] Das Wetter ist heute gut

3. Senden Sie Remote-Protokolle asynchron

Wenn das Protokoll an den Remote-Server gesendet wird, dauert es eine gewisse Zeit, wenn es sehr langsam ist . Dann wird die Protokollaufzeichnung langsamer. Ändern Sie die Verarbeitungsklasse des Serverprotokolls und lassen Sie sie 5 Sekunden lang anhalten, um einen langen Verarbeitungsprozess zu simulieren. Zu diesem Zeitpunkt drucken wir das obige Protokoll aus. Die erhaltene Ausgabe lautet

logger.debug("今天天气不错")
logger.debug("是风和日丽的")复制代码
Uns ist aufgefallen, dass das Zeitintervall ebenfalls 5 Sekunden beträgt.

Dann kommt jetzt das Problem. Ursprünglich war es nur ein Protokoll, aber jetzt ist es zu einer Belastung geworden, die das gesamte Skript in die Länge zieht, sodass wir das Schreiben von Remote-Protokollen asynchron durchführen müssen.

3.1 Multithread-Verarbeitung verwenden

Das erste, woran Sie denken sollten, ist die Verwendung von Multithreads zum Ausführen der Protokollsendemethode

[2020-09-23 11:47:33] [DEBUG] 今天天气不错
[2020-09-23 11:47:38] [DEBUG] 是风和日丽的复制代码
Diese Methode kann den Hauptzweck erreichen, nicht zu blockieren, sondern jedes Mal, wenn ein Protokoll gedruckt wird, Es muss ein Thread eröffnet werden, was eine ziemliche Ressourcenverschwendung darstellt. Wir können Thread-Pools auch zum Verarbeiten verwenden

3.2 Verwenden Sie Thread-Pools zum Verarbeiten

Pythons concurrent.futures verfügt über ThreadPoolExecutor- und ProcessPoolExecutor-Klassen, bei denen es sich um Thread-Pools und Prozesspools handelt. Sie definieren zuerst mehrere Threads und lassen diese Threads dann verarbeiten Verarbeiten Sie die entsprechende Funktion, sodass Sie nicht jedes Mal einen neuen Thread erstellen müssen

Grundlegende Verwendung des Thread-Pools

def emit(self, record):
    msg = self.format(record)    if self.method == "GET":        if (self.url.find("?") >= 0):
            sep = '&'
        else:
            sep = '?'
        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
        t = threading.Thread(target=requests.get, args=(url,))
        t.start()    else:
        headers = {            "Content-type": "application/x-www-form-urlencoded",            "Content-length": str(len(msg))
        }
        t = threading.Thread(target=requests.post, args=(self.url,), kwargs={"data":{'log': msg}, "headers":headers})
        t.start()复制代码
Wenn sich n Threads im Thread-Pool befinden und die Anzahl der übermittelten Aufgaben größer als n ist , die überschüssigen Aufgaben werden in die Warteschlange gestellt.

Ändern Sie die Emit-Funktion oben noch einmal

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程exector.submit(fn, args, kwargs) # 将函数submit到线程池中复制代码
Warum hier nur einen Thread-Pool mit nur einem Thread initialisieren? Denn auf diese Weise kann sichergestellt werden, dass die Protokolle in der erweiterten Warteschlange gespeichert werden Wenn der Pool mehrere Threads enthält, ist die Reihenfolge nicht unbedingt garantiert. 🎜🎜3.3 Verwenden Sie die asynchrone aiohttp-Bibliothek, um Anfragen zu senden🎜🎜Die Emit-Methode in der CustomHandler-Klasse oben verwendet request.post, um Protokolle zu senden. Die Ausführung der Anfragen selbst wird blockiert, und genau aufgrund ihrer Existenz bleibt das Skript hängen . Nach langer Zeit können wir die Bibliothek für blockierende Anforderungen durch asynchrones aiohttp ersetzen, um die Get- und Post-Methoden auszuführen, und die Emit-Methode in einem CustomHandler neu schreiben. 🎜
exector = ThreadPoolExecutor(max_workers=1)def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)    if self.method == "GET":        if (self.url.find("?") >= 0):
            sep = '&'
        else:
            sep = '?'
        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
        exector.submit(requests.get, url, timeout=6)    else:
        headers = {            "Content-type": "application/x-www-form-urlencoded",            "Content-length": str(len(msg))
        }
        exector.submit(requests.post, self.url, data={'log': msg}, headers=headers, timeout=6)复制代码
🎜Zu diesem Zeitpunkt stürzt die Codeausführung ab Es wurde eine Anfrage zum Senden von Protokollen empfangen. 🎜

究其原因是由于emit方法中使用async with session.post 函数,它需要在一个使用async 修饰的函数里执行,所以修改emit函数,使用async来修饰,这里emit函数变成了异步的函数, 返回的是一个coroutine 对象,要想执行coroutine对象,需要使用await, 但是脚本里却没有在哪里调用 await emit() ,所以崩溃信息中显示coroutine 'CustomHandler.emit' was never awaited.

既然emit方法返回的是一个coroutine对象,那么我们将它放一个loop中执行

async def main():
    await logger.debug("今天天气不错")    await logger.debug("是风和日丽的")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())复制代码

执行依然报错

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '复制代码

意思是需要的是一个coroutine,但是传进来的对象不是。

这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方.

解决办法是有的,我们使用 asyncio.get_event_loop() 获取一个事件循环对象, 我们可以在这个对象上注册很多协程对象,这样当执行事件循环的时候,就是去执行注册在该事件循环上的协程, 我们通过一个小例子来看一下

import asyncio 

async def test(n):
    while n > 0:        await asyncio.sleep(1)
        print("test {}".format(n))
        n -= 1
    return n    
async def test2(n):
    while n >0:        await asyncio.sleep(1)
        print("test2 {}".format(n))
        n -= 1def stoploop(task):
    print("执行结束, task n is {}".format(task.result()))
    loop.stop()

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))

loop.run_forever()复制代码

我们使用loop = asyncio.get_event_loop() 创建了一个事件循环对象loop, 并且在loop上创建了两个task, 并且给task1添加了一个回调函数,在task1它执行结束以后,将loop停掉.

注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上,然后调用该循环的run_forever() 函数,从而使该循环上的协程对象得以正常的执行.

上面得到的输出为

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0复制代码

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了.

如果不执行loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)# loop.run_forever()复制代码

上面的代码将loop.run_forever() 注释掉,换成time.sleep(5) 停5秒, 这时脚本不会有任何输出,在停了5秒以后就中止了.

回到之前的日志发送远程服务器的代码,我们可以使用aiohttp封装一个发送数据的函数, 然后在emit中将这个函数注册到全局的事件循环对象loop中,最后再执行loop.run_forever() .

loop = asyncio.get_event_loop()class CustomHandler(logging.Handler):
    def __init__(self, host, uri, method="POST"):
        logging.Handler.__init__(self)
        self.url = "%s/%s" % (host, uri)
        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")
        self.method = method    # 使用aiohttp封装发送数据函数
    async def submit(self, data):
        timeout = aiohttp.ClientTimeout(total=6)        if self.method == "GET":            if self.url.find("?") >= 0:
                sep = '&'
            else:
                sep = '?'
            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": data}))            async with aiohttp.ClientSession(timeout=timeout) as session:                async with session.get(url) as resp:
                    print(await resp.text())        else:
            headers = {                "Content-type": "application/x-www-form-urlencoded",
            }            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:                async with session.post(self.url, data={'log': data}) as resp:
                    print(await resp.text())        return True

    def emit(self, record):
        msg = self.format(record)
        loop.create_task(self.submit(msg))# 添加一个httphandlerhttp_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)

logger.debug("今天天气不错")
logger.debug("是风和日丽的")

loop.run_forever()复制代码

这时脚本就可以正常的异步执行了.

loop.create_task(self.submit(msg)) 也可以使用

asyncio.ensure_future(self.submit(msg), loop=loop)

来代替,目的都是将协程对象注册到事件循环中.

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用loop.stop()方法. 可以注册到某个task的回调中.

Das obige ist der detaillierte Inhalt vonWerfen wir einen Blick auf das asynchrone Senden von Protokollen an einen Remote-Server in Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:juejin.im. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen