ホームページ >バックエンド開発 >Python チュートリアル >Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

WBOY
WBOY転載
2022-10-10 16:29:332583ブラウズ

この記事では、Python に関する関連知識を提供します。主に、スケジュールされたタスクの実装に関する関連問題を紹介します。サードパーティのパッケージを使用して、スケジュールされたタスクを管理できます。比較的に、apscheduler を使用する方が使いやすいです。使用方法を見てみましょう。皆さんの参考になれば幸いです。

Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

[関連する推奨事項: Python3 ビデオ チュートリアル ]

apscheduler の最初の紹介

簡単な例を見てみましょうapscheduler がどのように使用されるかを参照してください。

#encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, 测试apscheduler'.format(now))
task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

上記の例は非常に単純で、最初に apscheduler オブジェクトを定義し、次に add_job でタスクを追加し、最後にタスクを開始する必要があります。

例では、sch_test タスクを 10 秒ごとに実行します。実行結果は次のとおりです:

时间:2022-10-08 15:16:30, 测试apscheduler
时间:2022-10-08 15:16:40, 测试apscheduler
时间:2022-10-08 15:16:50, 测试apscheduler
时间:2022-10-08 15:17:00, 测试apscheduler

タスク関数の実行時にパラメータを渡したい場合は、add_job に引数を追加するだけです。 task .add_job(func=sch_test, args=('a'),trigger='cron', Second='*/10') などの関数。

apscheduler にはどのようなモジュールがありますか?

上の例では、apschedulerl の使用方法を予備的に理解しましたが、次に、apscheduler の設計フレームワークを知る必要があります。 apscheduler には、トリガー、job_stores、executors、およびスケジューラーという 4 つの主要なモジュールがあります。

1. トリガー:

トリガーとは、タスクによって指定されたトリガー方法を指します。この例では、「cron」方法を使用します。 cron、date、interval のいずれかを選択できます。

Cron は、Linux の crontab に似た、指定された時間にトリガーされるスケジュールされたタスクを表します。

利用可能なパラメータは次のとおりです:

Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

さらに、式タイプを使用して cron を設定することもできます。たとえば、一般的に使用されるものは次のとおりです。

Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

毎日 7:20 に 1 回実行される使用例:

task.add_job(func=sch_test, args =( '時限タスク',), トリガー='cron',

hour='7', minutes='20')

date は、特定のタスクに固有の 1 回限りのタスクを表します。 time;

使用例:

# 使用run_date指定运行时间
task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
# 或者用next_run_time
task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

interval は周期タスクを表し、間隔を指定し、その間隔が経過するたびに実行します。

interval では次のパラメータを設定できます:

Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

# 使用例、sch_test タスクを 3 秒ごとに実行します:

task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。

Come on Theこの例では 3 つのトリガーをすべて使用しています:

# encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
task = BlockingScheduler()
task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)
task.start()

結果の一部を印刷します:

时间:2022-10-08 15:45:49, 一次性任务测试apscheduler
时间:2022-10-08 15:45:49, 循环任务测试apscheduler
时间:2022-10-08 15:45:50, 定时任务测试apscheduler
时间:2022-10-08 15:45:52, 循环任务测试apscheduler
时间:2022-10-08 15:45:55, 定时任务测试apscheduler
时间:2022-10-08 15:45:55, 循环任务测试apscheduler
时间:2022-10-08 15:45:58, 循环任务测试apscheduler

コード例と結果の表示を通じて、さまざまなトリガーの使用の違いを明確に知ることができます。

2. タスク ストレージ job_stores

名前が示すように、タスク ストレージはタスクが保存される場所であり、タスクはデフォルトでメモリに保存されます。 。タスクをmysqlに保存するなど、保存方法をカスタマイズすることもできます。ここにはいくつかのオプションがあります:

Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

通常、デフォルトではメモリに保存されますが、プログラムが失敗して再起動すると、タスクがプルされて再度実行されます。実行要件が高い場合は、他のメモリを選択できます。

SQLAlchemyJobStore ストレージの使用例:

from apscheduler.schedulers.blocking import BlockingScheduler
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
sched.start()

3. エグゼキューター executor

エグゼキューターの機能は、タスクをスレッド プールまたはプロセス プールで実行します。いくつかのオプションがあります:

Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

#デフォルトは ThreadPoolExecutor で、一般的に使用されるのはスレッドおよびプロセス プール エグゼキュータです。アプリケーションが CPU を大量に使用する操作である場合は、ProcessPoolExecutor を使用して実行できます。

4. スケジューラ スケジューラ

スケジューラは、apscheduler の中核に属し、メモリを含む apscheduler システム全体を調整する役割を果たします。および executor. の場合、トリガーはスケジュールに従って通常どおり実行されます。いくつかのスケジューラがあります:

Python のスケジュールされたタスクの実装の詳細な分析 apscheduler

#特定のシナリオではありませんが、最も一般的に使用されるスケジューラは BlockingScheduler です。

例外監視

スケジュールされたタスクの実行中にエラーが発生した場合、監視メカニズムをセットアップする必要があります。エラー情報を記録するログモジュール。

使用例:

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
# logging日志配置打印格式及保存位置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')
def log_listen(event):
if event.exception :
print ( '任务出错,报错信息:{}'.format(event.exception))
else:
print ( '任务正常运行...' )
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)
sched = BlockingScheduler()
# 使用mysql存储任务
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 配置任务执行完成及错误时的监听
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志监听
sched._logger = logging
sched.start()

apscheduler カプセル化の使用法

上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime
class LoggerUtils():
    def init_logger(self, logger_name):
        # 日志格式
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # 设置log存储位置
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # 设置日志按照时间分割
        timeHandler = logging.handlers.TimedRotatingFileHandler(
           filename,
           when='D',  # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周
           interval=1, # 多少天切割一次
           backupCount=10  # 保留几天
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj
class Scheduler(LoggerUtils):
    def __init__(self):
        # 执行器设置
        executors = {
            'default': ThreadPoolExecutor(10),  # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10
            'processpool': ProcessPoolExecutor(5)  # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # 存储器设置
        # 这里使用sqlalchemy存储器,将任务存储在mysql
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy',url=sql_url)
        def log_listen(event):
            if event.exception:
                # 日志记录
                self.scheduler._logger.error(event.traceback)
    
        # 配置任务执行完成及错误时的监听
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 配置日志监听
        self.scheduler._logger = self.init_logger('sche_test')
    def add_job(self, *args, **kwargs):
        """添加任务"""
        self.scheduler.add_job(*args, **kwargs)
    def start(self):
        """开启任务"""
        self.scheduler.start()
# 测试任务
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('时间:{}, {}测试apscheduler'.format(now, job_type))
    print(1/0)
# 添加任务,开启任务
sched = Scheduler()
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 开启任务
sched.start()

【相关推荐:Python3视频教程

以上がPython のスケジュールされたタスクの実装の詳細な分析 apschedulerの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjuejin.imで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。