Python logging 模組定義了為應用程式和函式庫實作靈活的事件日誌記錄的函數和類別。
程式開發過程中,許多程式都有記錄日誌的需求,且日誌包含的資訊有正常的程式存取日誌還可能有錯誤、警告等資訊輸出,Python 的logging 模組提供了標準的日誌接口,可以透過它儲存各種格式的日誌,日誌記錄提供了一組便利功能,用於簡單的日誌記錄用法。
使用 Python Logging 模組的主要好處是所有 Python 模組都可以參與日誌記錄Logging 模組提供了大量具有靈活性的功能。
簡單又方便的幫助我們輸出所需的日誌訊息:
使用Python 來寫程式或腳本的話,常常遇到的問題就是需要對日誌進行刪除。一方面可以幫助我們在程序出問題的時候排除問題,二來可以幫助我們記錄需要關注的資訊。
但是,使用自備自帶的 logging 模組的話,則需要我們進行不同的初始化等相關工作。對應不熟悉該模組的同學來說,還是有些費力的,例如需要配置 Handler/Formatter 等。隨著業務的複雜度提升, 對日誌收集有著更高的要求, 例如: 日誌分類, 文件存儲, 異步寫入, 自定義類型等等
loguru 是一個Python 簡易且強大的第三方日誌記錄庫,該庫旨在透過添加一系列有用的功能來解決標準記錄器的注意事項,從而減少Python 日誌記錄的痛苦。
pip install loguru
有很多優點,以下列舉了其中比較重要的幾點:
開箱即用,無需準備
無需初始化,導入函數即可使用
#更容易的檔案日誌記錄與轉存/保留/壓縮方式
更優雅的字串格式化輸出
可以在線程或主線程中捕獲異常
可以設定不同級別的日誌記錄樣式
支援異步,且線程和多進程安全性
支援惰性計算
適用於腳本和函式庫
完全相容於標準日誌記錄
更好的日期時間處理
from loguru import logger logger.debug("That's it, beautiful and simple logging!")
無需初始化,導入函數即可使用, 那麼你肯定要問, 如何解決一下問題?
#如何添加處理程序(handler)呢?
如何設定日誌格式(logs formatting)呢?
如何過濾訊息(filter messages)呢?
如何如何設定等級(log level)呢?
# add logger.add(sys.stderr, \ format="{time} {level} {message}",\ filter="my_module",\ level="INFO")
是不是很easy~
# 日志文件记录 logger.add("file_{time}.log") # 日志文件转存 logger.add("file_{time}.log", rotation="500 MB") logger.add("file_{time}.log", rotation="12:00") logger.add("file_{time}.log", rotation="1 week") # 多次时间之后清理 logger.add("file_X.log", retention="10 days") # 使用zip文件格式保存 logger.add("file_Y.log", compression="zip")
logger.info( "If you're using Python {}, prefer {feature} of course!", 3.10, feature="f-strings")
@logger.catch def my_function(x, y, z): # An error? It's caught anyway! return 1 / (x + y + z) my_function(0, 0, 0)
logger.add(sys.stdout, colorize=True, format="<green>{time}</green> <level>{message}</level>") logger.add('logs/z_{time}.log', level='DEBUG', format='{time:YYYY-MM-DD :mm:ss} - {level} - {file} - {line} - {message}', rotation="10 MB")8.支援異步且線程和多進程安全
# 异步写入 logger.add("some_file.log", enqueue=True)你沒有看錯, 只需要
enqueue=True即可非同步執行
logger.add("out.log", backtrace=True, diagnose=True) def func(a, b): return a / b def nested(c): try: func(5, c) except ZeroDivisionError: logger.exception("What?!") nested(0)10. 結構化日誌記錄
# 序列化为json格式 logger.add(custom_sink_function, serialize=True) # bind方法的用处 logger.add("file.log", format="{extra[ip]} {extra[user]} {message}") context_logger = logger.bind(ip="192.168.2.174", user="someone") context_logger.info("Contextualize your logger easily") context_logger.bind(user="someone_else").info("Inline binding of extra attribute") context_logger.info("Use kwargs to add context during formatting: {user}", user="anybody") # 粒度控制 logger.add("special.log", filter=lambda record: "special" in record["extra"]) logger.debug("This message is not logged to the file") logger.bind(special=True).info("This message, though, is logged to the file!") # patch()方法的用处 logger.add(sys.stderr, format="{extra[utc]} {message}") loggerlogger = logger.patch(lambda record: record["extra"].update(utc=datetime.utcnow()))11. 惰性計算有時希望在生產環境中記錄詳細資訊而不會影響效能,可以使用 opt() 方法來實現這一點。
logger.opt(lazy=True).debug("If sink level <= DEBUG: {x}", x=lambda: expensive_function(2**64)) # By the way, "opt()" serves many usages logger.opt(exception=True).info("Error stacktrace added to the log message (tuple accepted too)") logger.opt(colors=True).info("Per message <blue>colors</blue>") logger.opt(record=True).info("Display values from the record (eg. {record[thread]})") logger.opt(raw=True).info("Bypass sink formatting\n") logger.opt(depth=1).info("Use parent stack context (useful within wrapped functions)") logger.opt(capture=False).info("Keyword arguments not added to {dest} dict", dest="extra")12. 可自訂的等級
new_level = logger.level("SNAKY", no=38, color="<yellow>", icon="????") logger.log("SNAKY", "Here we go!")
# For scripts config = { "handlers": [ {"sink": sys.stdout, "format": "{time} - {message}"}, {"sink": "file.log", "serialize": True}, ], "extra": {"user": "someone"} } logger.configure(**config) # For libraries logger.disable("my_library") logger.info("No matter added sinks, this message is not displayed") logger.enable("my_library") logger.info("This message however is propagated to the sinks")
handler = logging.handlers.SysLogHandler(address=('localhost', 514)) logger.add(handler) class PropagateHandler(logging.Handler): def emit(self, record): logging.getLogger(record.name).handle(record) logger.add(PropagateHandler(), format="{message}") class InterceptHandler(logging.Handler): def emit(self, record): # Get corresponding Loguru level if it exists try: level = logger.level(record.levelname).name except ValueError: level = record.levelno # Find caller from where originated the logged message frame, depth = logging.currentframe(), 2 while frame.f_code.co_filename == logging.__file__: frameframe = frame.f_back depth += 1 logger.opt(depthdepth=depth, exception=record.exc_info).log(level, record.getMessage()) logging.basicConfig(handlers=[InterceptHandler()], level=0)
从生成的日志中提取特定的信息通常很有用,这就是为什么 Loguru 提供了一个 parse() 方法来帮助处理日志和正则表达式。
pattern = r"(?P<time>.*) - (?P<level>[0-9]+) - (?P<message>.*)" # Regex with named groups caster_dict = dict(time=dateutil.parser.parse, level=int) # Transform matching groups for groups in logger.parse("file.log", pattern, cast=caster_dict): print("Parsed:", groups) # {"level": 30, "message": "Log example", "time": datetime(2018, 12, 09, 11, 23, 55)}
import notifiers params = { "username": "you@gmail.com", "password": "abc123", "to": "dest@gmail.com" } # Send a single notification notifier = notifiers.get_notifier("gmail") notifier.notify(message="The application is running!", **params) # Be alerted on each error message from notifiers.logging import NotificationHandler handler = NotificationHandler("gmail", defaults=params) logger.add(handler, level="ERROR")
现在最关键的一个问题是如何兼容别的 logger,比如说 tornado 或者 django 有一些默认的 logger。
经过研究,最好的解决方案是参考官方文档的,完全整合 logging 的工作方式。比如下面将所有的 logging都用 loguru 的 logger 再发送一遍消息。
import logging import sys from pathlib import Path from flask import Flask from loguru import logger app = Flask(__name__) class InterceptHandler(logging.Handler): def emit(self, record): loggerlogger_opt = logger.opt(depth=6, exception=record.exc_info) logger_opt.log(record.levelname, record.getMessage()) def configure_logging(flask_app: Flask): """配置日志""" path = Path(flask_app.config['LOG_PATH']) if not path.exists(): path.mkdir(parents=True) log_name = Path(path, 'sips.log') logging.basicConfig(handlers=[InterceptHandler(level='INFO')], level='INFO') # 配置日志到标准输出流 logger.configure(handlers=[{"sink": sys.stderr, "level": 'INFO'}]) # 配置日志到输出到文件 logger.add(log_name, rotation="500 MB", encoding='utf-8', colorize=False, level='INFO')
介绍,主要函数的使用方法和细节 - add()的创建和删除
add() 非常重要的参数 sink 参数
具体的实现规范可以参见官方文档
可以实现自定义 Handler 的配置,比如 FileHandler、StreamHandler 等等
可以自行定义输出实现
代表文件路径,会自动创建对应路径的日志文件并将日志输出进去
例如 sys.stderr 或者 open(‘file.log’, ‘w’) 都可以
可以传入一个 file 对象
可以直接传入一个 str 字符串或者 pathlib.Path 对象
可以是一个方法
可以是一个 logging 模块的 Handler
可以是一个自定义的类
def add(self, sink, *, level=_defaults.LOGURU_LEVEL, format=_defaults.LOGURU_FORMAT, filter=_defaults.LOGURU_FILTER, colorize=_defaults.LOGURU_COLORIZE, serialize=_defaults.LOGURU_SERIALIZE, backtrace=_defaults.LOGURU_BACKTRACE, diagnose=_defaults.LOGURU_DIAGNOSE, enqueue=_defaults.LOGURU_ENQUEUE, catch=_defaults.LOGURU_CATCH, **kwargs ):
另外添加 sink 之后我们也可以对其进行删除,相当于重新刷新并写入新的内容。删除的时候根据刚刚 add 方法返回的 id 进行删除即可。可以发现,在调用 remove 方法之后,确实将历史 log 删除了。但实际上这并不是删除,只不过是将 sink 对象移除之后,在这之前的内容不会再输出到日志中,这样我们就可以实现日志的刷新重新写入操作
from loguru import logger trace = logger.add('runtime.log') logger.debug('this is a debug message') logger.remove(trace) logger.debug('this is another debug message')
我们在开发流程中, 通过日志快速定位问题, 高效率解决问题, 我认为 loguru 能帮你解决不少麻烦, 赶快试试吧~
当然, 使用各种也有不少麻烦, 例如:
--- Logging error in Loguru Handler #3 ---
Record was: None
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/loguru/_handler.py", line 272, in _queued_writer
message = queue.get()
File "/usr/local/lib/python3.9/multiprocessing/queues.py", line 366, in get
res = self._reader.recv_bytes()
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
buf = self._recv(4)
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
chunk = read(handle, remaining)
OSError: [Errno 9] Bad file descriptor
--- End of logging error ---
解决办法:
尝试将logs文件夹忽略git提交, 避免和服务器文件冲突即可;
当然也不止这个原因引起这个问题, 也可能是三方库(ciscoconfparse)冲突所致.解决办法: https://github.com/Delgan/loguru/issues/534
File "/home/ronaldinho/xxx/xxx/venv/lib/python3.9/site-packages/loguru/_logger.py", line 939, in add
handler = Handler(
File "/home/ronaldinho/xxx/xxx/venv/lib/python3.9/site-packages/loguru/_handler.py", line 86, in __init__
self._queue = multiprocessing.SimpleQueue()
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/context.py", line 113, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/queues.py", line 342, in __init__
self._rlock = ctx.Lock()
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/context.py", line 68, in Lock
return Lock(ctx=self.get_context())
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/synchronize.py", line 162, in __init__
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/synchronize.py", line 57, in __init__
OSError: [Errno 24] Too many open files
你可以 remove()添加的处理程序,它应该释放文件句柄。
以上是Python3 Loguru輸出日誌工具怎麼使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!