Maison  >  Article  >  développement back-end  >  Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

WBOY
WBOYavant
2022-10-10 16:29:332368parcourir

Cet article vous apporte des connaissances pertinentes sur Python, qui présente principalement des problèmes liés à la mise en œuvre de tâches planifiées. Vous pouvez utiliser des packages tiers pour gérer les tâches planifiées. Relativement parlant, apscheduler est plus facile à utiliser. un regard sur les méthodes utilisées, j'espère que cela sera utile à tout le monde.

Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

【Recommandation associée : Tutoriel vidéo Python3

Première introduction à apscheduler

Prenons un exemple simple pour voir comment apscheduler est utilisé.

#encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, 测试apscheduler'.format(now))
task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

L'exemple ci-dessus est très simple. Nous devons d'abord définir un objet apscheduler, puis add_job pour ajouter la tâche, et enfin commencer à démarrer la tâche.

L'exemple est d'exécuter la tâche sch_test toutes les 10 secondes. Les résultats d'exécution sont les suivants :

时间:2022-10-08 15:16:30, 测试apscheduler
时间:2022-10-08 15:16:40, 测试apscheduler
时间:2022-10-08 15:16:50, 测试apscheduler
时间:2022-10-08 15:17:00, 测试apscheduler

Si nous voulons transporter des paramètres lors de l'exécution de la fonction de tâche, ajoutez simplement des arguments à la fonction add_job, tels que task.add_job( func=sch_test, args =('a'), trigger='cron', second='*/10').

Quels sont les modules d'apscheduler ?

Dans l'exemple ci-dessus, nous avons une compréhension préliminaire de la façon d'utiliser apschedulerl. Ensuite, nous devons connaître le cadre de conception d'apscheduler. apscheduler comporte quatre modules principaux : les déclencheurs, les job_stores, les exécuteurs et les planificateurs.

1. Déclencheurs :

Les déclencheurs font référence à la méthode de déclenchement spécifiée par la tâche. Dans l'exemple, nous utilisons la méthode "cron". Nous pouvons choisir entre cron, date et intervalle.

Cron représente une tâche planifiée, similaire à la crontab Linux, qui est déclenchée à une heure spécifiée.

Les paramètres disponibles sont les suivants :

Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

De plus, nous pouvons également utiliser des types d'expression pour définir cron. Par exemple, les plus couramment utilisés sont :

Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

Exemple d'utilisation, exécuté une fois par jour à 7h20 :

task.add_job(func=sch_test, args=('scheduled task',), trigger='cron ',

hour='7', minute='20')

date représente une tâche ponctuelle spécifique à une certaine heure

Exemple d'utilisation :

# 使用run_date指定运行时间
task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
# 或者用next_run_time
task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

interval représente une tâche cyclique, spécifiant un intervalle de temps ; , à chaque fois Exécuter une fois par intervalle.

interval peut définir les paramètres suivants :

Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

Exemple d'utilisation, exécutez la tâche sch_test toutes les 3 secondes :

task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。

Prenons un exemple pour utiliser les trois déclencheurs :

# encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
task = BlockingScheduler()
task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)
task.start()

Imprimez quelques résultats :

时间:2022-10-08 15:45:49, 一次性任务测试apscheduler
时间:2022-10-08 15:45:49, 循环任务测试apscheduler
时间:2022-10-08 15:45:50, 定时任务测试apscheduler
时间:2022-10-08 15:45:52, 循环任务测试apscheduler
时间:2022-10-08 15:45:55, 定时任务测试apscheduler
时间:2022-10-08 15:45:55, 循环任务测试apscheduler
时间:2022-10-08 15:45:58, 循环任务测试apscheduler

Grâce au code exemples et affichages de résultats, nous pouvons clairement comprendre les différences dans l'utilisation des différents déclencheurs.

2. Mémoire de tâches job_stores

Comme son nom l'indique, la mémoire de tâches est l'endroit où les tâches sont stockées, et elles sont stockées en mémoire par défaut. Nous pouvons également personnaliser la méthode de stockage, comme la sauvegarde des tâches dans MySQL. Il existe plusieurs options ici :

Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

Généralement, la valeur par défaut est de la stocker dans la mémoire, mais si le programme échoue et redémarre, la tâche sera extraite et réexécutée. Si vous avez des exigences élevées en matière d'exécution de la tâche, vous pouvez le faire. choisissez un autre stockage.

Exemple d'utilisation du stockage SQLAlchemyJobStore :

from apscheduler.schedulers.blocking import BlockingScheduler
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
sched.start()

3. Exécuteurs exécuteurs

La fonction de l'exécuteur est de placer les tâches dans le pool de threads ou le pool de processus à exécuter. Il existe plusieurs options :

Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

La valeur par défaut est ThreadPoolExecutor, et les plus couramment utilisées sont les exécuteurs de threads et de pools de processus. Si l'application est une opération gourmande en CPU, ProcessPoolExecutor peut être utilisé pour l'exécuter.

4. Planificateurs planificateurs

Le planificateur appartient au noyau d'apscheduler. Il joue le rôle de coordination de l'ensemble du système apscheduler. La mémoire, l'exécuteur et le déclencheur s'exécutent normalement sous sa planification. Il existe plusieurs planificateurs :

Analyse détaillée de la mise en œuvre par Python des tâches planifiées apscheduler

Pas dans des scénarios spécifiques, le planificateur le plus couramment utilisé est BlockingScheduler.

Surveillance des exceptions

Lorsque la tâche planifiée est en cours d'exécution, si une erreur se produit, un mécanisme de surveillance doit être mis en place. Nous utilisons généralement le module de journalisation pour enregistrer les informations sur l'erreur.

Exemple d'utilisation :

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
# logging日志配置打印格式及保存位置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')
def log_listen(event):
if event.exception :
print ( '任务出错,报错信息:{}'.format(event.exception))
else:
print ( '任务正常运行...' )
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)
sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 配置任务执行完成及错误时的监听
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志监听
sched._logger = logging
sched.start()

utilisation du package apscheduler

上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime
class LoggerUtils():
    def init_logger(self, logger_name):
        # 日志格式
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # 设置log存储位置
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # 设置日志按照时间分割
        timeHandler = logging.handlers.TimedRotatingFileHandler(
           filename,
           when='D',  # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周
           interval=1, # 多少天切割一次
           backupCount=10  # 保留几天
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj
class Scheduler(LoggerUtils):
    def __init__(self):
        # 执行器设置
        executors = {
            'default': ThreadPoolExecutor(10),  # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10
            'processpool': ProcessPoolExecutor(5)  # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # 存储器设置
        # 这里使用sqlalchemy存储器,将任务存储在mysql
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy',url=sql_url)
        def log_listen(event):
            if event.exception:
                # 日志记录
                self.scheduler._logger.error(event.traceback)
    
        # 配置任务执行完成及错误时的监听
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 配置日志监听
        self.scheduler._logger = self.init_logger('sche_test')
    def add_job(self, *args, **kwargs):
        """添加任务"""
        self.scheduler.add_job(*args, **kwargs)
    def start(self):
        """开启任务"""
        self.scheduler.start()
# 测试任务
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)
# 添加任务,开启任务
sched = Scheduler()
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 开启任务
sched.start()

【相关推荐:Python3视频教程

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer