Heim >Backend-Entwicklung >Python-Tutorial >So verwenden Sie das Sched-Modul in Python, um geplante Aufgaben zu implementieren

So verwenden Sie das Sched-Modul in Python, um geplante Aufgaben zu implementieren

WBOY
WBOYnach vorne
2023-04-18 14:38:172175Durchsuche

Schnelltest

Schauen wir uns zunächst den folgenden Fall an. Der Code lautet wie folgt:

import sched
import time

def say_hello(name):
    print(f"Hello, world, {name}")

scheduler = sched.scheduler(time.time, time.sleep)

scheduler.enter(5, 1, say_hello, ("张三", ))
scheduler.run()

Der erste Schritt im obigen Code besteht darin, einen Timer durch den folgenden Code zu instanziieren:

import sched

scheduler = sched.scheduler()

Dann übergeben wir enter (. ) Methode zum Ausführen der Ausführung geplanter Aufgaben. Die Parameter sind die Verzögerungszeit, die Priorität der Aufgabe, die spezifische Ausführungsfunktion und die Parameter in der Ausführungsfunktion. Code wie der obige führt die Funktion say_hello() nach einer Verzögerung von 5 Sekunden ausenter()方法来执行定时任务的操作,其中的参数分别是延迟的时间、任务的优先级以及具体的执行函数和执行函数中的参数。像如上的代码就会在延迟5秒钟之后执行say_hello()函数

当然要是延迟的时间相等的时候,我们可以设置任务执行的优先级来指定函数方法运行的顺序,例如有如下的代码

import sched
import time

def say_hello(name):
    print(f"Hello, world, {name}")

def say_hello_2(name):
    print(f"Hello, {name}")

scheduler = sched.scheduler(time.time, time.sleep)

scheduler.enter(5, 2, say_hello, ("张三", ))
scheduler.enter(5, 1, say_hello_2, ("李四", ))
scheduler.run()

如上述代码,尽管延迟的时间都是一样的,但是say_hello()方法的优先级明显要比say_hello_2()方法要低一些,因此后者会优先执行。

进阶使用

除了让函数延迟执行,我们还可以让其重复执行,具体这样来操作,代码如下

import sched
import time

def say_hello():
    print("Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

def repeat_task():
    scheduler.enter(5, 1, say_hello, ())
    scheduler.enter(5, 1, repeat_task, ())

repeat_task()
scheduler.run()

这里我们新建了一个repeat_task()自定义函数,调用了scheduler.enter()方法5秒钟执行一次之前定义的say_hello()函数

在固定时间执行任务

同时我们还可以让任务在指定的时间执行,这里用到scheduler.entertabs()方法,代码如下

import sched
import time

def say_hello():
    print("Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 指定时间执行任务
specific_time = time.time() + 5  # 距离现在的5秒钟之后执行
scheduler.enterabs(specific_time, 1, say_hello, ())

scheduler.run()

我们传入其中参数使其在指定的时间,也就是距离当下的5秒钟之后来执行任务

执行多个任务

这里仍然是调用enter()方法来运行多个任务,代码如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 任务一在两秒钟只有执行
scheduler.enter(2, 1, task_one, ())

# 任务二在五秒钟之后运行
scheduler.enter(5, 1, task_two, ())

scheduler.run()

这里定义了两个函数,task_onetask_two里面分是同样的执行逻辑,打印出“Hello, world!”,然后task_one()是在两秒钟之后执行而task_two()则是在5秒钟之后执行,两者执行的优先级都是一样的。

以不同的优先级执行不同的任务

这回我们给task_one()task_two()赋予不同的优先级,看一看执行的结果如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 优先级是1
scheduler.enter(2, 2, task_one, ())

# 优先级是2
scheduler.enter(5, 1, task_two, ())

scheduler.run()

output

Task One - Hello, world!
Task Two - Hello, world!

上述的代码会在停顿两秒之后运行task_one()函数,再停顿3秒之后执行task_two()函数

定时任务加上取消方法

我们给定时任务添加上取消的方法,代码如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 任务一在两秒钟只有执行
task_one_event = scheduler.enter(2, 1, task_one, ())

# 任务二在五秒钟之后运行
task_two_event = scheduler.enter(5, 1, task_two, ())

# 取消执行task_one
scheduler.cancel(task_one_event)

scheduler.run()

我们将两秒钟之后执行的task_one()方法给取消掉,最后就只执行了task_two()方法,也就打印出来“Task Two - Hello, world!”

执行备份程序

我们来写一个备份的脚本,在每天固定的时间将文件备份,代码如下

import sched
import time
import shutil

def backup_files():
    source = '路径/files'
    destination = '路径二'
    shutil.copytree(source, destination)

def schedule_backup():
    # 创建新的定时器
    scheduler = sched.scheduler(time.time, time.sleep)

    # 备份程序在每天的1点来执行
    backup_time = time.strptime('01:00:00', '%H:%M:%S')
    backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ())

    # 开启定时任务
    scheduler.run()

schedule_backup()

我们通过shutil模块当中的copytree()

Wenn die Verzögerungszeiten gleich sind, können wir natürlich die Priorität der Aufgabenausführung festlegen, um die Reihenfolge festzulegen, in der ausgeführt wird Funktionsmethoden ausgeführt werden, gibt es beispielsweise den folgenden Code

import sched
import time
import smtplib
from email.mime.text import MIMEText

def send_email(subject, message, from_addr, to_addr, smtp_server):
    # 邮件的主体信息
    email = MIMEText(message)
    email['Subject'] = subject
    email['From'] = from_addr
    email['To'] = to_addr

    # 发邮件
    with smtplib.SMTP(smtp_server) as server:
        server.send_message(email)

def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time):
    # 创建定时任务的示例
    scheduler = sched.scheduler(time.time, time.sleep)

    # 定时邮件
    scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server))

    # 开启定时器
    scheduler.run()

subject = 'Test Email'
message = 'This is a test email'
from_addr = 'test@example.com'
to_addr = 'test@example.com'
smtp_server = 'smtp.test.com'

scheduled_time = time.time() + 60 # 一分钟之后执行程序
send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)

Wie im obigen Code ist die Priorität der Methode say_hello() offensichtlich höher als die von say_hello_2() Die Methode ist niedriger, daher wird letztere zuerst ausgeführt.

Erweiterte Verwendung🎜🎜Zusätzlich zur Verzögerung der Ausführung der Funktion können wir sie auch wiederholt ausführen lassen. Konkret lautet der Code wie folgt:🎜rrreee🎜Hier haben wir einen neuen repeat_task() erstellt Benutzerdefinierte Funktion: Die Methode scheduler.enter() wird aufgerufen, um die zuvor definierte Funktion say_hello() auszuführen 🎜

Aufgaben zu einem festen Zeitpunkt ausführen

🎜 Gleichzeitig können wir die Aufgabe auch zu einem bestimmten Zeitpunkt ausführen lassen. Hier wird die Methode scheduler.entertabs() verwendet. Der Code lautet wie folgt: Wir übergeben die Parameter an Lassen Sie es zum angegebenen Zeitpunkt ausführen, der 5 Jahre vom aktuellen Zeitpunkt entfernt liegt. Führen Sie die Aufgabe nach Sekunden aus. 🎜

Führen Sie mehrere Aufgaben aus

🎜Hier rufen wir immer noch enter() auf Methode zum Ausführen mehrerer Aufgaben: 🎜rrree🎜Hier sind zwei Funktionen definiert: task_one und task_two haben die gleiche Ausführungslogik. wird ausgedruckt und dann wird task_one() in beiden Funktionen verwendet. Es wird nach Sekunden ausgeführt und task_two() wird nach 5 Sekunden ausgeführt beide sind gleich. 🎜

Führen Sie verschiedene Aufgaben mit unterschiedlichen Prioritäten aus

🎜Dieses Mal geben wir task_one() und task_two() unterschiedliche Prioritäten, siehe Das Ausführungsergebnis ist wie folgt folgt🎜rrreee🎜Ausgabe🎜
🎜Aufgabe Eins – Hallo, Welt!
Aufgabe Zwei – Hallo, Welt!🎜
🎜Der obige Code wird nach einer zweisekündigen Pause ausgeführttask_one( )-Funktion, warten Sie dann 3 Sekunden und führen Sie dann die Funktion task_two() aus🎜

Abbruchmethode zu geplanten Aufgaben hinzufügen

🎜Wir fügen Abbruchmethode zu geplanten Aufgaben hinzu, Der Code lautet wie folgt🎜rrreee🎜Wir brechen die Methode task_one() ab, die nach zwei Sekunden ausgeführt wird, und führen schließlich nur noch die Methode task_two() aus, also Drucken out „Aufgabe Zwei – Hallo Welt!“ 🎜🎜Führen Sie das Backup-Programm aus🎜🎜Lassen Sie uns ein Backup-Skript schreiben, um die Datei jeden Tag zu einem festen Zeitpunkt zu sichern. Der Code lautet wie folgt: 🎜rrreee🎜Wir verwenden shutil Die <code>copytree()-Methode im Modul führt die Kopierdatei aus und führt sie dann jeden Tag pünktlich um 1 Uhr aus🎜🎜Führen Sie das Programm zum regelmäßigen Verteilen von E-Mails aus🎜🎜Schließlich haben wir führt das Programm zum regelmäßigen Versenden von E-Mails aus. Der Code lautet wie folgt: rrreee

Das obige ist der detaillierte Inhalt vonSo verwenden Sie das Sched-Modul in Python, um geplante Aufgaben zu implementieren. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen