Python logging 模块定义了为应用程序和库实现灵活的事件日志记录的函数和类。
程序开发过程中,很多程序都有记录日志的需求,并且日志包含的信息有正常的程序访问日志还可能有错误、警告等信息输出,Python 的 logging 模块提供了标准的日志接口,可以通过它存储各种格式的日志,日志记录提供了一组便利功能,用于简单的日志记录用法。
使用 Python Logging 模块的主要好处是所有 Python 模块都可以参与日志记录Logging 模块提供了大量具有灵活性的功能。
简单且方便的帮助我们输出需要的日志信息:
使用 Python 来写程序或者脚本的话,常常遇到的问题就是需要对日志进行删除。一方面可以帮助我们在程序出问题的时候排除问题,二来可以帮助我们记录需要关注的信息。
但是,使用自带自带的 logging 模块的话,则需要我们进行不同的初始化等相关工作。对应不熟悉该模块的同学来说,还是有些费劲的,比如需要配置 Handler/Formatter 等。 随着业务的复杂度提升, 对日志收集有着更高的要求, 例如: 日志分类, 文件存储, 异步写入, 自定义类型等等
loguru 是一个 Python 简易且强大的第三方日志记录库,该库旨在通过添加一系列有用的功能来解决标准记录器的注意事项,从而减少 Python 日志记录的痛苦。
pip install loguru
有很多优点,以下列举了其中比较重要的几点:
开箱即用,无需准备
无需初始化,导入函数即可使用
更容易的文件日志记录与转存/保留/压缩方式
更优雅的字符串格式化输出
可以在线程或主线程中捕获异常
可以设置不同级别的日志记录样式
支持异步,且线程和多进程安全
支持惰性计算
适用于脚本和库
完全兼容标准日志记录
更好的日期时间处理
from loguru import logger logger.debug("That's it, beautiful and simple logging!")
无需初始化,导入函数即可使用, 那么你肯定要问, 如何解决一下问题?
如何添加处理程序(handler)呢?
如何设置日志格式(logs formatting)呢?
如何过滤消息(filter messages)呢?
如何如何设置级别(log level)呢?
# add logger.add(sys.stderr, \ format="{time} {level} {message}",\ filter="my_module",\ level="INFO")
是不是很easy~
# 日志文件记录 logger.add("file_{time}.log") # 日志文件转存 logger.add("file_{time}.log", rotation="500 MB") logger.add("file_{time}.log", rotation="12:00") logger.add("file_{time}.log", rotation="1 week") # 多次时间之后清理 logger.add("file_X.log", retention="10 days") # 使用zip文件格式保存 logger.add("file_Y.log", compression="zip")
logger.info( "If you're using Python {}, prefer {feature} of course!", 3.10, feature="f-strings")
@logger.catch def my_function(x, y, z): # An error? It's caught anyway! return 1 / (x + y + z) my_function(0, 0, 0)
Loguru 会自动为不同的日志级别,添加不同的颜色进行区分, 也支持自定义颜色哦~
logger.add(sys.stdout, colorize=True, format="<green>{time}</green> <level>{message}</level>") logger.add('logs/z_{time}.log', level='DEBUG', format='{time:YYYY-MM-DD :mm:ss} - {level} - {file} - {line} - {message}', rotation="10 MB")
默认情况下,添加到 logger 中的日志信息都是线程安全的。但这并不是多进程安全的,我们可以通过添加 enqueue 参数来确保日志完整性。
如果我们想要在异步任务中使用日志记录的话,也是可以使用同样的参数来保证的。并且通过 complete() 来等待执行完成。
# 异步写入 logger.add("some_file.log", enqueue=True)
你没有看错, 只需要enqueue=True
即可异步执行
用于记录代码中发生的异常的 bug 跟踪,Loguru 通过允许显示整个堆栈跟踪(包括变量值)来帮助您识别问题
logger.add("out.log", backtrace=True, diagnose=True) def func(a, b): return a / b def nested(c): try: func(5, c) except ZeroDivisionError: logger.exception("What?!") nested(0)
对日志进行序列化以便更容易地解析或传递数据结构,使用序列化参数,在将每个日志消息发送到配置的接收器之前,将其转换为 JSON 字符串。
同时,使用 bind() 方法,可以通过修改额外的 record 属性来将日志记录器消息置于上下文中。还可以通过组合 bind() 和 filter 对日志进行更细粒度的控制。
最后 patch() 方法允许将动态值附加到每个新消息的记录 dict 上。
# 序列化为json格式 logger.add(custom_sink_function, serialize=True) # bind方法的用处 logger.add("file.log", format="{extra[ip]} {extra[user]} {message}") context_logger = logger.bind(ip="192.168.2.174", user="someone") context_logger.info("Contextualize your logger easily") context_logger.bind(user="someone_else").info("Inline binding of extra attribute") context_logger.info("Use kwargs to add context during formatting: {user}", user="anybody") # 粒度控制 logger.add("special.log", filter=lambda record: "special" in record["extra"]) logger.debug("This message is not logged to the file") logger.bind(special=True).info("This message, though, is logged to the file!") # patch()方法的用处 logger.add(sys.stderr, format="{extra[utc]} {message}") loggerlogger = logger.patch(lambda record: record["extra"].update(utc=datetime.utcnow()))
有时希望在生产环境中记录详细信息而不会影响性能,可以使用 opt() 方法来实现这一点。
logger.opt(lazy=True).debug("If sink level <= DEBUG: {x}", x=lambda: expensive_function(2**64)) # By the way, "opt()" serves many usages logger.opt(exception=True).info("Error stacktrace added to the log message (tuple accepted too)") logger.opt(colors=True).info("Per message <blue>colors</blue>") logger.opt(record=True).info("Display values from the record (eg. {record[thread]})") logger.opt(raw=True).info("Bypass sink formatting\n") logger.opt(depth=1).info("Use parent stack context (useful within wrapped functions)") logger.opt(capture=False).info("Keyword arguments not added to {dest} dict", dest="extra")
new_level = logger.level("SNAKY", no=38, color="<yellow>", icon="????") logger.log("SNAKY", "Here we go!")
# For scripts config = { "handlers": [ {"sink": sys.stdout, "format": "{time} - {message}"}, {"sink": "file.log", "serialize": True}, ], "extra": {"user": "someone"} } logger.configure(**config) # For libraries logger.disable("my_library") logger.info("No matter added sinks, this message is not displayed") logger.enable("my_library") logger.info("This message however is propagated to the sinks")
希望使用 Loguru 作为内置的日志处理程序?
需要将 Loguru 消息到标准日志?
想要拦截标准的日志消息到 Loguru 中汇总?
handler = logging.handlers.SysLogHandler(address=('localhost', 514)) logger.add(handler) class PropagateHandler(logging.Handler): def emit(self, record): logging.getLogger(record.name).handle(record) logger.add(PropagateHandler(), format="{message}") class InterceptHandler(logging.Handler): def emit(self, record): # Get corresponding Loguru level if it exists try: level = logger.level(record.levelname).name except ValueError: level = record.levelno # Find caller from where originated the logged message frame, depth = logging.currentframe(), 2 while frame.f_code.co_filename == logging.__file__: frameframe = frame.f_back depth += 1 logger.opt(depthdepth=depth, exception=record.exc_info).log(level, record.getMessage()) logging.basicConfig(handlers=[InterceptHandler()], level=0)
从生成的日志中提取特定的信息通常很有用,这就是为什么 Loguru 提供了一个 parse() 方法来帮助处理日志和正则表达式。
pattern = r"(?P<time>.*) - (?P<level>[0-9]+) - (?P<message>.*)" # Regex with named groups caster_dict = dict(time=dateutil.parser.parse, level=int) # Transform matching groups for groups in logger.parse("file.log", pattern, cast=caster_dict): print("Parsed:", groups) # {"level": 30, "message": "Log example", "time": datetime(2018, 12, 09, 11, 23, 55)}
import notifiers params = { "username": "you@gmail.com", "password": "abc123", "to": "dest@gmail.com" } # Send a single notification notifier = notifiers.get_notifier("gmail") notifier.notify(message="The application is running!", **params) # Be alerted on each error message from notifiers.logging import NotificationHandler handler = NotificationHandler("gmail", defaults=params) logger.add(handler, level="ERROR")
现在最关键的一个问题是如何兼容别的 logger,比如说 tornado 或者 django 有一些默认的 logger。
经过研究,最好的解决方案是参考官方文档的,完全整合 logging 的工作方式。比如下面将所有的 logging都用 loguru 的 logger 再发送一遍消息。
import logging import sys from pathlib import Path from flask import Flask from loguru import logger app = Flask(__name__) class InterceptHandler(logging.Handler): def emit(self, record): loggerlogger_opt = logger.opt(depth=6, exception=record.exc_info) logger_opt.log(record.levelname, record.getMessage()) def configure_logging(flask_app: Flask): """配置日志""" path = Path(flask_app.config['LOG_PATH']) if not path.exists(): path.mkdir(parents=True) log_name = Path(path, 'sips.log') logging.basicConfig(handlers=[InterceptHandler(level='INFO')], level='INFO') # 配置日志到标准输出流 logger.configure(handlers=[{"sink": sys.stderr, "level": 'INFO'}]) # 配置日志到输出到文件 logger.add(log_name, rotation="500 MB", encoding='utf-8', colorize=False, level='INFO')
介绍,主要函数的使用方法和细节 - add()的创建和删除
add() 非常重要的参数 sink 参数
具体的实现规范可以参见官方文档
可以实现自定义 Handler 的配置,比如 FileHandler、StreamHandler 等等
可以自行定义输出实现
代表文件路径,会自动创建对应路径的日志文件并将日志输出进去
例如 sys.stderr 或者 open(‘file.log’, ‘w’) 都可以
可以传入一个 file 对象
可以直接传入一个 str 字符串或者 pathlib.Path 对象
可以是一个方法
可以是一个 logging 模块的 Handler
可以是一个自定义的类
def add(self, sink, *, level=_defaults.LOGURU_LEVEL, format=_defaults.LOGURU_FORMAT, filter=_defaults.LOGURU_FILTER, colorize=_defaults.LOGURU_COLORIZE, serialize=_defaults.LOGURU_SERIALIZE, backtrace=_defaults.LOGURU_BACKTRACE, diagnose=_defaults.LOGURU_DIAGNOSE, enqueue=_defaults.LOGURU_ENQUEUE, catch=_defaults.LOGURU_CATCH, **kwargs ):
另外添加 sink 之后我们也可以对其进行删除,相当于重新刷新并写入新的内容。删除的时候根据刚刚 add 方法返回的 id 进行删除即可。可以发现,在调用 remove 方法之后,确实将历史 log 删除了。但实际上这并不是删除,只不过是将 sink 对象移除之后,在这之前的内容不会再输出到日志中,这样我们就可以实现日志的刷新重新写入操作
from loguru import logger trace = logger.add('runtime.log') logger.debug('this is a debug message') logger.remove(trace) logger.debug('this is another debug message')
我们在开发流程中, 通过日志快速定位问题, 高效率解决问题, 我认为 loguru 能帮你解决不少麻烦, 赶快试试吧~
当然, 使用各种也有不少麻烦, 例如:
--- Logging error in Loguru Handler #3 ---
Record was: None
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/loguru/_handler.py", line 272, in _queued_writer
message = queue.get()
File "/usr/local/lib/python3.9/multiprocessing/queues.py", line 366, in get
res = self._reader.recv_bytes()
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
buf = self._recv(4)
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
chunk = read(handle, remaining)
OSError: [Errno 9] Bad file descriptor
--- End of logging error ---
解决办法:
尝试将logs文件夹忽略git提交, 避免和服务器文件冲突即可;
当然也不止这个原因引起这个问题, 也可能是三方库(ciscoconfparse)冲突所致.解决办法: https://github.com/Delgan/loguru/issues/534
File "/home/ronaldinho/xxx/xxx/venv/lib/python3.9/site-packages/loguru/_logger.py", line 939, in add
handler = Handler(
File "/home/ronaldinho/xxx/xxx/venv/lib/python3.9/site-packages/loguru/_handler.py", line 86, in __init__
self._queue = multiprocessing.SimpleQueue()
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/context.py", line 113, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/queues.py", line 342, in __init__
self._rlock = ctx.Lock()
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/context.py", line 68, in Lock
return Lock(ctx=self.get_context())
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/synchronize.py", line 162, in __init__
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/synchronize.py", line 57, in __init__
OSError: [Errno 24] Too many open files
你可以 remove()添加的处理程序,它应该释放文件句柄。
以上是Python3 Loguru输出日志工具怎么使用的详细内容。更多信息请关注PHP中文网其他相关文章!