>  기사  >  백엔드 개발  >  매우 실용적입니다! Python의 주기적 작업 아티팩트인 Schedule 모듈!

매우 실용적입니다! Python의 주기적 작업 아티팩트인 Schedule 모듈!

王林
王林앞으로
2023-04-12 14:01:081890검색

매우 실용적입니다! Python의 주기적 작업 아티팩트인 Schedule 모듈!

Linux 서버에서 정기적으로 Python 스크립트를 실행하려는 경우 가장 유명한 선택은 Crontab 스크립트여야 하지만 Crontab에는 다음과 같은 단점이 있습니다.

<code style="font-family: monospace; font-size: inherit; background-color: rgba(0, 0, 0, 0.06); padding: 0px 2px; border-radius: 6px; line-height: inherit; overflow-wrap: break-word; text-indent: 0px;"><strong>​1.不方便执行<strong>秒级的任务</strong>。​</strong> 

<strong>​2.当需要执行的定时任务有上百个的时候,Crontab的<strong>管理就会特别不方便</strong>。​</strong> 

另外一个选择是 Celery,但是 Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。

在你想要使用一个轻量级的任务调度工具,而且希望它尽量简单、容易使用、不需要外部依赖,最好能够容纳 Crontab 的所有基本功能,那么 Schedule 模块是你的不二之选。

使用它来调度任务可能只需要几行代码,感受一下:

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

上面的代码表示每10分钟执行一次 job 函数,非常简单方便。你只需要引入 schedule 模块,通过调用 <strong>​scedule.every(时间数).时间类型.do(job)​</strong>  发布周期任务。

发布后的周期任务需要用 <strong>​run_pending​</strong> 函数来检测是否执行,因此需要一个 <strong>​While​</strong>​1. 2단계 작업을 수행하는 것이 불편합니다. ​

<strong>​2. 실행해야 할 예약된 작업이 수백 개 있는 경우 Crontab의 </strong> 관리 특히 불편할 거예요. ​

또 다른 옵션은 Celery이지만 Celery의 구성이 더 번거롭기 때문에 가벼운 일정 관리 도구만 필요한 경우 Celery는 좋은 선택이 아닙니다.

가벼운 작업 일정 도구를 사용하고 외부 종속성 없이 최대한 간단하고 사용하기 쉽고 Crontab의 모든 기본 기능을 수용할 수 있기를 원한다면 일정 모듈이 최선의 선택입니다.

작업을 예약하는 데 사용하는 데는 몇 줄의 코드만 필요할 수 있습니다. 느껴보세요:

pip install schedule
위 코드는 작업 기능이 10분마다 실행된다는 의미로 매우 간단하고 편리합니다.

<span style="color: #666666;">​scedule.every(시간 번호).time type.do(작업)​</span> code >

정기적인 작업을 게시하세요. 출시 후 정기적인 작업에서는

​run_pending​

함수를 사용하여 실행 여부를 감지합니다. a

<strong>​While​</strong>

loop는 이 함수를 지속적으로 폴링합니다.

다음에서는 Schedule 모듈의 설치와 기본 및 고급 사용법을 자세히 설명합니다.

1. 준비

종속성 설치 명령을 입력하려면 다음 방법 중 하나를 선택하세요.

1 Windows 환경 Cmd(시작-실행-CMD)를 엽니다.

2. MacOS 환경 터미널을 엽니다(터미널에 들어가려면 Command+Space).

3. VSCode 편집기나 Pycharm을 사용하는 경우 인터페이스 하단의 터미널을 직접 사용할 수 있습니다.

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

# 每十分钟执行任务
schedule.every(10).minutes.do(job)
# 每个小时执行任务
schedule.every().hour.do(job)
# 每天的10:30执行任务
schedule.every().day.at("10:30").do(job)
# 每个月执行任务
schedule.every().monday.do(job)
# 每个星期三的13:15分执行任务
schedule.every().wednesday.at("13:15").do(job)
# 每分钟的第17秒执行任务
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

2. 기본 사용법은🎜🎜🎜🎜🎜 제가 거기에 있었던 기사의 시작 부분에 언급되어 있으며 다음은 작업 예약에 대한 추가 예입니다. 🎜
# Python 实用宝典
import schedule
import time

def job_that_executes_once():
    # 此处编写的任务只会执行一次...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)
🎜 위의 예에서 몇 달에서 몇 초까지의 구성이 다뤄지는 것을 볼 수 있습니다. 하지만🎜작업을 한 번만 실행하려면🎜 다음과 같이 구성하면 됩니다.🎜
# Python 实用宝典
import schedule

def greet(name):
    print('Hello', name)

# do() 将额外的参数传递给job函数
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
🎜🎜매개변수 전달🎜🎜🎜실행을 위해 작업에 전달해야 하는 매개변수가 있는 경우 이렇게만 하면 됩니다. :🎜
# Python 实用宝典
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()
🎜🎜모든 현재 매개변수 가져오기 작업 🎜🎜🎜 현재 작업을 모두 가져오려면: 🎜
# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()
🎜🎜 모든 작업 취소 🎜🎜🎜 일부 메커니즘이 트리거되면 현재 프로그램의 모든 작업을 지워야 합니다. 즉시: 🎜
# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag 打标签
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(标签):可以获取所有该标签的任务
friends = schedule.get_jobs('friend')

# 取消所有 daily-tasks 标签的任务
schedule.clear('daily-tasks')
🎜🎜 태그 기능 🎜🎜🎜 in 작업 설정 시 나중에 작업 관리를 용이하게 하기 위해 작업에 태그를 지정하여 해당 작업을 필터링하여 해당 작업을 얻거나 작업을 취소할 수 있습니다. . 🎜
# Python 实用宝典
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# 每个小时运行作业,18:30后停止
schedule.every(1).hours.until("18:30").do(job)

# 每个小时运行作业,2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# 每个小时运行作业,8个小时后停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# 每个小时运行作业,11:32:42后停止
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# 每个小时运行作业,2020-5-17 11:36:20后停止
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
🎜🎜작업 기한 설정🎜🎜🎜특정 시간까지 작업이 마감되어야 하는 경우 다음 방법을 사용할 수 있습니다. 🎜
# Python 实用宝典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即运行所有作业,每次作业间隔10秒
schedule.run_all(delay_seconds=10)
🎜기한이 지나면 작업을 실행할 수 없습니다. 🎜🎜🎜일정에 관계없이 모든 작업을 즉시 실행🎜🎜

如果某个机制触发了,你需要立即运行所有作业,可以调用 <strong>​schedule.run_all()​</strong> :

# Python 实用宝典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即运行所有作业,每次作业间隔10秒
schedule.run_all(delay_seconds=10)

3.高级使用

装饰器安排作业

如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:

# Python 实用宝典
from schedule import every, repeat, run_pending
import time

# 此装饰器效果等同于 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

并行执行

默认情况下,Schedule 按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。

不过你可以通过多线程的形式来运行每个作业以解决此限制:

# Python 实用宝典
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

日志记录

Schedule 模块同时也支持 logging 日志记录,这么使用:

# Python 实用宝典
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 日志级别为DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

异常处理

Schedule 不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。

你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:

# Python 实用宝典
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

这样,<strong>​bad_task​</strong> 在执行时遇到的任何错误,都会被 <strong>​catch_exceptions ​</strong> 捕获,这点在保证调度任务正常运转的时候非常关键。

위 내용은 매우 실용적입니다! Python의 주기적 작업 아티팩트인 Schedule 모듈!의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 51cto.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제