>  기사  >  백엔드 개발  >  Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

WBOY
WBOY앞으로
2022-10-10 16:29:332287검색

이 글에서는 예약된 작업 구현과 관련된 문제를 주로 소개하는 Python에 대한 관련 지식을 제공합니다. 타사 패키지를 사용하여 예약된 작업을 관리하는 것이 상대적으로 더 쉽습니다. 사용된 방법을 살펴보는 것이 모든 사람에게 도움이 되기를 바랍니다.

Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

【관련 추천: Python3 동영상 튜토리얼

apscheduler 첫 소개

apscheduler가 어떻게 사용되는지 간단한 예를 들어 보겠습니다.

#encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, 测试apscheduler'.format(now))
task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

위의 예는 매우 간단합니다. 먼저 apscheduler 개체를 정의한 다음 add_job을 사용하여 작업을 추가하고 마지막으로 작업을 시작해야 합니다.

예제는 sch_test 작업을 10초마다 실행하는 것입니다. 실행 결과는 다음과 같습니다.

时间:2022-10-08 15:16:30, 测试apscheduler
时间:2022-10-08 15:16:40, 测试apscheduler
时间:2022-10-08 15:16:50, 测试apscheduler
时间:2022-10-08 15:17:00, 测试apscheduler

작업 함수를 실행할 때 매개변수를 전달하려면 task.add_job(과 같이 add_job 함수에 args를 추가하면 됩니다. func=sch_test, args =('a'), 트리거='cron', 두 번째='*/10').

apscheduler에는 어떤 모듈이 있나요?

위의 예에서 apschedulerl을 사용하는 방법에 대한 사전 이해가 있었습니다. 다음으로 apscheduler의 설계 프레임워크를 알아야 합니다. apscheduler에는 트리거, job_stores, 실행기 및 스케줄러의 네 가지 주요 모듈이 있습니다.

1. 트리거:

트리거는 작업에서 지정한 트리거 방법을 참조하며, 예제에서는 "cron" 방법을 사용합니다. cron, 날짜, 간격 중 하나를 선택할 수 있습니다.

Cron은 Linux crontab과 유사하게 지정된 시간에 트리거되는 예약된 작업을 나타냅니다.

사용 가능한 매개변수는 다음과 같습니다.

Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

또한 표현식 유형을 사용하여 cron을 설정할 수도 있습니다. 예를 들어, 일반적으로 사용되는 것은 다음과 같습니다:

Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

사용 예, 매일 7시 20분에 한 번 실행:

task.add_job(func=sch_test, args=('Schedule task',), Trigger='cron' ,

hour='7', Minute='20')

date는 특정 시간에 특정한 일회성 작업을 나타냅니다.

사용 예:

# 使用run_date指定运行时间
task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
# 或者用next_run_time
task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

interval은 간격 시간을 지정하는 주기적 작업을 나타냅니다. time 간격마다 한 번씩 실행됩니다.

interval은 다음 매개변수를 설정할 수 있습니다.

Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

사용 예, 3초마다 sch_test 작업 실행:

task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。

세 가지 트리거를 모두 사용하는 예를 들어보겠습니다.

# encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
task = BlockingScheduler()
task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)
task.start()

일부 결과 인쇄:

时间:2022-10-08 15:45:49, 一次性任务测试apscheduler
时间:2022-10-08 15:45:49, 循环任务测试apscheduler
时间:2022-10-08 15:45:50, 定时任务测试apscheduler
时间:2022-10-08 15:45:52, 循环任务测试apscheduler
时间:2022-10-08 15:45:55, 定时任务测试apscheduler
时间:2022-10-08 15:45:55, 循环任务测试apscheduler
时间:2022-10-08 15:45:58, 循环任务测试apscheduler

코드를 통해 예제와 결과 표시를 통해 다양한 트리거 사용의 차이점을 명확하게 이해할 수 있습니다.

2. 태스크 메모리 job_stores

이름에서 알 수 있듯이 태스크 메모리는 태스크가 저장되는 곳으로 기본적으로 메모리에 저장됩니다. mysql에 작업을 저장하는 등 저장 방법을 사용자 정의할 수도 있습니다. 여기에는 몇 가지 옵션이 있습니다.

Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

일반적으로 기본값은 메모리에 저장하는 것이지만 프로그램이 실패하고 다시 시작되면 작업을 가져와서 다시 실행합니다. 작업 실행에 대한 요구 사항이 높으면 다음을 수행할 수 있습니다. 다른 저장소를 선택하세요.

SQLAlchemyJobStore 저장소 사용 예:

from apscheduler.schedulers.blocking import BlockingScheduler
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
sched.start()

3. 실행자 executor

실행자의 기능은 실행할 작업을 스레드 풀이나 프로세스 풀에 넣는 것입니다. 여러 가지 옵션이 있습니다:

Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

기본값은 ThreadPoolExecutor이고 일반적으로 사용되는 옵션은 스레드 및 프로세스 풀 실행기입니다. 애플리케이션이 CPU 집약적인 작업인 경우 ProcessPoolExecutor를 사용하여 실행할 수 있습니다.

4. Schedulers 스케줄러

스케줄러는 apscheduler 시스템 전체를 조정하는 역할을 합니다. 메모리, 실행기, 트리거는 해당 스케줄링에 따라 정상적으로 실행됩니다. 여러 가지 스케줄러가 있습니다.

Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석

특정 시나리오가 아닌 가장 일반적으로 사용되는 스케줄러는 BlockingScheduler입니다.

예외 모니터링

예약된 작업이 실행 중일 때 오류가 발생하면 일반적으로 로깅 모듈을 사용하여 오류 정보를 기록해야 합니다.

사용 예:

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
# logging日志配置打印格式及保存位置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')
def log_listen(event):
if event.exception :
print ( '任务出错,报错信息:{}'.format(event.exception))
else:
print ( '任务正常运行...' )
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)
sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 配置任务执行完成及错误时的监听
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志监听
sched._logger = logging
sched.start()

apscheduler 패키지 사용

上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime
class LoggerUtils():
    def init_logger(self, logger_name):
        # 日志格式
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # 设置log存储位置
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # 设置日志按照时间分割
        timeHandler = logging.handlers.TimedRotatingFileHandler(
           filename,
           when='D',  # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周
           interval=1, # 多少天切割一次
           backupCount=10  # 保留几天
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj
class Scheduler(LoggerUtils):
    def __init__(self):
        # 执行器设置
        executors = {
            'default': ThreadPoolExecutor(10),  # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10
            'processpool': ProcessPoolExecutor(5)  # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # 存储器设置
        # 这里使用sqlalchemy存储器,将任务存储在mysql
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy',url=sql_url)
        def log_listen(event):
            if event.exception:
                # 日志记录
                self.scheduler._logger.error(event.traceback)
    
        # 配置任务执行完成及错误时的监听
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 配置日志监听
        self.scheduler._logger = self.init_logger('sche_test')
    def add_job(self, *args, **kwargs):
        """添加任务"""
        self.scheduler.add_job(*args, **kwargs)
    def start(self):
        """开启任务"""
        self.scheduler.start()
# 测试任务
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)
# 添加任务,开启任务
sched = Scheduler()
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 开启任务
sched.start()

【相关推荐:Python3视频教程

위 내용은 Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.im에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제