So practical! Schedule module, Python's periodic task artifact!
If you want to run a Python script periodically on a Linux server, the best-known choice is probably crontab, but crontab has the following disadvantages:
<strong>1. It is inconvenient for <strong>second-level tasks</strong>.</strong>
<strong>2. When hundreds of scheduled tasks need to run, <strong>managing them</strong> in crontab becomes particularly inconvenient.</strong>
Another option is Celery, but Celery is more troublesome to configure. If you just need a lightweight scheduling tool, Celery is not a good choice.
If you want a lightweight task scheduling tool that is as simple as possible to use, has no external dependencies, and preferably covers all the basic functions of crontab, then the schedule module is the best choice for you.
Scheduling a task with it may take only a few lines of code. Here is a taste:
# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
The code above runs the job function every 10 minutes, which is simple and convenient. You only need to import the schedule module and call <strong>schedule.every(interval).unit.do(job)</strong> to register a periodic task.
A registered task only runs when the <strong>run_pending</strong> function finds that it is due, so a <strong>while</strong> loop is needed to poll this function continuously.
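To see why the polling loop is needed, here is a toy sketch of the idea in plain Python. It is not the schedule module's actual implementation: `ToyJob`, `ToyScheduler` and `demo` are hypothetical names made up for illustration, and a fake clock replaces real waiting.

```python
import time


class ToyJob:
    """A function plus the interval (in seconds) at which it should run."""

    def __init__(self, interval, func, now):
        self.interval = interval
        self.func = func
        self.next_run = now + interval  # first run is one interval from now

    def run(self, now):
        self.func()
        self.next_run = now + self.interval


class ToyScheduler:
    """Hypothetical stand-in that only illustrates the polling idea."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.jobs = []

    def every(self, interval, func):
        self.jobs.append(ToyJob(interval, func, self.clock()))

    def run_pending(self):
        # Nothing runs by itself: a job only fires when someone calls
        # run_pending() after its next_run time has passed.
        now = self.clock()
        for job in self.jobs:
            if now >= job.next_run:
                job.run(now)


def demo():
    fake_now = [0.0]  # fake clock, so the demo does not have to wait
    calls = []
    scheduler = ToyScheduler(clock=lambda: fake_now[0])
    scheduler.every(10, lambda: calls.append(fake_now[0]))
    for second in range(1, 31):  # simulate polling once per second for 30 s
        fake_now[0] = float(second)
        scheduler.run_pending()
    return calls  # the simulated times at which the job fired
```

If the loop stopped calling `run_pending()`, no job would ever fire, which is exactly the role of the `while True` loop in the real example.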
1. Preparation
Choose any of the following ways to open a terminal, then enter the command below to install the dependency:
1. On Windows, open Cmd (Start > Run > CMD).
2. On macOS, open Terminal (press Command+Space and type Terminal).
3. If you use the VSCode editor or PyCharm, you can use the Terminal panel at the bottom of the window.
pip install schedule
2. Basic usage
The most basic usage was shown at the beginning of the article. Here are more examples of scheduling tasks:
import schedule
import time

def job():
    print("I'm working...")

# Run the job every 10 minutes
schedule.every(10).minutes.do(job)
# Run the job every hour
schedule.every().hour.do(job)
# Run the job every day at 10:30
schedule.every().day.at("10:30").do(job)
# Run the job every Monday
schedule.every().monday.do(job)
# Run the job every Wednesday at 13:15
schedule.every().wednesday.at("13:15").do(job)
# Run the job at the 17th second of every minute
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
As you can see, the example above covers schedules from weekly down to second-level granularity. However, if you want to run a task only once, you can configure it like this:
import schedule
import time

def job_that_executes_once():
    # Work written here runs only once...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)
Parameter passing
If the job function needs arguments, you only need to pass them to do():
import schedule

def greet(name):
    print('Hello', name)

# do() passes the extra arguments on to the job function
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
Get all current jobs
If you want to get all current jobs:
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()
Cancel all jobs
If some condition is triggered and you need to clear all of the current program's jobs immediately:
import schedule

def greet(name):
    print('Hello {}'.format(name))

# greet() requires a name, so pass one when scheduling it
schedule.every().second.do(greet, 'Alice')

schedule.clear()
Tag function
When setting up a job, you can tag it to make later management easier. Tags let you filter jobs, either to fetch them or to cancel them.
import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag() attaches one or more tags to a job
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(tag) returns all jobs that carry the tag
friends = schedule.get_jobs('friend')

# Cancel all jobs tagged daily-tasks
schedule.clear('daily-tasks')
Set a job deadline
If you need a job to stop running after a certain time, you can use the until method:
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# Run the job every hour, stop after 18:30 today
schedule.every(1).hours.until("18:30").do(job)

# Run the job every hour, stop after 2030-01-01 18:33
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# Run the job every hour, stop after 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# Run the job every hour, stop after 11:33:42 today
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# Run the job every hour, stop after 2030-05-17 11:36:20
# (the deadline must still lie in the future when until() is called)
schedule.every(1).hours.until(datetime(2030, 5, 17, 11, 36, 20)).do(job)
After the deadline, the job will not run.
Run all jobs immediately regardless of their schedule
If some condition is triggered and you need to run all jobs immediately, you can call <strong>schedule.run_all()</strong>:
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# Run all jobs immediately, with a 10-second delay between jobs
schedule.run_all(delay_seconds=10)
3. Advanced usage
Scheduling jobs with a decorator
If you find this way of setting up jobs too verbose, you can also use the decorator form:
from schedule import every, repeat, run_pending
import time

# This decorator has the same effect as schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)
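Parallel execution
By default, Schedule runs all jobs sequentially. The reason is that it is hard to find a parallel execution model that makes everyone happy.
However, you can work around this limitation by running each job in its own thread: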
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())

def job2():
    print("I'm running on thread %s" % threading.current_thread())

def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)
Logging
The Schedule module also supports logging through the logging module. Use it like this:
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# Set the log level to DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()
The output looks like this:
DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs
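Exception handling
Schedule does not catch exceptions automatically; an exception raised by a job is propagated directly. This causes a serious problem: all subsequent jobs are interrupted, so we need to catch these exceptions.
You can catch them by hand, but for situations you cannot anticipate, the program should catch them automatically. Adding a decorator is enough: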
import functools
import traceback

import schedule

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                # Print the traceback instead of letting it stop the scheduler
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)
This way, any error that <strong>bad_task</strong> hits while running is caught by <strong>catch_exceptions</strong>, which is crucial for keeping the scheduled tasks running normally.