我們先來看下面這個案例,程式碼如下
import sched import time def say_hello(name): print(f"Hello, world, {name}") scheduler = sched.scheduler(time.time, time.sleep) scheduler.enter(5, 1, say_hello, ("张三", )) scheduler.run()
那麼上述的程式碼中,第一步首先則是實例化一個計時器,透過如下的程式碼
import sched scheduler = sched.scheduler()
接下來我們透過enter()
方法來執行定時任務的操作,其中的參數分別是延遲的時間、任務的優先權以及具體的執行函數和執行函數中的參數。像如上的程式碼就會在延遲5秒鐘之後執行say_hello()
函數
#當然要是延遲的時間相等的時候,我們可以設定任務執行的優先權來指定函數方法運行的順序,例如有如下的程式碼
import sched import time def say_hello(name): print(f"Hello, world, {name}") def say_hello_2(name): print(f"Hello, {name}") scheduler = sched.scheduler(time.time, time.sleep) scheduler.enter(5, 2, say_hello, ("张三", )) scheduler.enter(5, 1, say_hello_2, ("李四", )) scheduler.run()
如上述程式碼,儘管延遲的時間都是一樣的,但是say_hello()
方法的優先權明顯要比say_hello_2 ()
方法要低一些,因此後者會優先執行。
除了讓函數延遲執行,我們還可以讓其重複執行,具體這樣來操作,程式碼如下
import sched import time def say_hello(): print("Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) def repeat_task(): scheduler.enter(5, 1, say_hello, ()) scheduler.enter(5, 1, repeat_task, ()) repeat_task() scheduler.run()
這裡我們新建了一個repeat_task()
自訂函數,呼叫了scheduler.enter()
方法5秒鐘執行一次之前定義的say_hello()
函數
同時我們也可以讓任務在指定的時間執行,這裡用到scheduler.entertabs()
方法,程式碼如下
import sched import time def say_hello(): print("Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 指定时间执行任务 specific_time = time.time() + 5 # 距离现在的5秒钟之后执行 scheduler.enterabs(specific_time, 1, say_hello, ()) scheduler.run()
我們傳入其中參數使其在指定的時間,也就是距離當下的5秒鐘之後來執行任務
這裡仍然是呼叫enter()
方法來執行多個任務,程式碼如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 任务一在两秒钟只有执行 scheduler.enter(2, 1, task_one, ()) # 任务二在五秒钟之后运行 scheduler.enter(5, 1, task_two, ()) scheduler.run()
這裡定義了兩個函數,task_one
和task_two
裡面分數是同樣的執行邏輯,列印出「Hello , world!”,然後task_one()
是在兩秒鐘之後執行而task_two()
則是在5秒鐘之後執行,兩者執行的優先權都是一樣的。
這回我們給task_one()
和task_two()
賦予不同的優先權,看一看執行的結果如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 优先级是1 scheduler.enter(2, 2, task_one, ()) # 优先级是2 scheduler.enter(5, 1, task_two, ()) scheduler.run()
output
Task One - Hello, world!
Task Two - Hello, world!
#上述的程式碼會在停頓兩秒之後執行task_one()
函數,再停頓3秒之後執行task_two()
函數
我們給定時任務添加上取消的方法,程式碼如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 任务一在两秒钟只有执行 task_one_event = scheduler.enter(2, 1, task_one, ()) # 任务二在五秒钟之后运行 task_two_event = scheduler.enter(5, 1, task_two, ()) # 取消执行task_one scheduler.cancel(task_one_event) scheduler.run()
我們將兩秒鐘之後執行的task_one()
方法給取消掉,最後就只執行了task_two()
方法,也就列印出來「Task Two - Hello, world!」
我們來寫一個備份的腳本,在每天固定的時間將文件備份,程式碼如下
import sched import time import shutil def backup_files(): source = '路径/files' destination = '路径二' shutil.copytree(source, destination) def schedule_backup(): # 创建新的定时器 scheduler = sched.scheduler(time.time, time.sleep) # 备份程序在每天的1点来执行 backup_time = time.strptime('01:00:00', '%H:%M:%S') backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ()) # 开启定时任务 scheduler.run() schedule_backup()
我們透過shutil
模組當中的copytree()
方法來執行拷貝文件,然後在每天的1點準時執行
最後我們來執行定時分發郵件的程序,代碼如下
import sched import time import smtplib from email.mime.text import MIMEText def send_email(subject, message, from_addr, to_addr, smtp_server): # 邮件的主体信息 email = MIMEText(message) email['Subject'] = subject email['From'] = from_addr email['To'] = to_addr # 发邮件 with smtplib.SMTP(smtp_server) as server: server.send_message(email) def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time): # 创建定时任务的示例 scheduler = sched.scheduler(time.time, time.sleep) # 定时邮件 scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server)) # 开启定时器 scheduler.run() subject = 'Test Email' message = 'This is a test email' from_addr = 'test@example.com' to_addr = 'test@example.com' smtp_server = 'smtp.test.com' scheduled_time = time.time() + 60 # 一分钟之后执行程序 send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)
以上是Python怎麼用sched模組實現定時任務的詳細內容。更多資訊請關注PHP中文網其他相關文章!