如果你想在Linux伺服器上週期性地執行某個Python 腳本,最出名的選擇應該是Crontab 腳本,但是Crontab 有以下缺點:
<strong>#1.不方便執行<strong>秒級的任務</strong>。 </strong>
<strong>#2.當需要執行的計時任務有數百個的時候,Crontab的<strong>管理就會特別不方便</strong>。 </strong>
另一個選擇是Celery,但是Celery 的設定比較麻煩,如果你只是需要一個輕量級的調度工具,Celery 不會是個好選擇。
在你想要使用一個輕量級的任務排程工具,而且希望它盡量簡單、容易使用、不需要外部依賴,最好能夠容納Crontab 的所有基本功能,那麼Schedule 模組是你的不二之選。
使用它來調度任務可能只需要幾行程式碼,感受一下:
# Python 实用宝典 import schedule import time def job(): print("I'm working...") schedule.every(10).minutes.do(job) while True: schedule.run_pending() time.sleep(1)
上面的程式碼表示每10分鐘執行一次 job 函數,非常簡單方便。你只需要引入schedule 模組,透過呼叫 <strong>#scedule.every(時間數).時間類型.do(job)</strong>
## 發布週期任務。
發佈後的週期任務需要用 <strong>#run_pending</strong>
函數來偵測是否執行,因此需要一個 <strong>While</strong>
循環不斷地輪詢這個函數。
以下具體講Schedule模組的安裝與初級、進階使用方法。
1.準備
#請選擇下列任一種方式輸入指令安裝依賴# :
1. Windows 環境開啟Cmd (開始-執行-CMD)。
2. MacOS 環境 開啟 Terminal (command 空格輸入Terminal)。
3. 如果你用的是VSCode編輯器或Pycharm,可以直接使用介面下方的Terminal.
pip install schedule
#2.基本上使用
最基本的使用在文首已經提到過,下面給大家展示更多的調度任務範例:
# Python 实用宝典 import schedule import time def job(): print("I'm working...") # 每十分钟执行任务 schedule.every(10).minutes.do(job) # 每个小时执行任务 schedule.every().hour.do(job) # 每天的10:30执行任务 schedule.every().day.at("10:30").do(job) # 每个月执行任务 schedule.every().monday.do(job) # 每个星期三的13:15分执行任务 schedule.every().wednesday.at("13:15").do(job) # 每分钟的第17秒执行任务 schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過如果你想只執行一次任務的話,可以這麼配:
# Python 实用宝典 import schedule import time def job_that_executes_once(): # 此处编写的任务只会执行一次... return schedule.CancelJob schedule.every().day.at('22:30').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
參數傳遞
如果你有參數需要傳遞給作業去執行,你只需要這麼做:
# Python 实用宝典 import schedule def greet(name): print('Hello', name) # do() 将额外的参数传递给job函数 schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob')
取得目前所有的作業
#如果你想取得目前所有的作業:
# Python 实用宝典 import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs()
取消所有作業
如果某些機制觸發了,你需要立即清除目前程式的所有作業:
# Python 实用宝典 import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().second.do(greet) schedule.clear()
標籤功能
#在設定作業的時候,為了後續方便管理作業,你可以給作業打個標籤,這樣你可以透過標籤過濾來取得作業或取消作業。
# Python 实用宝典 import schedule def greet(name): print('Hello {}'.format(name)) # .tag 打标签 schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') # get_jobs(标签):可以获取所有该标签的任务 friends = schedule.get_jobs('friend') # 取消所有 daily-tasks 标签的任务 schedule.clear('daily-tasks')
設定作業截止時間
如果你需要讓某個作業到某個時間截止,你可以透過這個方法:
# Python 实用宝典 import schedule from datetime import datetime, timedelta, time def job(): print('Boo') # 每个小时运行作业,18:30后停止 schedule.every(1).hours.until("18:30").do(job) # 每个小时运行作业,2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job) # 每个小时运行作业,8个小时后停止 schedule.every(1).hours.until(timedelta(hours=8)).do(job) # 每个小时运行作业,11:32:42后停止 schedule.every(1).hours.until(time(11, 33, 42)).do(job) # 每个小时运行作业,2020-5-17 11:36:20后停止 schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
截止日期之後,該作業將無法運作。
立即執行所有作業,而不管其排程如何
#如果某个机制触发了,你需要立即运行所有作业,可以调用 <strong>schedule.run_all()</strong>
:
# Python 实用宝典 import schedule def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # 立即运行所有作业,每次作业间隔10秒 schedule.run_all(delay_seconds=10)
3.高级使用
装饰器安排作业
如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:
# Python 实用宝典 from schedule import every, repeat, run_pending import time # 此装饰器效果等同于 schedule.every(10).minutes.do(job) @repeat(every(10).minutes) def job(): print("I am a scheduled job") while True: run_pending() time.sleep(1)
并行执行
默认情况下,Schedule 按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。
不过你可以通过多线程的形式来运行每个作业以解决此限制:
# Python 实用宝典 import threading import time import schedule def job1(): print("I'm running on thread %s" % threading.current_thread()) def job2(): print("I'm running on thread %s" % threading.current_thread()) def job3(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3) while True: schedule.run_pending() time.sleep(1)
日志记录
Schedule 模块同时也支持 logging 日志记录,这么使用:
# Python 实用宝典 import schedule import logging logging.basicConfig() schedule_logger = logging.getLogger('schedule') # 日志级别为DEBUG schedule_logger.setLevel(level=logging.DEBUG) def job(): print("Hello, Logs") schedule.every().second.do(job) schedule.run_all() schedule.clear()
效果如下:
DEBUG:schedule:Running *all* 1 jobs with 0s delay in between DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={}) Hello, Logs DEBUG:schedule:Deleting *all* jobs
异常处理
Schedule 不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。
你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:
# Python 实用宝典 import functools def catch_exceptions(cancel_on_failure=False): def catch_exceptions_decorator(job_func): @functools.wraps(job_func) def wrapper(*args, **kwargs): try: return job_func(*args, **kwargs) except: import traceback print(traceback.format_exc()) if cancel_on_failure: return schedule.CancelJob return wrapper return catch_exceptions_decorator @catch_exceptions(cancel_on_failure=True) def bad_task(): return 1 / 0 schedule.every(5).minutes.do(bad_task)
这样,<strong>bad_task</strong>
在执行时遇到的任何错误,都会被 <strong>catch_exceptions </strong>
捕获,这点在保证调度任务正常运转的时候非常关键。
以上是太實用了! Schedule模組, Python 週期任務神器!的詳細內容。更多資訊請關注PHP中文網其他相關文章!