ホームページ  >  記事  >  バックエンド開発  >  Python でログをリモートサーバーに非同期で送信する方法

Python でログをリモートサーバーに非同期で送信する方法

WBOY
WBOY転載
2023-05-11 10:31:051233ブラウズ

StreamHandler と FileHandler

まず、cmd とファイルに出力するための簡単なコード セットを作成しましょう:

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 设置日志格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 添加cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 添加文件的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 将cmd和file handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天气不错")

まず、ロガーを初期化してセットアップします。ログレベルはDEBUGで、cmd_handlerとfile_handlerを初期化し、最後にそれらをロガーに追加してスクリプトを実行すると、

がcmdに出力されます[2020-09-23 10:45:56] [ DEBUG] 今日は天気が良いです。現在のディレクトリの debug.log ファイルに書き込まれます。

HTTPHandler を追加します。

ログをリモートに送信したい場合は、記録 サーバー上で、HTTPHandler を追加できます。Python の標準ライブラリlogging.handler には、多くのハンドラーが定義されています。それらの一部は直接使用できます。tornado をローカルで使用して、ログを受信して​​出力するためのインターフェイスを作成できます。受信したパラメータをすべて出力します。

# 添加一个httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
结果在服务端我们收到了很多信息

{
'name': [b 'yyx'],
'msg': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
&#39;funcName&#39;: [b &#39;<module>&#39;],
&#39;created&#39;: [b &#39;1600831054.8881223&#39;],
&#39;msecs&#39;: [b &#39;888.1223201751709&#39;],
&#39;relativeCreated&#39;: [b &#39;22.99976348876953&#39;],
&#39;thread&#39;: [b &#39;14876&#39;],
&#39;threadName&#39;: [b &#39;MainThread&#39;],
&#39;processName&#39;: [b &#39;MainProcess&#39;],
&#39;process&#39;: [b &#39;8648&#39;],
&#39;message&#39;: [b
&#39;\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99&#39;],
&#39;asctime&#39;: [b &#39;2020-09-23 11:17:34&#39;]
}

情報が多いとも言えますが、それは私たちが望むものではありません。単に

[2020] のようなものが欲しいだけです。 -09-23 10:45: 56][デバッグ] 今日は天気が良いですSuch a log
logging.handlers.HTTPHandler は、すべてのログ情報をサーバーに送信するだけです。サーバーがコンテンツをどのように編成するかについては、 , それはサーバーによって行われます. したがって、方法は 2 つあります。1 つはサーバーのコードを変更し、渡されたログ情報に従ってログの内容を再編成する方法です。2 つ目は、クラスを書き換えて、再フォーマットされたログの内容をクラスに送信させる方法です。送信時サーバー。

この方法のほうが柔軟性が高いため、2 番目の方法を使用します。サーバーは記録にのみ使用され、クライアントは送信するコンテンツを決定する必要があります。

クラスを再定義する必要があります。logging.handlers.HTTPHandler クラスを参照して、httpHandler クラスを書き換えることができます。

各ログ クラスは、記録するために Emit メソッドをオーバーライドする必要があります。ロギング時に実際に実行されるのは、emit メソッドです:

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    &#39;&#39;&#39;
   重写emit方法,这里主要是为了把初始化时的baseParam添加进来
   :param record:
   :return:
   &#39;&#39;&#39;
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = &#39;&&#39;
      else:
        sep = &#39;?&#39;
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={&#39;log&#39;: msg}, headers=headers,
timeout=1)

上記のコードには、送信するパラメータを定義する行があります (msg = self.format(record))。ログオブジェクトに従って設定されることを示し、対応する内容が形式で返されます。

その後、リクエスト ライブラリを通じてコン​​テンツを送信します。get メソッドまたは post メソッドの使用に関係なく、サーバーは通常どおりログを受信できます。

{&#39;log&#39;: [b&#39;[2020-09-23 11:39:45] [DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99&#39;]}

バイト型を変換すると、次のようになります。 :

[2020-09-23 11:43:50] [デバッグ] 今日は天気が良いです

リモートを非同期送信していますlogs

ここで問題を考えます。ログをリモート サーバーに送信するとき、リモート サーバーの処理が非常に遅いと、ある程度の時間がかかり、ログの記録が遅くなります。サーバー ログ処理クラスを変更し、5 秒間一時停止して、長い処理プロセスをシミュレートします

async def post(self):
  print(self.getParam(&#39;log&#39;))
  await asyncio.sleep(5)
  self.write({"msg": &#39;ok&#39;})

この時点で、上記のログを出力します:

logger.debug("今天天气不错")
logger.debug("是风和日丽的")

取得した出力:

[2020-09-23 11:47:33] [DEBUG] 今日は天気が良いです
[2020-09 -23 11:47:38] [デバッグ] 風が強く晴れています

それらの時間間隔も 5 秒であることに気付きました。
ここで問題が発生します。元々は単なるログでしたが、今ではスクリプト全体に影響を与える重荷になっているため、リモート ログの書き込みを非同期で処理する必要があります。

1 マルチスレッド処理を使用する

最初に考えるべきことは、ログ送信メソッドを実行するために複数のスレッドを使用する必要があるということです。

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = &#39;&&#39;
    else:
      sep = &#39;?&#39;
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{&#39;log&#39;: msg},

この方法は可能です ブロックしないという主な目的は達成されますが、ログを出力するたびにスレッドを開く必要があり、これもリソースの無駄です。スレッド プールを使用して処理することもできます

2 スレッド プールを使用して処理

Python の concurrent.futures には、スレッド プールとプロセス プールである ThreadPoolExecutor クラスと ProcessPoolExecutor クラスがあります。初期化中に最初に使用されます。複数のスレッドを定義し、それらのスレッドが対応する関数を処理できるようにすることで、毎回新しいスレッドを作成する必要がなくなります。

#スレッド プールの基本的な使用法:

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程
exector.submit(fn, args, kwargs) # 将函数submit到线程池中

スレッド プールに n 個のスレッドがある場合、送信されたタスクの数が n よりも大きい場合、超過したタスクはキューに配置されます。

上記の発行関数を再度変更します

exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = &#39;&&#39;
    else:
      sep = &#39;?&#39;
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={&#39;log&#39;: msg},
headers=headers, timeout=6)

なぜ 1 つのスレッドだけでスレッド プールを初期化するのでしょうか?これにより、アドバンスト キュー内のログが最初に送信されることが保証されるからです。プール、スレッド内では、順序は必ずしも保証されません。

3 非同期 aiohttp ライブラリを使用してリクエストを送信します

上記の CustomHandler クラスの Emit メソッドは、requests.post を使用してログを送信します。リクエスト自体はブロックされて実行されるため、このリクエストが存在します。スクリプトが長時間スタックするため、ブロッキング リクエスト ライブラリを非同期 aiohttp に置き換えて get メソッドと post メソッドを実行し、CustomHandler

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = &#39;&&#39;
      else:
        sep = &#39;?&#39;
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={&#39;log&#39;: msg}) as resp:
          print(await resp.text())

で Emit メソッドを書き換えることができます。コードの実行がクラッシュしました:

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine
&#39;CustomHandler.emit&#39; was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

サーバーはログを送信するリクエストを受信しませんでした。

その理由は、async with session.post 関数が Emit メソッドで使用されているため、async で変更された関数内で実行する必要があるため、Emit 関数が変更されて async で変更され、emit 関数が非同期関数。コルーチン オブジェクトが返されます。コルーチン オブジェクトを実行するには、await を使用する必要がありますが、await Emit() はスクリプト内のどこにも呼び出されないため、クラッシュ情報は、コルーチン 'CustomHandler.emit' が一度も呼び出されなかったことを示しています。待っていました。

既然emit方法返回的是一个coroutine对象,那么我们将它放一个loop中执行

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

执行依然报错:

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

意思是需要的是一个coroutine,但是传进来的对象不是。
这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方。

解决办法是有的,我们使用 asyncio.get_event_loop() 获取一个事件循环对象, 我们可以在这个对象上注册很多协程对象,这样当执行事件循环的时候,就是去执行注册在该事件循环上的协程,

我们通过一个小例子来看一下:

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("执行结束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

我们使用 loop = asyncio.get_event_loop() 创建了一个事件循环对象loop, 并且在loop上创建了两个task, 并且给task1添加了一个回调函数,在task1它执行结束以后,将loop停掉。
注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上, 然后调用该循环的 run_forever() 函数,从而使该循环上的协程对象得以正常的执行。

上面得到的输出为:

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了如果不执行 loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()

上面的代码将loop.run_forever() 注释掉,换成time.sleep(5) 停5秒, 这时脚本不会有任何输出,在停了5秒 以后就中止了,
回到之前的日志发送远程服务器的代码,我们可以使用aiohttp封装一个发送数据的函数, 然后在emit中将 这个函数注册到全局的事件循环对象loop中,最后再执行loop.run_forever()

loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封装发送数据函数
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 添加一个httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
logger.debug("是风和日丽的")
loop.run_forever()

这时脚本就可以正常的异步执行了:

loop.create_task(self.submit(msg)) 也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop) 来代替,目的都是将协程对象注册到事件循环中。

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用 loop.stop() 方法. 可以注册到某个task的回调中。

以上がPython でログをリモートサーバーに非同期で送信する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。