Rumah >pembangunan bahagian belakang >Tutorial Python >Bagaimana untuk melengkapkan arahan ekor dengan Python

Bagaimana untuk melengkapkan arahan ekor dengan Python

WBOY
WBOYke hadapan
2023-05-08 21:04:151822semak imbas

1. Versi 1 - Membaca data masa nyata dari hujung fail

Idea utama ialah: buka fail, alihkan penunjuk ke hujung fail, kemudian keluarkan data jika ada adalah data, dan tidur untuk tempoh masa jika tiada data.

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # 从文件结尾处开始seek
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

Selepas itu, buat panggilan berikut:

nama fail python xxx.py

2. Versi kedua--implement tail -f

tail -fSecara lalai, 10 baris data terakhir dibaca dahulu, dan kemudian data masa nyata dibaca dari penghujung Untuk fail kecil, anda boleh membaca semua kandungan fail terlebih dahulu dan mengeluarkan 10 baris terakhir, tetapi membaca teks penuh Prestasi mendapatkan semula 10 baris terakhir adalah tidak tinggi, dan syarat sempadan untuk melancarkan 10 baris adalah. juga sangat rumit. Mari kita lihat pelaksanaan membaca teks penuh terlebih dahulu dan kemudian mendapatkan semula 10 baris terakhir:

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)

if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

Anda dapat melihat bahawa pelaksanaannya sangat mudah, berbanding dengan versi pertama, hanya ada satu lagi read_last_line的函数 Langkah seterusnya adalah untuk menyelesaikan masalah prestasi Apabila fail sangat besar, logik ini tidak berfungsi, terutamanya beberapa fail log selalunya bersaiz beberapa G. Dalam sistem Linux, tiada antara muka yang boleh menentukan penunjuk untuk melompat ke 10 baris terakhir Anda hanya boleh menggunakan kaedah berikut untuk mensimulasikan output 10 baris terakhir:

  • Mula-mula, kursor melompat ke aksara terkini, menyimpan kursor semasa, dan kemudian menganggarkan panjang aksara bagi satu baris data, sebaik-baiknya di sini saya memprosesnya berdasarkan baris 1024 aksara

  • dan kemudian Gunakan kaedah cari untuk melompat ke aksara seek(-1024 * 10, 2 Ini adalah anggaran kandungan kami dalam 10 baris terakhir
  • dan kemudian). nilaikan kandungan. Jika Jika panjang aksara lompatan kurang daripada 10 * 1024, ia membuktikan bahawa keseluruhan fail tidak mempunyai 10 baris, dan kaedah
  • asal digunakan

    Jika panjang aksara lompatan adalah sama dengan 1024 * 10, Kemudian gunakan aksara baris baharu untuk mengira berapa banyak baris panjang aksara telah diambil Jika bilangan baris lebih daripada 10, hanya 10 baris terakhir akan menjadi output. Jika hanya 4 baris telah dibaca, teruskan membaca 6*1024 sehingga 10 baris telah dibaca read_last_line

  • Melalui langkah di atas, data 10 baris terakhir telah dikira dan boleh dicetak. Anda boleh memasukkan data tambahan, tetapi pada masa ini kandungan fail mungkin telah berubah, dan kursor kami juga telah berubah, pada masa ini, kursor harus dilompat kembali ke kursor yang baru disimpan untuk mengelakkan hilang atau pencetakan data berulang.

    Setelah analisis selesai, anda boleh mula membina semula fungsi
import time
import sys

from typing import Callable, List, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        with open(self.file_name) as f:
            self.read_last_line(f, n)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 跳转游标到最后
        file.seek(0, 2)
        # 获取当前结尾的游标位置
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            # 跳转到我们预估的字符位置
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

3. Edisi ketiga - membaca fail log keluaran dengan elegan

.

Anda boleh mendapati bahawa prestasi logik bacaan masa nyata masih sangat lemah. Jika fail dibaca sekali sesaat, prestasi masa nyata akan menjadi terlalu perlahan , pemproses akan menduduki terlalu banyak Situasi prestasi terbaik adalah jika anda boleh mengetahui kemas kini fail dan kemudian mencetak fail, maka prestasi boleh dijamin Mujurlah, dalam Linux read_last_line menyediakan Fungsi ini fail log ialah ia akan dilogrotkan jika log diputarkan, maka kita perlu membuka semula fail dan seterusnya membaca data Dalam kes ini,

juga boleh digunakan Apabila

diperolehi dibuka semula, kami membuka semula fail dan membaca semula

import os
import sys

from typing import Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # 监控多个事件


class InotifyEventHandler(pyinotify.ProcessEvent):  # 定制化事件处理类,注意继承
    """
    执行inotify event的封装
    """
    f: 'open()'
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化"""

        # 获取文件
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')

        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # 只监控路径,这样就能知道文件是否移动
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """统一的输出方法"""
        for line in self.f.readlines():
            self.output(line)

    def process_IN_MODIFY(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取"""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取"""
        if event.pathname == self.filename:
            # 检测到文件被移动重新打开文件
            self.f.close()
            self.f = open(self.filename)
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        self.f = open(self.filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

        wm = pyinotify.WatchManager()  # 创建WatchManager对象
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # 实例化我们定制化后的事件处理类, 采用**dict传参数
        wm.add_watch('/tmp', multi_event)  # 添加监控的目录,及事件
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # 在notifier实例化时传入,notifier会自动执行
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        """通过inotify的with管理打开文件"""
        with self.inotify_event_handle as i:
            # 先读取指定的行数
            self.read_last_line(i.f, n)
            # 启用inotify的监听
            self.notifier.loop()

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 获取当前结尾的游标位置
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

Anda boleh melihat bahawa fail terbuka asal ditukar kepada menggunakan inotify untuk membuka fail (pada masa ini, kaedah my_init akan dipanggil untuk inisialisasi ), selepas dibuka, kami masih menjalankan n baris kod asal, dan kemudian memberikannya kepada inotify untuk dijalankan Sebelum inotify dijalankan, kami memasang kaedah fail pembukaan semula dan kaedah pencetakan fail dalam peristiwa yang sepadan dengan inotifyy, dan kemudian apabila inotify berjalan , kaedah yang sepadan akan dilaksanakan mengikut acara yang sepadan. inotify

Atas ialah kandungan terperinci Bagaimana untuk melengkapkan arahan ekor dengan Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam