1.第一版--從檔案尾部讀取即時資料
主要想法是: 開啟檔案, 把指標移到檔案最後, 然後有資料則輸出資料, 無資料則休眠一段時間.
import time import sys from typing import Callable, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval def __call__(self): with open(self.file_name) as f: f.seek(0, 2) # 从文件结尾处开始seek while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
之後只要做如下呼叫即可:
python xxx.py filename
2.第二版--實作tail -f
tail -f
預設先讀取最後10行資料,再從檔案尾部讀取即時資料.如果對於小檔案,可以先讀取所有檔案內容,並輸出最後10行,但是讀取全文再獲取最後10行的性能不高, 而從後滾10行的邊界條件也很複雜, 先看先讀取全文再獲取最後10行的實現:
import time import sys from typing import Callable, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval def __call__(self): with open(self.file_name) as f: self.read_last_line(f) while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) def read_last_line(self, f): last_lines = f.readlines()[-10:] for line in last_lines: self.output(line) if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
可以看到實現很簡單, 相比第一版只多了個read_last_line的函數
, 接下來就要解決性能的問題了, 當文件很大的時候, 這個邏輯是不行的, 特別是有些日誌檔常有幾個G大, 如果全讀出來內存就爆了. 而在Linux系統中, 沒有一個接口可以指定指針跳到倒數10行, 只能使用如下方法來模擬輸出倒數10行:
-
首先遊標跳到最新的字元, 儲存目前遊標, 然後預估一行資料的字元長度, 最好偏多, 這裡我按1024字元長度為一行來處理
然後利用seek的方法,跳到seek(-1024 * 10, 2)的字元, 這就是我們預估的倒數10行內的內容
#接著對內容進行判斷, 如果跳轉的字元長度小於10 * 1024, 則證明整個檔案沒有10行, 則採用原來的
read_last_line
方法.-
如果跳到字元長度等於1024 * 10, 則利用換行符計算已取字元長度共有多少行,如果行數大於10,那隻輸出最後10行,如果只讀了4行,則繼續讀6*1024,直到讀滿10行為止
透過以上步奏, 就把倒數10行的資料計算好了可以列印出來, 可以進入追加資料了, 但是這時候文件內容可能改變了, 我們的遊標也發生改變了, 這時候要把遊標跳回到剛才保存的遊標,防止漏打或者重複打印數據.
分析完畢後, 就可以開始重構read_last_line
函數了.
import time import sys from typing import Callable, List, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line def __call__(self, n: int = 10): with open(self.file_name) as f: self.read_last_line(f, n) while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) def read_last_line(self, file, n): read_len: int = self.len_line * n # 跳转游标到最后 file.seek(0, 2) # 获取当前结尾的游标位置 now_tell: int = file.tell() while True: if read_len > file.tell(): # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来 file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标 last_line_list: List[str] = file.read().split('\n')[-n:] # 重新获取游标位置 now_tell: int = file.tell() break # 跳转到我们预估的字符位置 file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: # 如果获取的行数大于要求的行数,则获取前n行的行数 last_line_list: List[str] = read_str.split('\n')[-n:] break else: # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取 if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n for line in last_line_list: self.output(line + '\n') # 重置游标,确保接下来打印的数据不重复 file.seek(now_tell) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
3.第三版--優雅的讀取輸出日誌檔案
可以發現即時讀取那區塊的邏輯效能還是很差, 如果每秒讀一次文件,實時性就太慢了,把間隔改小了,則處理器佔用太多. 性能最好的情況是如果能得知文件更新再進行打印文件, 那性能就能得到保障了.慶幸的是,在Linux中inotify
提供了這樣的功能. 此外,日誌文件有一個特點就是會進行logrotate,如果日誌被logrotate了,那我們就需要重新打開文件,並進一步讀取資料, 這種情況也可以利用到inotify
, 當inotify
取得到檔案重新開啟的事件時,我們就重新開啟檔案,再讀取.
import os import sys from typing import Callable, List, NoReturn import pyinotify multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF # 监控多个事件 class InotifyEventHandler(pyinotify.ProcessEvent): # 定制化事件处理类,注意继承 """ 执行inotify event的封装 """ f: 'open()' filename: str path: str wm: 'pyinotify.WatchManager' output: Callable def my_init(self, **kargs): """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化""" # 获取文件 filename: str = kargs.pop('filename') if not os.path.exists(filename): raise RuntimeError('Not Found filename') if '/' not in filename: filename = os.getcwd() + '/' + filename index = filename.rfind('/') if index == len(filename) - 1 or index == -1: raise RuntimeError('Not a legal path') self.f = None self.filename = filename self.output: Callable = kargs.pop('output') self.wm = kargs.pop('wm') # 只监控路径,这样就能知道文件是否移动 self.path = filename[:index] self.wm.add_watch(self.path, multi_event) def read_line(self): """统一的输出方法""" for line in self.f.readlines(): self.output(line) def process_IN_MODIFY(self, event): """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取""" if event.pathname == self.filename: self.read_line() def process_IN_MOVE_SELF(self, event): """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取""" if event.pathname == self.filename: # 检测到文件被移动重新打开文件 self.f.close() self.f = open(self.filename) self.read_line() def __enter__(self) -> 'InotifyEventHandler': self.f = open(self.filename) return self def __exit__(self, exc_type, exc_val, exc_tb): self.f.close() class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line wm = pyinotify.WatchManager() # 创建WatchManager对象 inotify_event_handler = InotifyEventHandler( **dict(filename=file_name, wm=wm, output=output) ) # 实例化我们定制化后的事件处理类, 采用**dict传参数 wm.add_watch('/tmp', multi_event) # 添加监控的目录,及事件 self.notifier = pyinotify.Notifier(wm, inotify_event_handler) # 在notifier实例化时传入,notifier会自动执行 self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler def __call__(self, n: int = 10): """通过inotify的with管理打开文件""" with self.inotify_event_handle as i: # 先读取指定的行数 self.read_last_line(i.f, n) # 启用inotify的监听 self.notifier.loop() def read_last_line(self, file, n): read_len: int = self.len_line * n # 获取当前结尾的游标位置 file.seek(0, 2) now_tell: int = file.tell() while True: if read_len > file.tell(): # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来 file.seek(0) last_line_list: List[str] = file.read().split('\n')[-n:] # 重新获取游标位置 now_tell: int = file.tell() break file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: # 如果获取的行数大于要求的行数,则获取前n行的行数 last_line_list: List[str] = read_str.split('\n')[-n:] break else: # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取 if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n for line in last_line_list: self.output(line + '\n') # 重置游标,确保接下来打印的数据不重复 file.seek(now_tell) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
可以看到, 從原本的open打開文件改為用inotify打開文件(這時候會調用my_init方法進行初始化), 打開後還是運行我們打開原來n行的代碼, 然後就交給inotify運行. 在inotify運行之前, 我們把重新打開文件方法和打印文件方法都掛載在inotifiy對應的事件裡, 之後inotify運行時, 會根據對應的事件執行對應的方法。
以上是如何用Python完成tail指令的詳細內容。更多資訊請關注PHP中文網其他相關文章!

Arraysinpython,尤其是Vianumpy,ArecrucialInsCientificComputingfortheireftheireffertheireffertheirefferthe.1)Heasuedfornumerericalicerationalation,dataAnalysis和Machinelearning.2)Numpy'Simpy'Simpy'simplementIncressionSressirestrionsfasteroperoperoperationspasterationspasterationspasterationspasterationspasterationsthanpythonlists.3)inthanypythonlists.3)andAreseNableAblequick

你可以通過使用pyenv、venv和Anaconda來管理不同的Python版本。 1)使用pyenv管理多個Python版本:安裝pyenv,設置全局和本地版本。 2)使用venv創建虛擬環境以隔離項目依賴。 3)使用Anaconda管理數據科學項目中的Python版本。 4)保留系統Python用於系統級任務。通過這些工具和策略,你可以有效地管理不同版本的Python,確保項目順利運行。

numpyarrayshaveseveraladagesoverandastardandpythonarrays:1)基於基於duetoc的iMplation,2)2)他們的aremoremoremorymorymoremorymoremorymoremorymoremoremory,尤其是WithlargedAtasets和3)效率化,效率化,矢量化函數函數函數函數構成和穩定性構成和穩定性的操作,製造

數組的同質性對性能的影響是雙重的:1)同質性允許編譯器優化內存訪問,提高性能;2)但限制了類型多樣性,可能導致效率低下。總之,選擇合適的數據結構至關重要。

到CraftCraftExecutablePythcripts,lollow TheSebestPractices:1)Addashebangline(#!/usr/usr/bin/envpython3)tomakethescriptexecutable.2)setpermissionswithchmodwithchmod xyour_script.3)

numpyArraysareAreBetterFornumericalialoperations andmulti-demensionaldata,而learthearrayModuleSutableforbasic,內存效率段

numpyArraySareAreBetterForHeAvyNumericalComputing,而lelethearRayModulesiutable-usemoblemory-connerage-inderabledsswithSimpleDatateTypes.1)NumpyArsofferVerverVerverVerverVersAtility andPerformanceForlargedForlargedAtatasetSetsAtsAndAtasEndCompleXoper.2)

ctypesallowscreatingingangandmanipulatingc-stylarraysinpython.1)usectypestoInterfacewithClibrariesForperfermance.2)createc-stylec-stylec-stylarraysfornumericalcomputations.3)passarraystocfunctions foreforfunctionsforeffortions.however.however,However,HoweverofiousofmemoryManageManiverage,Pressiveo,Pressivero


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

Dreamweaver Mac版
視覺化網頁開發工具

VSCode Windows 64位元 下載
微軟推出的免費、功能強大的一款IDE編輯器

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

Safe Exam Browser
Safe Exam Browser是一個安全的瀏覽器環境,安全地進行線上考試。該軟體將任何電腦變成一個安全的工作站。它控制對任何實用工具的訪問,並防止學生使用未經授權的資源。

Dreamweaver CS6
視覺化網頁開發工具