Home >Backend Development >Python Tutorial >How to use sched module in Python to implement scheduled tasks
Let’s take a look at the following case first. The code is as follows
import sched import time def say_hello(name): print(f"Hello, world, {name}") scheduler = sched.scheduler(time.time, time.sleep) scheduler.enter(5, 1, say_hello, ("张三", )) scheduler.run()
The first step in the above code is to instantiate a timer through the following code
import sched scheduler = sched.scheduler()
Next we use the enter()
method to perform the scheduled task operation. The parameters are the delay time, the priority of the task, the specific execution function and the execution function. parameter. Code like the above will be executed after a delay of 5 seconds say_hello()
Function
Of course, if the delay times are equal, we can set the priority of task execution to specify the function method The running sequence, for example, has the following code
import sched import time def say_hello(name): print(f"Hello, world, {name}") def say_hello_2(name): print(f"Hello, {name}") scheduler = sched.scheduler(time.time, time.sleep) scheduler.enter(5, 2, say_hello, ("张三", )) scheduler.enter(5, 1, say_hello_2, ("李四", )) scheduler.run()
Like the above code, although the delay time is the same, the priority of the say_hello()
method is obviously higher than say_hello_2 ()
The method is lower, so the latter will be executed first.
In addition to delaying the execution of the function, we can also make it execute repeatedly. Specifically, the code is as follows
import sched import time def say_hello(): print("Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) def repeat_task(): scheduler.enter(5, 1, say_hello, ()) scheduler.enter(5, 1, repeat_task, ()) repeat_task() scheduler.run()
Here we have created a new onerepeat_task()
Custom function, calling scheduler.enter()
method executes the previously defined say_hello()
function
At the same time, we can also let the task be executed at a specified time. Here we use the scheduler.entertabs()
method. The code is as follows
import sched import time def say_hello(): print("Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 指定时间执行任务 specific_time = time.time() + 5 # 距离现在的5秒钟之后执行 scheduler.enterabs(specific_time, 1, say_hello, ()) scheduler.run()
We pass in The parameters allow it to execute the task at the specified time, that is, 5 seconds from now.
Here we still call enter()
Method to run multiple tasks, the code is as follows
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 任务一在两秒钟只有执行 scheduler.enter(2, 1, task_one, ()) # 任务二在五秒钟之后运行 scheduler.enter(5, 1, task_two, ()) scheduler.run()
Two functions are defined here, task_one
and task_two
have the same execution logic, and print out "Hello , world!", then task_one()
is executed after two seconds and task_two()
is executed after 5 seconds. The priority of both executions is the same. of.
This time we give different priorities to task_one()
and task_two()
, Take a look at the execution results as follows
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 优先级是1 scheduler.enter(2, 2, task_one, ()) # 优先级是2 scheduler.enter(5, 1, task_two, ()) scheduler.run()
output
Task One - Hello, world!
Task Two - Hello, world!
The above code will run the task_one()
function after a pause of two seconds, and then execute the task_two()
function after a pause of 3 seconds.
We add a cancellation method to the scheduled task, the code is as follows
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 任务一在两秒钟只有执行 task_one_event = scheduler.enter(2, 1, task_one, ()) # 任务二在五秒钟之后运行 task_two_event = scheduler.enter(5, 1, task_two, ()) # 取消执行task_one scheduler.cancel(task_one_event) scheduler.run()
We cancel the task_one()
method that will be executed after two seconds, and finally only execute it The task_two()
method will print out "Task Two - Hello, world!"
Let's write a backup script every day Back up the file at a fixed time. The code is as follows
import sched import time import shutil def backup_files(): source = '路径/files' destination = '路径二' shutil.copytree(source, destination) def schedule_backup(): # 创建新的定时器 scheduler = sched.scheduler(time.time, time.sleep) # 备份程序在每天的1点来执行 backup_time = time.strptime('01:00:00', '%H:%M:%S') backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ()) # 开启定时任务 scheduler.run() schedule_backup()
We use the copytree()
method in the shutil
module to copy the file, and then at 1 o'clock every day on time Execute
Finally, we will execute the program for regularly distributing emails. The code is as follows
import sched import time import smtplib from email.mime.text import MIMEText def send_email(subject, message, from_addr, to_addr, smtp_server): # 邮件的主体信息 email = MIMEText(message) email['Subject'] = subject email['From'] = from_addr email['To'] = to_addr # 发邮件 with smtplib.SMTP(smtp_server) as server: server.send_message(email) def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time): # 创建定时任务的示例 scheduler = sched.scheduler(time.time, time.sleep) # 定时邮件 scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server)) # 开启定时器 scheduler.run() subject = 'Test Email' message = 'This is a test email' from_addr = 'test@example.com' to_addr = 'test@example.com' smtp_server = 'smtp.test.com' scheduled_time = time.time() + 60 # 一分钟之后执行程序 send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)
The above is the detailed content of How to use sched module in Python to implement scheduled tasks. For more information, please follow other related articles on the PHP Chinese website!