>  기사  >  백엔드 개발  >  Python으로 tail 명령을 완료하는 방법

Python으로 tail 명령을 완료하는 방법

WBOY
WBOY앞으로
2023-05-08 21:04:151782검색

1. 첫 번째 버전 - 파일 끝부터 실시간 데이터 읽기

주요 아이디어는 파일을 열고 포인터를 파일 끝으로 이동한 다음 데이터가 있으면 데이터를 출력하고 절전 모드로 전환하는 것입니다. 일정 기간 동안 데이터가 없으면

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # 从文件结尾处开始seek
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

그 후 다음을 호출하세요:

python xxx.py filename

2. 두 번째 버전 - tail -f

tail을 구현하세요. -f 기본적으로 마지막 10줄의 데이터를 먼저 읽은 다음 파일 끝부터 실시간 데이터를 읽습니다. 작은 파일의 경우 모든 파일 내용을 먼저 읽고 마지막 10줄을 출력할 수 있습니다. 그러나 전체 텍스트를 읽고 마지막 10줄을 가져오는 성능은 높지 않으며, 10줄을 롤백하는 경계 조건도 매우 복잡합니다. 전체 텍스트를 먼저 읽은 다음 구현하는 방법을 살펴보겠습니다. 마지막 10줄 가져오기: tail -f默认先读取最后10行数据,再从文件尾部读取实时数据.如果对于小文件,可以先读取所有文件内容,并输出最后10行, 但是读取全文再获取最后10行的性能不高, 而从后滚10行的边界条件也很复杂, 先看先读取全文再获取最后10行的实现:

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)

if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

可以看到实现很简单, 相比第一版只多了个read_last_line的函数, 接下来就要解决性能的问题了, 当文件很大的时候, 这个逻辑是不行的, 特别是有些日志文件经常有几个G大, 如果全读出来内存就爆了. 而在Linux系统中, 没有一个接口可以指定指针跳到倒数10行, 只能使用如下方法来模拟输出倒数10行:

  • 首先游标跳转到最新的字符, 保存当前游标, 然后预估一行数据的字符长度, 最好偏多, 这里我按1024字符长度为一行来处理

  • 然后利用seek的方法,跳转到seek(-1024 * 10, 2)的字符, 这就是我们预估的倒数10行内的内容

  • 接着对内容进行判断, 如果跳转的字符长度小于 10 * 1024, 则证明整个文件没有10行, 则采用原来的read_last_line方法.

  • 如果跳转到字符长度等于1024 * 10, 则利用换行符计算已取字符长度共有多少行,如果行数大于10,那只输出最后10行,如果只读了4行,则继续读6*1024,直到读满10行为止

通过以上步奏, 就把倒数10行的数据计算好了可以打印出来, 可以进入追加数据了, 但是这时候文件内容可能发生改变了, 我们的游标也发生改变了, 这时候要把游标跳回到刚才保存的游标,防止漏打或者重复打印数据.

分析完毕后, 就可以开始重构read_last_line函数了.

import time
import sys

from typing import Callable, List, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        with open(self.file_name) as f:
            self.read_last_line(f, n)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 跳转游标到最后
        file.seek(0, 2)
        # 获取当前结尾的游标位置
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            # 跳转到我们预估的字符位置
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

3.第三版--优雅的读取输出日志文件

可以发现实时读取那块的逻辑性能还是很差, 如果每秒读一次文件,实时性就太慢了,把间隔改小了,则处理器占用太多. 性能最好的情况是如果能得知文件更新再进行打印文件, 那性能就能得到保障了.庆幸的是,在Linux中inotify提供了这样的功能. 此外,日志文件有一个特点就是会进行logrotate,如果日志被logrotate了,那我们就需要重新打开文件,并进一步读取数据, 这种情况也可以利用到inotify, 当inotify

import os
import sys

from typing import Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # 监控多个事件


class InotifyEventHandler(pyinotify.ProcessEvent):  # 定制化事件处理类,注意继承
    """
    执行inotify event的封装
    """
    f: 'open()'
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化"""

        # 获取文件
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')

        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # 只监控路径,这样就能知道文件是否移动
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """统一的输出方法"""
        for line in self.f.readlines():
            self.output(line)

    def process_IN_MODIFY(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取"""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取"""
        if event.pathname == self.filename:
            # 检测到文件被移动重新打开文件
            self.f.close()
            self.f = open(self.filename)
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        self.f = open(self.filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

        wm = pyinotify.WatchManager()  # 创建WatchManager对象
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # 实例化我们定制化后的事件处理类, 采用**dict传参数
        wm.add_watch('/tmp', multi_event)  # 添加监控的目录,及事件
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # 在notifier实例化时传入,notifier会自动执行
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        """通过inotify的with管理打开文件"""
        with self.inotify_event_handle as i:
            # 先读取指定的行数
            self.read_last_line(i.f, n)
            # 启用inotify的监听
            self.notifier.loop()

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 获取当前结尾的游标位置
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

첫 번째 버전에 비해 구현이 매우 간단하다는 것을 알 수 있습니다. 다음으로 성능 문제를 해결해야 합니다. 파일이 매우 크면 이 논리가 작동하지 않습니다. 특히 일부 로그 파일은 크기가 수 G인 경우가 많습니다. Linux 시스템에서는 인터페이스가 포인터를 지정할 수 없습니다. 마지막 10줄로 이동하려면 다음 방법만 사용하여 마지막 10줄의 출력을 시뮬레이션할 수 있습니다.

  • 먼저 커서가 최신 줄로 이동합니다. 문자를 저장하고 현재 커서를 저장한 다음 데이터 행의 문자 길이를 추정하면 너무 긴 것이 좋습니다. 여기서는 한 행의 길이를 1024자 기준으로 처리합니다🎜
  • 🎜 그런 다음 탐색 메소드를 사용하여 탐색(-1024 * 10 , 2) 문자로 점프합니다. 이것이 마지막 10줄 내의 추정 내용입니다🎜
  • 🎜그런 다음 점프한 내용의 길이를 판단합니다. 문자가 10 * 1024보다 작으면 전체 파일에 10줄이 없다는 것을 증명하며 원래 read_last_line 방법이 사용됩니다. 🎜
  • 🎜문자 길이가 점프는 1024 * 10과 같으며 줄 바꿈 문자를 사용하여 문자 길이가 몇 줄인지 계산합니다. 줄 수가 10보다 크면 마지막 10줄만 출력됩니다. 읽었으면 10줄을 읽을 때까지 6*1024를 계속 읽으세요🎜
🎜위 단계를 거친 후 10까지 카운트다운합니다. 행 데이터를 계산한 후 인쇄할 수 있으며, 그러나 이때 파일 내용이 변경되었을 수 있으며 커서도 변경되었을 수 있습니다. 이때 커서는 항목 누락을 방지하기 위해 방금 저장된 커서로 다시 이동해야 합니다. 🎜🎜분석이 완료되면 read_last_line 함수 재구성을 시작할 수 있습니다. 🎜rrreee🎜3. 세 번째 버전 - 출력 로그 파일을 우아하게 읽습니다.🎜🎜논리적 읽기를 발견할 수 있습니다. 해당 블록을 가져오는 성능은 여전히 ​​매우 낮습니다. 파일을 초당 한 번 읽으면 실시간 성능이 너무 느려집니다. 간격을 더 작은 값으로 변경하면 프로세서가 너무 많은 성능을 차지하게 됩니다. 파일 업데이트를 알 수 있고 파일을 인쇄하면 성능을 보장할 수 있습니다. 다행히 Linux의 inotify에서는 이러한 기능도 제공합니다. 로그가 logrotated되면 파일을 다시 열고 데이터를 더 읽어야 합니다. 이 경우 inotify를 사용할 수도 있습니다. 파일이 다시 열리는 이벤트가 발생하면 파일을 다시 열고 다시 읽으면 됩니다.🎜rrreee🎜파일이 open이 아닌 inotify로 열린 것을 확인할 수 있습니다(이때, 연 후 초기화를 위해 my_init 메소드가 호출됩니다). , 우리는 원래 n 줄을 연 코드를 실행한 다음 inotify가 실행되도록 둡니다. inotify가 실행되기 전에 해당 inotify 이벤트에 파일 열기 메서드와 인쇄 파일 메서드를 마운트합니다. , 해당 이벤트에 따라 해당 메서드가 실행됩니다. 🎜

위 내용은 Python으로 tail 명령을 완료하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제