本篇文章為大家帶來了關於Python的相關知識,其中主要介紹了關於實現定時任務的相關問題,可以使用第三方包來管理定時任務,相對來說apscheduler使用起來更簡單,下面一起來看看使用的方法,希望對大家有幫助。
【相關推薦:Python3影片教學 】
來個簡單的例子看看apscheduler是如何使用的。
#encoding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def sch_test(): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, 测试apscheduler'.format(now)) task = BlockingScheduler() task.add_job(func=sch_test, trigger='cron', second='*/10') task.start()
上述範例很簡單,我們首先要定義一個apscheduler的對象,然後add_job加入任務,最後start開啟任務就行了。
範例是每隔10秒執行一次sch_test任務,運行結果如下:
时间:2022-10-08 15:16:30, 测试apscheduler 时间:2022-10-08 15:16:40, 测试apscheduler 时间:2022-10-08 15:16:50, 测试apscheduler 时间:2022-10-08 15:17:00, 测试apscheduler
如果我們要在執行任務函數時攜帶參數,只要在add_job函數中加入args就行,例如task .add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')。
上面範例我們初步了解如何使用apschedulerl了,接下來需要知道apscheduler的設計架構。 apscheduler有四個主要模組,分別是:觸發器triggers、任務記憶體job_stores、執行器executors、調度器schedulers。
1. 觸發器triggers:
#觸發器指的是任務指定的觸發方式,例子中我們用的是“cron”方式。我們可以選擇cron、date、interval中的一個。
cron表示的是定時任務,類似linux crontab,在指定的時間觸發。
可用參數如下:
除此之外,我們也可用表達式類型去設定cron。例如常用的有:
使用方法範例,在每天7點20分執行一次:
task.add_job(func=sch_test, args=( '定時任務',), trigger='cron',
hour='7', minute='20')
date表示具體到某個時間的一次性任務;
使用方法範例:
# 使用run_date指定运行时间 task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30)) # 或者用next_run_time task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
interval表示的是循環任務,指定一個間隔時間,每過間隔時間執行一次。
interval可設定如下的參數:
使用方法範例,每隔3秒執行一次sch_test任務:
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。
來個例子把3種觸發器都使用一遍:
# encoding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) task = BlockingScheduler() task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3)) task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3) task.start()
列印部分結果:
时间:2022-10-08 15:45:49, 一次性任务测试apscheduler 时间:2022-10-08 15:45:49, 循环任务测试apscheduler 时间:2022-10-08 15:45:50, 定时任务测试apscheduler 时间:2022-10-08 15:45:52, 循环任务测试apscheduler 时间:2022-10-08 15:45:55, 定时任务测试apscheduler 时间:2022-10-08 15:45:55, 循环任务测试apscheduler 时间:2022-10-08 15:45:58, 循环任务测试apscheduler
透過程式碼範例和結果展示,我們可清晰的知道不同觸發器的使用差異。
2. 任務記憶體job_stores
#顧名思義,任務記憶體是儲存任務的地方,預設都是儲存在記憶體中。我們也可自訂儲存方式,例如將任務儲存到mysql。這裡有以下幾種選擇:
通常預設儲存在記憶體即可,但若程式故障重啟的話,會重新拉取任務運行了,如果你對任務的執行要求高,那麼可以選擇其他的記憶體。
使用SQLAlchemyJobStore記憶體範例:
from apscheduler.schedulers.blocking import BlockingScheduler def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) sched = BlockingScheduler() # 使用mysql存储任务 sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8' sched.add_jobstore('sqlalchemy',url=sql_url) # 添加任务 sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') sched.start()
#3. 執行器executors
執行器的功能就是將任務放到執行緒池或進程池中運行。有以下幾種選擇:
預設是ThreadPoolExecutor, 常用的也就是第執行緒和進程池執行器。如果應用是CPU密集型操作,可用ProcessPoolExecutor來執行。
4. 調度器schedulers
調度器屬於apscheduler的核心,它扮演著統籌整個apscheduler系統的角色,記憶體、執行器、觸發器在它的調度下正常運作。調度器有以下幾個:
不是特定場景下,我們最常用的是BlockingScheduler調度器。
異常監聽
定時任務在執行時,若出現錯誤,需要設定監聽機制,我們通常會結合logging模組來記錄錯誤訊息。
使用範例:
from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR import logging # logging日志配置打印格式及保存位置 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='sche.log', filemode='a') def log_listen(event): if event.exception : print ( '任务出错,报错信息:{}'.format(event.exception)) else: print ( '任务正常运行...' ) def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) print(1/0) sched = BlockingScheduler() # 使用mysql存储任务 sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8' sched.add_jobstore('sqlalchemy',url=sql_url) # 添加任务 sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') # 配置任务执行完成及错误时的监听 sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) # 配置日志监听 sched._logger = logging sched.start()
apscheduler的封裝使用
#上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR import logging import logging.handlers import os import datetime class LoggerUtils(): def init_logger(self, logger_name): # 日志格式 formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s') log_obj = logging.getLogger(logger_name) log_obj.setLevel(logging.INFO) # 设置log存储位置 path = '/data/logs/' filename = '{}{}.log'.format(path, logger_name) if not os.path.exists(path): os.makedirs(path) # 设置日志按照时间分割 timeHandler = logging.handlers.TimedRotatingFileHandler( filename, when='D', # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周 interval=1, # 多少天切割一次 backupCount=10 # 保留几天 ) timeHandler.setLevel(logging.INFO) timeHandler.setFormatter(formatter) log_obj.addHandler(timeHandler) return log_obj class Scheduler(LoggerUtils): def __init__(self): # 执行器设置 executors = { 'default': ThreadPoolExecutor(10), # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10 'processpool': ProcessPoolExecutor(5) # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5 } self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors) # 存储器设置 # 这里使用sqlalchemy存储器,将任务存储在mysql sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8' self.scheduler.add_jobstore('sqlalchemy',url=sql_url) def log_listen(event): if event.exception: # 日志记录 self.scheduler._logger.error(event.traceback) # 配置任务执行完成及错误时的监听 self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) # 配置日志监听 self.scheduler._logger = self.init_logger('sche_test') def add_job(self, *args, **kwargs): """添加任务""" self.scheduler.add_job(*args, **kwargs) def start(self): """开启任务""" self.scheduler.start() # 测试任务 def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) print(1/0) # 添加任务,开启任务 sched = Scheduler() # 添加任务 sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') # 开启任务 sched.start()
【相关推荐:Python3视频教程 】
以上是詳細解析Python實作定時任務之apscheduler的詳細內容。更多資訊請關注PHP中文網其他相關文章!