ホームページ >バックエンド開発 >Python チュートリアル >Python apscheduler cronスケジュールタスクトリガーインターフェイスの自動検査を実装する方法

Python apscheduler cronスケジュールタスクトリガーインターフェイスの自動検査を実装する方法

WBOY
WBOY転載
2023-05-01 10:40:061502ブラウズ

    Python cron スケジュール タスク トリガー インターフェイスの自動検査

    スケジュール タスクのトリガー方法にはいくつかの種類がありますが、研究開発の学生が日常業務でよく使用するのが cron です。メソッド

    APScheduler フレームワークが複数のスケジュールされたタスク メソッドをサポートしていることを確認しました

    最初に apscheduler モジュールをインストールします

    $ pip install apscheduler

    コードは次のとおりです: (メソッドにはさまざまなコメントが付けられています)時間パラメーターの定義と範囲)

    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    class Timing:
        def __init__(self, start_date, end_date, hour=None):
            self.start_date = start_date
            self.end_date = end_date
            self.hour = hour
    
        def cron(self, job, *value_list):
            """cron格式 在特定时间周期性地触发"""
            # year (int 或 str) – 年,4位数字
            # month (int 或 str) – 月 (范围1-12)
            # day (int 或 str) – 日 (范围1-31)
            # week (int 或 str) – 周 (范围1-53)
            # day_of_week (int 或 str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
            # hour (int 或 str) – 时 (范围0-23)
            # minute (int 或 str) – 分 (范围0-59)
            # second (int 或 str) – 秒 (范围0-59)
            # start_date (datetime 或 str) – 最早开始日期(包含)
            # end_date (datetime 或 str) – 分 最晚结束时间(包含)
            # timezone (datetime.tzinfo 或str) – 指定时区
            scheduler = BlockingScheduler()
            scheduler.add_job(job, 'cron', start_date=self.start_date, end_date=self.end_date, hour=self.hour,
                              args=[*value_list])
            scheduler.start()
    
        def interval(self, job, *value_list):
            """interval格式 周期触发任务"""
            # weeks (int) - 间隔几周
            # days (int)  - 间隔几天
            # hours (int) - 间隔几小时
            # minutes (int) - 间隔几分钟
            # seconds (int) - 间隔多少秒
            # start_date (datetime 或 str) - 开始日期
            # end_date (datetime 或 str) - 结束日期
            # timezone (datetime.tzinfo 或str) - 时区
            scheduler = BlockingScheduler()
            # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
            scheduler.add_job(job, 'interval', minutes=1, seconds=30, start_date=self.start_date,
                              end_date=self.end_date, args=[*value_list])
            scheduler.start()
    
        @staticmethod
        def date(job, *value_list):
            """date格式 特定时间点触发"""
            # run_date (datetime 或 str) - 作业的运行日期或时间
            # timezone (datetime.tzinfo 或 str)  - 指定时区
            scheduler = BlockingScheduler()
            # 在 2019-8-30 01:00:01 运行一次 job 方法
            scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=[*value_list])
            scheduler.start()

    カプセル化方法はあまり汎用的ではありません。コードは後で最適化されますが、少なくとも今は使用できます、はははははははははははははははははははははははははははははははははははははははははははははははははははははははははははは

    アイデアを考えました、検査によってタスクがトリガーされ、次に DingTalk がトリガーされるため、スケジュールされたタスクは最上位レイヤーにある必要があります

    DingTalk によってカプセル化されたコードは、以前と最下位で共有され、引き続き改善されています

    if __name__ == '__main__':
        file_list = ["test_shiyan.py", "MeetSpringFestival.py"]
        # run_py(file_list)
        case_list = ["test_case_01", "test_case_02"]
        # run_case(test_sample, case_list)
        dingDing_list = [2, case_list, test_sample]
        # run_dingDing(*dingDing_list)
        Timing('2022-02-15 00:00:00', '2022-02-16 00:00:00', '0-23').cron(run_dingDing, *dingDing_list)

    run_dingDing() の関数 カプセル化された Timing().cron(run_dingDing, *dingDing_list) に入れて、パラメータを run_dingDing() にタプルの形式で渡します

    は上で書いたもので、あなたはここで見ることができます

    def cron(self, job, *value_list):
            """cron格式 在特定时间周期性地触发"""
            scheduler.add_job(job, 'cron', start_date=self.start_date, end_date=self.end_date, hour=self.hour,
                                      args=[*value_list])

    Timing() の初期化で時間範囲を埋めるようにしました。これにより、より快適になります。

    タイミングは、Timing().cron の実行後にトリガーできます。 () ですが、「コンピューターの電源をオンにするだけです。後でプラットフォームの調査を開始するときに、それをサーバーに保存すると便利です~

    apscheduler がエラーを報告しました: ジョブの実行時間 …&hellip ; next run at: ……)" was missing by

    apscheduler 実行プロセス中に次のようなエラーが発生します:

    ジョブ "9668_hack (トリガー) の実行時間: 間隔[1:00:00]、次回の実行時刻: 2018- 10-29 22:00:00 CST)" は 0:01:47.387821 ジョブ "9668_index (トリガー: 間隔[0:30: 00]、次回の実行時刻: 2018-10-29 21:30: 00 CST)" は 0:01:47.392574 までに失敗しました。ジョブ "9669_deep (トリガー: 間隔[1:00:00]、次回の実行時刻: 2018-10-29 22:00:00 CST)" は 0:01:47.397622 までに失敗しました。ジョブ "9669_hack (トリガー: 間隔[1:00:00]、次回の実行時刻: 2018-10-29 22: 00:00 CST)」は 0:01:47.402938 までに失敗しました。ジョブ「9669_index (トリガー: 間隔[0:30:00]、次回の実行時刻: 2018-10-29 21:30:00 CST)」の実行時間は次のとおりです。 0:01:47.407996 で見逃した

    Baidu は基本的にこの問題を指摘できませんでした。Google はキー構成を見つけましたが、それでもエラーが発生したため、何が原因であるかを調べるために情報を探し続けました。地獄がこの問題を引き起こしていました。

    Python apscheduler cronスケジュールタスクトリガーインターフェイスの自動検査を実装する方法

    misfire_grace_time パラメータ

    その中に、misfire_grace_time というパラメータが記載されていますが、このパラメータは何に使用されますか? 他の場所で説明を見つけました。他にもいくつかのパラメータがありますが、私自身の理解に基づいて包括的な概要を示します。

    • coalesce: 何らかの理由でジョブが複数回蓄積された場合実際の操作 (たとえば、システムは 5 分間ハングした後に復元され、1 分ごとに実行されるタスクがあります。論理的に言えば、当初はこの 5 分間に 5 回実行されることが「計画」されていましたが、実際には実行されませんでした) Coalesce が True の場合、次にこのジョブがエグゼキューターに送信されるとき、最後に 1 回だけ実行されます。False の場合、5 回実行されます (必ずしも必要ではありません。他の条件については、後の missfire_grace_time の説明を参照してください)

    • max_instance: これは、同じジョブのインスタンスを同時に実行できるのは最大でも複数であることを意味します。たとえば、10 分かかるジョブは、1 分ごとに 1 回実行されるように指定されています。max_instance 値が 5 の場合、6 分から 10 分目までは、すでに 5 つのインスタンスが実行されているため、新しい実行インスタンスは実行されません

    • missfire_grace_time: 上記の合体と同様のシナリオを想像してください。ジョブが最初は 14:00 に実行されましたが、何らかの理由でスケジュールされず、現在は 14:00 に実行されているとします。 14:01 では、14:00 に実行中のインスタンスが送信されると、実行がスケジュールされている時刻と現在の時刻 (ここでは 1 分) の差が、設定した 30 秒の制限を超えているかどうかがチェックされ、その後、この実行インスタンスは実行されません。

    例:

    15 分に 1 回のタスクの場合、misfire_grace_time を 100 秒に設定し、0:06 にプロンプ​​トを表示します:

    ジョブ「9392_index (トリガー: 間隔[0:15:00]、次回実行時刻: 2018-10-27 00:15:00 CST)」の実行時間は 0:06:03.931026

    説明:

    • #0:00 に実行されるはずだったタスクが何らかの理由でスケジュールされておらず、プロンプトが表示されました。次回実行する時刻 (0:15) は現在時刻 (しきい値 100 秒) と 6 分異なるため、0:15 には実行されません

    • したがって、このパラメータは通常タスクのタイムアウト許容値として理解されます。実行プログラムにタイムアウト期間を設定して与えます。実行すべき処理がこの時間範囲内に完了していない場合、TND は実行を停止する必要があります。

    そこで、構成を次のように変更しました:

     class Config(object):
     
        SCHEDULER_JOBSTORES = {
            'default': RedisJobStore(db=3,host='0.0.0.0', port=6378,password='******'),
        }
     
        SCHEDULER_EXECUTORS = {
            'default': {'type': 'processpool', 'max_workers': 50}  #用进程池提升任务处理效率
        }
     
        SCHEDULER_JOB_DEFAULTS = {
            'coalesce': True,   #积攒的任务只跑一次
            'max_instances': 1000, #支持1000个实例并发
           'misfire_grace_time':600 #600秒的任务超时容错
        }
     
        SCHEDULER_API_ENABLED = True

    我本以为这样应该就没什么问题了,配置看似完美,但是现实是残忍的,盯着apscheduler日志看了一会,熟悉的“was missed by”又出现了,这时候就需要怀疑这个配置到底有没有生效了,然后发现果然没有生效,从/scheduler/jobs中可以看到任务:

     {
    "id": "9586_site_status",
    "name": "9586_site_status",
    "func": "monitor_scheduler:monitor_site_status",
    "args": [
    9586,
    "http://sl.jxcn.cn/",
    1000,
    100,
    200,
    "",
    0,
    2
    ],
    "kwargs": {},
    "trigger": "interval",
    "start_date": "2018-09-14T00:00:00+08:00",
    "end_date": "2018-12-31T00:00:00+08:00",
    "minutes": 15,
    "misfire_grace_time": 10,
    "max_instances": 3000,
    "next_run_time": "2018-10-24T18:00:00+08:00"
    }

    可以看到任务中默认就有misfire_grace_time配置,没有改为600,折腾一会发现修改配置,重启与修改任务都不会生效,只能修改配置后删除任务重新添加(才能把这个默认配置用上),或者修改任务的时候把这个值改掉

     scheduler.modify_job(func=func, id=id, args=args, trigger=trigger, minutes=minutes,start_date=start_date,end_date=end_date,misfire_grace_time=600)

    然后就可以了?图样图森破,missed 依然存在。

    其实从后来的报错可以发现这个容错时间是用上的,因为从执行时间加上600秒后才出现的报错。

    找到任务超时的根本原因

    那么还是回到这个超时根本问题上,即使容错时间足够长,没有这个报错了,但是一个任务执行时间过长仍然是个根本问题,所以终极思路还在于如何优化executor的执行时间上。

    当然这里根据不同的任务处理方式是不一样的,在于各自的代码了,比如更改链接方式、代码是否有冗余请求,是否可以改为异步执行,等等。

    而我自己的任务解决方式为:由接口请求改为python模块直接传参,redis链接改为内网,极大提升执行效率,所以也就控制了执行超时问题。

    以上がPython apscheduler cronスケジュールタスクトリガーインターフェイスの自動検査を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。