搜索
首页后端开发Python教程如何用Python完成tail命令

1.第一版--从文件尾部读取实时数据

主要思路是: 打开文件, 把指针移动到文件最后, 然后有数据则输出数据, 无数据则休眠一段时间.

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # 从文件结尾处开始seek
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

之后只要做如下调用即可:

python xxx.py filename 

2.第二版--实现tail -f

tail -f默认先读取最后10行数据,再从文件尾部读取实时数据.如果对于小文件,可以先读取所有文件内容,并输出最后10行, 但是读取全文再获取最后10行的性能不高, 而从后滚10行的边界条件也很复杂, 先看先读取全文再获取最后10行的实现:

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)

if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

可以看到实现很简单, 相比第一版只多了个read_last_line的函数, 接下来就要解决性能的问题了, 当文件很大的时候, 这个逻辑是不行的, 特别是有些日志文件经常有几个G大, 如果全读出来内存就爆了. 而在Linux系统中, 没有一个接口可以指定指针跳到倒数10行, 只能使用如下方法来模拟输出倒数10行:

  • 首先游标跳转到最新的字符, 保存当前游标, 然后预估一行数据的字符长度, 最好偏多, 这里我按1024字符长度为一行来处理

  • 然后利用seek的方法,跳转到seek(-1024 * 10, 2)的字符, 这就是我们预估的倒数10行内的内容

  • 接着对内容进行判断, 如果跳转的字符长度小于 10 * 1024, 则证明整个文件没有10行, 则采用原来的read_last_line方法.

  • 如果跳转到字符长度等于1024 * 10, 则利用换行符计算已取字符长度共有多少行,如果行数大于10,那只输出最后10行,如果只读了4行,则继续读6*1024,直到读满10行为止

通过以上步奏, 就把倒数10行的数据计算好了可以打印出来, 可以进入追加数据了, 但是这时候文件内容可能发生改变了, 我们的游标也发生改变了, 这时候要把游标跳回到刚才保存的游标,防止漏打或者重复打印数据.

分析完毕后, 就可以开始重构read_last_line函数了.

import time
import sys

from typing import Callable, List, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        with open(self.file_name) as f:
            self.read_last_line(f, n)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 跳转游标到最后
        file.seek(0, 2)
        # 获取当前结尾的游标位置
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            # 跳转到我们预估的字符位置
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

3.第三版--优雅的读取输出日志文件

可以发现实时读取那块的逻辑性能还是很差, 如果每秒读一次文件,实时性就太慢了,把间隔改小了,则处理器占用太多. 性能最好的情况是如果能得知文件更新再进行打印文件, 那性能就能得到保障了.庆幸的是,在Linux中inotify提供了这样的功能. 此外,日志文件有一个特点就是会进行logrotate,如果日志被logrotate了,那我们就需要重新打开文件,并进一步读取数据, 这种情况也可以利用到inotify, 当inotify获取到文件重新打开的事件时,我们就重新打开文件,再读取.

import os
import sys

from typing import Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # 监控多个事件


class InotifyEventHandler(pyinotify.ProcessEvent):  # 定制化事件处理类,注意继承
    """
    执行inotify event的封装
    """
    f: 'open()'
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化"""

        # 获取文件
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')

        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # 只监控路径,这样就能知道文件是否移动
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """统一的输出方法"""
        for line in self.f.readlines():
            self.output(line)

    def process_IN_MODIFY(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取"""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取"""
        if event.pathname == self.filename:
            # 检测到文件被移动重新打开文件
            self.f.close()
            self.f = open(self.filename)
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        self.f = open(self.filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

        wm = pyinotify.WatchManager()  # 创建WatchManager对象
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # 实例化我们定制化后的事件处理类, 采用**dict传参数
        wm.add_watch('/tmp', multi_event)  # 添加监控的目录,及事件
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # 在notifier实例化时传入,notifier会自动执行
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        """通过inotify的with管理打开文件"""
        with self.inotify_event_handle as i:
            # 先读取指定的行数
            self.read_last_line(i.f, n)
            # 启用inotify的监听
            self.notifier.loop()

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 获取当前结尾的游标位置
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

可以看到, 从原本的open打开文件改为用inotify打开文件(这时候会调用my_init方法进行初始化), 打开后还是运行我们打开原来n行的代码, 然后就交给inotify运行. 在inotify运行之前, 我们把重新打开文件方法和打印文件方法都挂载在inotifiy对应的事件里, 之后inotify运行时, 会根据对应的事件执行对应的方法。

以上是如何用Python完成tail命令的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:亿速云。如有侵权,请联系admin@php.cn删除
Python vs. C:了解关键差异Python vs. C:了解关键差异Apr 21, 2025 am 12:18 AM

Python和C 各有优势,选择应基于项目需求。1)Python适合快速开发和数据处理,因其简洁语法和动态类型。2)C 适用于高性能和系统编程,因其静态类型和手动内存管理。

Python vs.C:您的项目选择哪种语言?Python vs.C:您的项目选择哪种语言?Apr 21, 2025 am 12:17 AM

选择Python还是C 取决于项目需求:1)如果需要快速开发、数据处理和原型设计,选择Python;2)如果需要高性能、低延迟和接近硬件的控制,选择C 。

达到python目标:每天2小时的力量达到python目标:每天2小时的力量Apr 20, 2025 am 12:21 AM

通过每天投入2小时的Python学习,可以有效提升编程技能。1.学习新知识:阅读文档或观看教程。2.实践:编写代码和完成练习。3.复习:巩固所学内容。4.项目实践:应用所学于实际项目中。这样的结构化学习计划能帮助你系统掌握Python并实现职业目标。

最大化2小时:有效的Python学习策略最大化2小时:有效的Python学习策略Apr 20, 2025 am 12:20 AM

在两小时内高效学习Python的方法包括:1.回顾基础知识,确保熟悉Python的安装和基本语法;2.理解Python的核心概念,如变量、列表、函数等;3.通过使用示例掌握基本和高级用法;4.学习常见错误与调试技巧;5.应用性能优化与最佳实践,如使用列表推导式和遵循PEP8风格指南。

在Python和C之间进行选择:适合您的语言在Python和C之间进行选择:适合您的语言Apr 20, 2025 am 12:20 AM

Python适合初学者和数据科学,C 适用于系统编程和游戏开发。1.Python简洁易用,适用于数据科学和Web开发。2.C 提供高性能和控制力,适用于游戏开发和系统编程。选择应基于项目需求和个人兴趣。

Python与C:编程语言的比较分析Python与C:编程语言的比较分析Apr 20, 2025 am 12:14 AM

Python更适合数据科学和快速开发,C 更适合高性能和系统编程。1.Python语法简洁,易于学习,适用于数据处理和科学计算。2.C 语法复杂,但性能优越,常用于游戏开发和系统编程。

每天2小时:Python学习的潜力每天2小时:Python学习的潜力Apr 20, 2025 am 12:14 AM

每天投入两小时学习Python是可行的。1.学习新知识:用一小时学习新概念,如列表和字典。2.实践和练习:用一小时进行编程练习,如编写小程序。通过合理规划和坚持不懈,你可以在短时间内掌握Python的核心概念。

Python与C:学习曲线和易用性Python与C:学习曲线和易用性Apr 19, 2025 am 12:20 AM

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

将Eclipse与SAP NetWeaver应用服务器集成。

Dreamweaver Mac版

Dreamweaver Mac版

视觉化网页开发工具

SublimeText3 英文版

SublimeText3 英文版

推荐:为Win版本,支持代码提示!

螳螂BT

螳螂BT

Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

WebStorm Mac版

WebStorm Mac版

好用的JavaScript开发工具