ホームページ  >  記事  >  バックエンド開発  >  とても実用的です! Python の定期タスク アーティファクトであるスケジュール モジュール!

とても実用的です! Python の定期タスク アーティファクトであるスケジュール モジュール!

王林
王林転載
2023-04-12 14:01:081889ブラウズ

とても実用的です! Python の定期タスク アーティファクトであるスケジュール モジュール!

Linux サーバー上で Python スクリプトを定期的に実行する場合、最も有名な選択肢は Crontab スクリプトですが、Crontab には次の欠点があります。

<strong> 1. <strong>第 2 レベルのタスク </strong> を実行するのは不便です。 </strong>

<strong> 2. 実行する必要があるスケジュールされたタスクが数百ある場合、Crontab <strong> 管理者は特に不便になります </strong>。 </strong>

もう 1 つのオプションは Celery ですが、Celery の設定はさらに面倒です。軽量なスケジュール ツールが必要なだけの場合、Celery は適していません。良い選択になりますように。

軽量のタスク スケジュール ツールを使用し、外部依存関係なしにできるだけシンプルで使いやすく、できれば Crontab のすべての基本機能に対応できることを希望する場合は、スケジュール モジュールを使用します。あなたにとって最良の選択。

これを使用してタスクをスケジュールするには、数行のコードが必要なだけです。実際に試してみてください:

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

上記のコードは、ジョブ関数が 10 分ごとに実行されることを意味します。これは非常に簡単です。そして便利です。スケジュール モジュールを導入し、 <strong> scedule.every(timenumber).Time type.do(job) </strong> を呼び出すだけです。 定期タスクをリリースします。

リリース後の定期的なタスクは、 ## run_pending <strong></strong> を使用して実行する必要があります。関数が実行されたかどうかを検出するには、継続的にポーリングする while <strong></strong> ループが必要ですこの機能。

以下では、スケジュール モジュールのインストールと基本的および高度な使用法について説明します。

1. 準備

次のいずれかの方法を選択して、依存関係をインストールするコマンドを入力してください。

1. Windows 環境で Cmd (Start-Run-CMD) を開きます。

2. MacOS 環境 ターミナルを開きます (コマンドスペースを入力してターミナルに入ります)。

#3. VSCode エディターまたは Pycharm を使用している場合は、インターフェイスの下部にあるターミナルを直接使用できます。

##2. 基本的な使用法

最も基本的な使用法については記事の冒頭で説明しましたが、ここではタスクのスケジュール設定の例をさらに示します:

pip install schedule
ご覧のとおり、数か月から数秒までの構成が上記の例でカバーされています。ただし、 タスクを 1 回だけ実行したい場合は 、次のように設定できます。
# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

# 每十分钟执行任务
schedule.every(10).minutes.do(job)
# 每个小时执行任务
schedule.every().hour.do(job)
# 每天的10:30执行任务
schedule.every().day.at("10:30").do(job)
# 每个月执行任务
schedule.every().monday.do(job)
# 每个星期三的13:15分执行任务
schedule.every().wednesday.at("13:15").do(job)
# 每分钟的第17秒执行任务
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

パラメータの受け渡し

次のようなパラメータがある場合実行するには、これを実行するだけです:

# Python 实用宝典
import schedule
import time

def job_that_executes_once():
    # 此处编写的任务只会执行一次...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

Get all current jobs

すべての現在のジョブを取得したい場合:

# Python 实用宝典
import schedule

def greet(name):
    print('Hello', name)

# do() 将额外的参数传递给job函数
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

すべてのジョブをキャンセル

何らかのメカニズムがトリガーされた場合は、現在のプログラムのすべてのジョブを直ちにクリアする必要があります:

# Python 实用宝典
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()

タグfunction

ジョブを設定するときに、その後のジョブの管理を容易にするために、ジョブにタグを付けて、ジョブをフィルタリングしてジョブを取得したり、ジョブをキャンセルしたりできます。

# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()

宿題の期限を設定する

宿題の期限を特定の時間にする必要がある場合は、次の方法を使用できます:

# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag 打标签
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(标签):可以获取所有该标签的任务
friends = schedule.get_jobs('friend')

# 取消所有 daily-tasks 标签的任务
schedule.clear('daily-tasks')

期限切れの場合、ジョブは実行されません。

すべてのジョブをスケジュールに関係なくすぐに実行します

如果某个机制触发了,你需要立即运行所有作业,可以调用 <strong>​schedule.run_all()​</strong> :

# Python 实用宝典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即运行所有作业,每次作业间隔10秒
schedule.run_all(delay_seconds=10)

3.高级使用

装饰器安排作业

如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:

# Python 实用宝典
from schedule import every, repeat, run_pending
import time

# 此装饰器效果等同于 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

并行执行

默认情况下,Schedule 按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。

不过你可以通过多线程的形式来运行每个作业以解决此限制:

# Python 实用宝典
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

日志记录

Schedule 模块同时也支持 logging 日志记录,这么使用:

# Python 实用宝典
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 日志级别为DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

异常处理

Schedule 不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。

你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:

# Python 实用宝典
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

这样,<strong>​bad_task​</strong> 在执行时遇到的任何错误,都会被 <strong>​catch_exceptions ​</strong> 捕获,这点在保证调度任务正常运转的时候非常关键。

以上がとても実用的です! Python の定期タスク アーティファクトであるスケジュール モジュール!の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事は51cto.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。