Heim >Backend-Entwicklung >Python-Tutorial >So praktisch! Zeitplanmodul, Pythons periodisches Aufgabenartefakt!

So praktisch! Zeitplanmodul, Pythons periodisches Aufgabenartefakt!

王林
王林nach vorne
2023-04-12 14:01:082095Durchsuche

So praktisch! Zeitplanmodul, Pythons periodisches Aufgabenartefakt!

Wenn Sie regelmäßig ein Python-Skript auf einem Linux-Server ausführen möchten, sollte die bekannteste Wahl das Crontab-Skript sein, aber Crontab hat die folgenden Nachteile: #🎜🎜 #

<strong>​1.不方便执行<strong>秒级的任务</strong>。​</strong> 

<strong>​2.当需要执行的定时任务有上百个的时候,Crontab的<strong>管理就会特别不方便</strong>。​</strong> 

另外一个选择是 Celery,但是 Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。

在你想要使用一个轻量级的任务调度工具,而且希望它尽量简单、容易使用、不需要外部依赖,最好能够容纳 Crontab 的所有基本功能,那么 Schedule 模块是你的不二之选。

使用它来调度任务可能只需要几行代码,感受一下:

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

上面的代码表示每10分钟执行一次 job 函数,非常简单方便。你只需要引入 schedule 模块,通过调用 <strong>​scedule.every(时间数).时间类型.do(job)​</strong>  发布周期任务。

发布后的周期任务需要用 <strong>​run_pending​</strong> 函数来检测是否执行,因此需要一个 <strong>​While​</strong>

pip install schedule Der obige Code bedeutet, dass die Jobfunktion alle 10 Minuten ausgeführt wird sehr einfach und bequem. Sie müssen das Zeitplanmodul nur einführen, indem Sie

<span style="color: #666666;">​scedule.every(time number) . Time type.do(job)​</span>

Regelmäßige Aufgaben freigeben.

Regelmäßige Aufgaben nach der Veröffentlichung müssen mit

<strong> ​run_pending​ </strong>

Funktion zum Erkennen, ob ausgeführt werden soll, also ein

<strong>​While​</strong>

Die Schleife fragt diese Funktion kontinuierlich ab.

Im Folgenden geht es um die Installation sowie die grundlegende und erweiterte Verwendung des Zeitplanmoduls.

1.Vorbereitung

Bitte wählen Sie die aus Geben Sie folgenden Befehl auf beliebige Weise ein, um die Abhängigkeiten zu installieren

:

1. Öffnen Sie Cmd (Start-Ausführen-CMD) in der Windows-Umgebung.

2. Öffnen Sie das Terminal (Befehl+Leertaste, um das Terminal aufzurufen).

3. Wenn Sie den VSCode-Editor oder Pycharm verwenden, können Sie direkt das Terminal am unteren Rand der Benutzeroberfläche verwenden.

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

# 每十分钟执行任务
schedule.every(10).minutes.do(job)
# 每个小时执行任务
schedule.every().hour.do(job)
# 每天的10:30执行任务
schedule.every().day.at("10:30").do(job)
# 每个月执行任务
schedule.every().monday.do(job)
# 每个星期三的13:15分执行任务
schedule.every().wednesday.at("13:15").do(job)
# 每分钟的第17秒执行任务
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
# 🎜🎜#

2. Grundlegende Verwendung

#🎜🎜#Die grundlegendste Verwendung wurde am Anfang erwähnt Im folgenden Artikel möchte ich Ihnen weitere Beispiele für Planungsaufgaben zeigen: #🎜🎜#
# Python 实用宝典
import schedule
import time

def job_that_executes_once():
    # 此处编写的任务只会执行一次...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)
#🎜🎜# Wie Sie sehen, wird die Konfiguration von Monaten auf Sekunden durch die obigen Beispiele abgedeckt. Allerdings #🎜🎜# Wenn Sie die Aufgabe nur einmal #🎜🎜# ausführen möchten, können Sie sie wie folgt konfigurieren: #🎜🎜#
# Python 实用宝典
import schedule

def greet(name):
    print('Hello', name)

# do() 将额外的参数传递给job函数
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
#🎜🎜##🎜🎜#Parameterübergabe#🎜🎜##🎜🎜## 🎜🎜#Wenn Sie Parameter haben, die zur Ausführung an den Job übergeben werden müssen, müssen Sie nur Folgendes tun: #🎜🎜#
# Python 实用宝典
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()
#🎜🎜##🎜🎜#Alle aktuellen Jobs abrufen #🎜🎜##🎜🎜# #🎜🎜#Wenn Sie alle aktuellen Jobs erhalten möchten: #🎜🎜#
# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()
#🎜🎜##🎜🎜#Alle Jobs abbrechen #🎜🎜##🎜🎜##🎜🎜# Wenn ein Mechanismus ausgelöst wird, müssen Sie dies tun Alle Jobs des aktuellen Programms sofort löschen: #🎜🎜#
# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag 打标签
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(标签):可以获取所有该标签的任务
friends = schedule.get_jobs('friend')

# 取消所有 daily-tasks 标签的任务
schedule.clear('daily-tasks')
#🎜🎜##🎜🎜#Tag-Funktion#🎜🎜##🎜🎜##🎜🎜#Beim Einrichten einer Aufgabe, für die Bequemlichkeit der späteren Verwaltung, Sie Sie können die Aufgabe mit Tags versehen, sodass Sie nach Tags gefilterte Jobs abrufen oder Jobs abbrechen können. #🎜🎜#
# Python 实用宝典
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# 每个小时运行作业,18:30后停止
schedule.every(1).hours.until("18:30").do(job)

# 每个小时运行作业,2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# 每个小时运行作业,8个小时后停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# 每个小时运行作业,11:32:42后停止
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# 每个小时运行作业,2020-5-17 11:36:20后停止
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
#🎜🎜##🎜🎜#Aufgabenfrist festlegen#🎜🎜##🎜🎜##🎜🎜#Wenn Sie eine Aufgabe zu einem bestimmten Zeitpunkt fällig machen müssen, können Sie diese Methode verwenden:# 🎜🎜 #
# Python 实用宝典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即运行所有作业,每次作业间隔10秒
schedule.run_all(delay_seconds=10)
#🎜🎜#Der Job wird nach Ablauf der Frist nicht ausgeführt. #🎜🎜##🎜🎜##🎜🎜#Alle Jobs sofort ausführen, unabhängig von ihrem Zeitplan #🎜🎜##🎜🎜#

如果某个机制触发了,你需要立即运行所有作业,可以调用 <strong>​schedule.run_all()​</strong> :

# Python 实用宝典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即运行所有作业,每次作业间隔10秒
schedule.run_all(delay_seconds=10)

3.高级使用

装饰器安排作业

如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:

# Python 实用宝典
from schedule import every, repeat, run_pending
import time

# 此装饰器效果等同于 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

并行执行

默认情况下,Schedule 按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。

不过你可以通过多线程的形式来运行每个作业以解决此限制:

# Python 实用宝典
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

日志记录

Schedule 模块同时也支持 logging 日志记录,这么使用:

# Python 实用宝典
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 日志级别为DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

异常处理

Schedule 不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。

你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:

# Python 实用宝典
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

这样,<strong>​bad_task​</strong> 在执行时遇到的任何错误,都会被 <strong>​catch_exceptions ​</strong> 捕获,这点在保证调度任务正常运转的时候非常关键。

Das obige ist der detaillierte Inhalt vonSo praktisch! Zeitplanmodul, Pythons periodisches Aufgabenartefakt!. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:51cto.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen