Heim  >  Artikel  >  Backend-Entwicklung  >  Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

WBOY
WBOYnach vorne
2022-10-10 16:29:332287Durchsuche

Dieser Artikel vermittelt Ihnen relevantes Wissen über Python, das hauptsächlich Probleme im Zusammenhang mit der Implementierung geplanter Aufgaben behandelt. Relativ gesehen ist Apscheduler einfacher zu verwenden Ein Blick auf die verwendeten Methoden wird hoffentlich für alle hilfreich sein.

Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

【Verwandte Empfehlung: Python3-Video-Tutorial

Erste Einführung in Apscheduler

Nehmen wir ein einfaches Beispiel, um zu sehen, wie Apscheduler verwendet wird.

#encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, 测试apscheduler'.format(now))
task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

Das obige Beispiel ist sehr einfach. Wir müssen zuerst ein Apscheduler-Objekt definieren, dann add_job, um die Aufgabe hinzuzufügen, und schließlich mit dem Öffnen der Aufgabe beginnen.

Das Beispiel besteht darin, die sch_test-Aufgabe alle 10 Sekunden auszuführen. Die Ausführungsergebnisse sind wie folgt:

时间:2022-10-08 15:16:30, 测试apscheduler
时间:2022-10-08 15:16:40, 测试apscheduler
时间:2022-10-08 15:16:50, 测试apscheduler
时间:2022-10-08 15:17:00, 测试apscheduler

Wenn wir beim Ausführen der Aufgabenfunktion Parameter mitführen möchten, fügen Sie einfach Argumente zur Funktion add_job hinzu, z. B. task.add_job(. func=sch_test, args =('a'), trigger='cron', second='*/10').

Welche Module hat Apscheduler?

Im obigen Beispiel haben wir ein vorläufiges Verständnis für die Verwendung von Apscheduler. Als nächstes müssen wir das Design-Framework von Apscheduler kennen. Apscheduler verfügt über vier Hauptmodule: Trigger, Job_Stores, Executors und Scheduler.

1. Auslöser:

Auslöser beziehen sich auf die durch die Aufgabe angegebene Auslösemethode. Im Beispiel verwenden wir die Methode „cron“. Wir können zwischen Cron, Datum und Intervall wählen.

Cron stellt eine geplante Aufgabe dar, ähnlich der Linux-Crontab, die zu einem bestimmten Zeitpunkt ausgelöst wird.

Die verfügbaren Parameter sind wie folgt:

Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

Darüber hinaus können wir auch Ausdruckstypen verwenden, um Cron festzulegen. Die am häufigsten verwendeten sind beispielsweise:

Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

Anwendungsbeispiel, einmal täglich um 7:20 Uhr ausgeführt:

task.add_job(func=sch_test, args=('geplante Aufgabe',), trigger='cron ',

Stunde='7', Minute='20')

Datum stellt eine einmalige Aufgabe dar, die für eine bestimmte Zeit spezifisch ist;

Verwendungsbeispiel:

# 使用run_date指定运行时间
task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
# 或者用next_run_time
task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

interval stellt eine zyklische Aufgabe dar, die eine Intervallzeit angibt , jedes Mal Einmal in jedem Intervall ausführen.

interval kann die folgenden Parameter festlegen:

Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

Anwendungsbeispiel: Führen Sie die sch_test-Aufgabe alle 3 Sekunden aus:

task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。

Nehmen wir ein Beispiel, um alle drei Trigger zu verwenden:

# encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
task = BlockingScheduler()
task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)
task.start()

Einige Ergebnisse drucken:

时间:2022-10-08 15:45:49, 一次性任务测试apscheduler
时间:2022-10-08 15:45:49, 循环任务测试apscheduler
时间:2022-10-08 15:45:50, 定时任务测试apscheduler
时间:2022-10-08 15:45:52, 循环任务测试apscheduler
时间:2022-10-08 15:45:55, 定时任务测试apscheduler
时间:2022-10-08 15:45:55, 循环任务测试apscheduler
时间:2022-10-08 15:45:58, 循环任务测试apscheduler

Durch Code Anhand von Beispielen und Ergebnisanzeigen können wir die Unterschiede in der Verwendung verschiedener Trigger deutlich verstehen.

2. Aufgabenspeicher job_stores

Wie der Name schon sagt, werden Aufgaben im Aufgabenspeicher gespeichert, und zwar standardmäßig im Arbeitsspeicher. Wir können auch die Speichermethode anpassen, z. B. das Speichern von Aufgaben in MySQL. Hier gibt es mehrere Optionen:

Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

Normalerweise wird die Aufgabe standardmäßig im Speicher gespeichert. Wenn das Programm jedoch fehlschlägt und neu startet, wird die Aufgabe abgerufen und erneut ausgeführt. Wenn Sie hohe Anforderungen an die Aufgabenausführung haben, können Sie dies tun Wählen Sie einen anderen Speicher.

Beispiel für die Verwendung des SQLAlchemyJobStore-Speichers:

from apscheduler.schedulers.blocking import BlockingScheduler
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
sched.start()

3. Executors Executors

Die Funktion des Executors besteht darin, Aufgaben zur Ausführung in den Thread-Pool oder Prozesspool zu stellen. Es gibt mehrere Optionen:

Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

Der Standardwert ist ThreadPoolExecutor, und die am häufigsten verwendeten sind die Thread- und Prozesspool-Executoren. Wenn es sich bei der Anwendung um einen CPU-intensiven Vorgang handelt, kann ProcessPoolExecutor zur Ausführung verwendet werden.

4. Scheduler-Scheduler

Der Scheduler gehört zum Kern des Apschedulers. Er übernimmt die Aufgabe, das gesamte Apscheduler-System zu koordinieren. Es gibt mehrere Planer:

Detaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben

Nicht in bestimmten Szenarien, der am häufigsten verwendete Planer ist BlockingScheduler.

Ausnahmeüberwachung

Wenn die geplante Aufgabe ausgeführt wird und ein Fehler auftritt, muss ein Überwachungsmechanismus eingerichtet werden. Normalerweise verwenden wir das Protokollierungsmodul, um Fehlerinformationen aufzuzeichnen.

Verwendungsbeispiel:

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
# logging日志配置打印格式及保存位置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')
def log_listen(event):
if event.exception :
print ( '任务出错,报错信息:{}'.format(event.exception))
else:
print ( '任务正常运行...' )
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)
sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 配置任务执行完成及错误时的监听
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志监听
sched._logger = logging
sched.start()

apscheduler-Paketnutzung

上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime
class LoggerUtils():
    def init_logger(self, logger_name):
        # 日志格式
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # 设置log存储位置
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # 设置日志按照时间分割
        timeHandler = logging.handlers.TimedRotatingFileHandler(
           filename,
           when='D',  # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周
           interval=1, # 多少天切割一次
           backupCount=10  # 保留几天
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj
class Scheduler(LoggerUtils):
    def __init__(self):
        # 执行器设置
        executors = {
            'default': ThreadPoolExecutor(10),  # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10
            'processpool': ProcessPoolExecutor(5)  # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # 存储器设置
        # 这里使用sqlalchemy存储器,将任务存储在mysql
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy',url=sql_url)
        def log_listen(event):
            if event.exception:
                # 日志记录
                self.scheduler._logger.error(event.traceback)
    
        # 配置任务执行完成及错误时的监听
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 配置日志监听
        self.scheduler._logger = self.init_logger('sche_test')
    def add_job(self, *args, **kwargs):
        """添加任务"""
        self.scheduler.add_job(*args, **kwargs)
    def start(self):
        """开启任务"""
        self.scheduler.start()
# 测试任务
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)
# 添加任务,开启任务
sched = Scheduler()
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 开启任务
sched.start()

【相关推荐:Python3视频教程

Das obige ist der detaillierte Inhalt vonDetaillierte Analyse der Python-Implementierung des Apschedulers für geplante Aufgaben. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:juejin.im. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen