安裝apscheduler 模組
pip install apscheduler
#APScheduler( Advanced Python Scheduler)是一個輕量級的Python定時任務排程框架(Python函式庫)。
APScheduler有三個內建的調度系統,其中包括:
cron式調度(可選開始/結束時間)
基於間隔的執行(以偶數間隔運行作業,也可以選擇開始/結束時間)
#一次延遲執行任務(在指定的日期/時間內執行一次作業)
APScheduler可以任意混合和匹配調度系統和作業儲存的後端,其中支援後端儲存作業包括:
Memory
SQLAlchemy
MongoDB
Redis
RethinkDB
ZooKeeper
#triggers(觸發器)中包含調度邏輯,每個作業都由自己的觸發器來決定下次運行時間。除了他們自己初始配置意外,觸發器完全是無狀態的。
job stores(作業記憶體)儲存已排程的作業,預設的作業記憶體只是簡單地把作業保存在記憶體中,其他的作業記憶體則是將作業儲存在資料庫中。當作業被儲存到一個持久的作業記憶體中的時候,該作業的資料會被序列化,並在載入時被反序列化。作業記憶體不能共用調度器。
executors(執行器)處理作業的運行,他們通常透過在作業中提交指定的可調用物件到一個執行緒或進程池來進行。當作業完成時,執行器將會通知調度器。
schedulers(調度器)配置作業記憶體和執行器可以在調度器中完成,例如新增、修改和移除作業。根據不同的應用場景可以選用不同的調度器,可選的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7種。
觸發器
當你排程作業的時候,你需要為這個作業選擇一個觸發器,用來描述這個作業何時被觸發,APScheduler有三種內建的觸發器類型:
date: 一次指定日期;
interval: 在某個時間範圍內間隔多久執行一次;
cron :Linux crontab格式相容,最強大。
date 最基本的一種調度,作業只會執行一次。它的參數如下:
1.run_date
(datetime|str) – 作業的運作日期或時間
2.timezone
(datetime.tzinfo|str) &ndash ; 指定時區
作業記憶體
如果你的應用程式在每次啟動的時候都會重新建立作業,那麼使用預設的作業記憶體(MemoryJobStore)即可,但是如果你需要在調度器重新啟動或應用程式奔潰的情況下任然保留作業,你應該根據你的應用程式環境來選擇特定的作業記憶體。例如:使用Mongo或SQLAlchemy JobStore (用於支援大多數RDBMS)
執行器
對執行器的選擇取決於你使用上面哪些框架,大多數情況下,使用預設的ThreadPoolExecutor已經能夠滿足需求。如果你的應用程式涉及CPU密集型操作,你可以考慮使用ProcessPoolExecutor來使用更多的CPU核心。你也可以同時使用兩者,將ProcessPoolExecutor當作第二致動器。
選擇適當的調度器
BlockingScheduler : 當調度器是你應用程式中唯一要執行的東西時
#BackgroundScheduler : 當你沒有執行任何其他框架並希望調度器在你應用的背景執行時使用。
AsyncIOScheduler : 當你的程式使用了asyncio(一個非同步框架)的時候使用。
GeventScheduler : 當你的程式使用了gevent(高效能的Python並發框架)的時候使用。
TornadoScheduler : 當你的程式是基於Tornado(一個網頁框架)的時候使用。
TwistedScheduler : 當你的程式使用了Twisted(一個非同步框架)的時候使用
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job1(): print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) def my_job2(): print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() # 每隔5秒运行一次my_job1 sched.add_job(my_job1, 'interval', seconds=5, id='my_job1') # 每隔5秒运行一次my_job2 sched.add_job(my_job2, 'cron', second='*/5', id='my_job2') sched.start()裝飾器模式新增作業。
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime sched = BlockingScheduler() # 每隔5秒运行一次my_job1 @sched.scheduled_job('interval', seconds=5, id='my_job1') def my_job1(): print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 每隔5秒运行一次my_job2 @sched.scheduled_job('cron', second='*/5', id='my_job2') def my_job2(): print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched.start()
移除作業
沒有移除作業# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job(text=""): print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业']) # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效 sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业']) sched.start()程式碼執行結果:
使用remove() 移除作业
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job(text=""): print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业']) job.remove() # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效 sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业']) sched.start()
代码执行结果:
使用remove_job()移除作业
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job(text=""): print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业']) # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效 sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业']) sched.remove_job('my_job_id') sched.start()
代码执行结果:
APScheduler有3中内置的触发器类型:
新建一个调度器(scheduler);
添加一个调度任务(job store);
运行调度任务。
代码实现
# -*- coding:utf-8 -*- import time import datetime from apscheduler.schedulers.blocking import BlockingScheduler def my_job(text="默认值"): print(text, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) sched = BlockingScheduler() sched.add_job(my_job, 'interval', seconds=3, args=['3秒定时']) # 2018-3-17 00:00:00 执行一次,args传递一个text参数 sched.add_job(my_job, 'date', run_date=datetime.date(2019, 10, 17), args=['根据年月日定时执行']) # 2018-3-17 13:46:00 执行一次,args传递一个text参数 sched.add_job(my_job, 'date', run_date=datetime.datetime(2019, 10, 17, 14, 10, 0), args=['根据年月日时分秒定时执行']) # sched.start() """ interval 间隔调度,参数如下: weeks (int) – 间隔几周 days (int) – 间隔几天 hours (int) – 间隔几小时 minutes (int) – 间隔几分钟 seconds (int) – 间隔多少秒 start_date (datetime|str) – 开始日期 end_date (datetime|str) – 结束日期 timezone (datetime.tzinfo|str) – 时区 """ """ cron参数如下: year (int|str) – 年,4位数字 month (int|str) – 月 (范围1-12) day (int|str) – 日 (范围1-31) week (int|str) – 周 (范围1-53) day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) hour (int|str) – 时 (范围0-23) minute (int|str) – 分 (范围0-59) second (int|str) – 秒 (范围0-59) start_date (datetime|str) – 最早开始日期(包含) end_date (datetime|str) – 最晚结束时间(包含) timezone (datetime.tzinfo|str) – 指定时区 """ # my_job将会在6,7,8,11,12月的第3个周五的1,2,3点运行 sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 截止到2018-12-30 00:00:00,每周一到周五早上五点半运行job_function sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2018-12-31') # 表示2017年3月22日17时19分07秒执行该程序 sched.add_job(my_job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7) # 表示任务在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序 sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 表示从星期一到星期五5:30(AM)直到2014-05-30 00:00:00 sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30') # 表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5 sched.add_job(my_job, 'cron', second='*/5', args=['5秒定时']) sched.start()
cron表达式 | 参数 | 描述 |
---|---|---|
* | any | Fire on every value |
*/a | any | Fire every a values, starting from the minimum |
a-b | any | Fire on any value within the a-b range (a must be smaller than b) |
a-b/c | any | Fire every c values within the a-b range |
xth y | day | Fire on the x -th occurrence of weekday y within the month |
last x | day | Fire on the last occurrence of weekday x within the month |
last | day | Fire on the last day within the month |
x,y,z | any | Fire on any matching expression; can combine any number of any of the above expressions |
使用SQLAlchemy作业存储器存放作业
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime import logging sched = BlockingScheduler() def my_job(): print('my_job is running, Now is %s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 使用sqlalchemy作业存储器 # 根据自己电脑安装的库选择用什么连接 ,如pymysql 其中:scrapy表示数据库的名称,操作数据库之前应创建对应的数据库 url = 'mysql+pymysql://root:123456@localhost:3306/scrapy?charset=utf8' sched.add_jobstore('sqlalchemy', url=url) # 添加作业 sched.add_job(my_job, 'interval', id='myjob', seconds=5) log = logging.getLogger('apscheduler.executors.default') log.setLevel(logging.INFO) # DEBUG # 设定日志格式 fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s') h = logging.StreamHandler() h.setFormatter(fmt) log.addHandler(h) sched.start()
暂停和恢复作业
# 暂停作业: apsched.job.Job.pause() apsched.schedulers.base.BaseScheduler.pause_job() # 恢复作业: apsched.job.Job.resume() apsched.schedulers.base.BaseScheduler.resume_job()
获得job列表
get_jobs(),它会返回所有的job实例;
使用print_jobs()来输出所有格式化的作业列表;
get_job(job_id=“任务ID”)获取指定任务的作业列表。
代码实现:
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job(text=""): print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业']) sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业']) print(sched.get_jobs()) print(sched.get_job(job_id="my_job_id")) sched.print_jobs() sched.start()
关闭调度器
默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。
sched.shutdown() sched.shutdown(wait=False)
以上是如何安裝並使用Python中的第三方模組apscheduler?的詳細內容。更多資訊請關注PHP中文網其他相關文章!