>  기사  >  백엔드 개발  >  Python에서 구성 파일의 핫 로딩을 구현하는 방법

Python에서 구성 파일의 핫 로딩을 구현하는 방법

WBOY
WBOY앞으로
2023-05-07 18:31:201753검색

    Background

    최근 작업 요구 사항으로 인해 구성 핫 리로딩 기능을 구현하려면 기존 프로젝트에 새로운 기능을 추가해야 합니다. 소위 구성 핫 리로딩은 서비스가 구성 업데이트 메시지를 수신한 후 서비스를 다시 시작하지 않고도 최신 구성을 사용하여 작업을 수행할 수 있음을 의미합니다.

    구현 방법

    아래에서는 다중 프로세스, 다중 스레드 및 코루틴 방법을 사용하여 구성의 핫 리로드를 구현합니다.

    여러 프로세스를 사용하여 구성 핫 로딩 구현

    코드 구현에 여러 프로세스를 사용하는 경우 기본 프로세스 1이 구성을 업데이트하고 지침을 보내고 작업 호출은 프로세스 2입니다. 구성 핫 로딩을 구현하는 방법은 무엇입니까?

    신호 세마포어를 사용하여 핫 로딩 구현

    Python에서 구성 파일의 핫 로딩을 구현하는 방법

    메인 프로세스가 구성 업데이트 메시지를 받으면(구성 읽기가 구성 업데이트 메시지를 어떻게 수신합니까? 여기서는 논의하지 않겠습니다), 메인 프로세스는 하위 프로세스 1로 진행합니다 종료 신호를 보냅니다. 하위 프로세스 1은 종료 신호를 받은 후 종료됩니다. 그런 다음 신호 처리 기능은 새 프로세스를 시작하고 최신 구성 파일을 사용하여 작업을 계속 실행합니다.

    주요 함수

    def main():
        # 启动一个进程执行任务
        p1 = Process(target=run, args=("p1",))
        p1.start()
    
        monitor(p1, run) # 注册信号
        processes["case100"] = p1 #将进程pid保存
        num = 0 
        while True: # 模拟获取配置更新
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            sleep(2)
            if num == 4:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 8:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 12:
                kill_process(processes["case100"]) # kill 当前进程
            num += 1

    signal_handler 함수

    def signal_handler(process: Process, func, signum, frame):
        # print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGCHILD 
            # 这个循环是为了忽略SIGTERM发出的信号,避免抢占了主进程发出的SIGCHILD
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
    
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")

    전체 코드는 다음과 같습니다

    #! /usr/local/bin/python3.8
    from multiprocessing import Process
    from typing import Dict
    import signal
    from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
    import multiprocessing
    from multiprocessing import Process
    from typing import Callable
    from data import processes
    import sys
    from functools import partial
    import time
    
    processes: Dict[str, Process] = {}
    counts = 2
    
    
    def run(process: Process):
        while True:
            print(f"{process} running...")
            time.sleep(1)
    
    
    def kill_process(process: Process):
        print(f"kill {process}")
        process.terminate()
    
    
    def monitor(process: Process, func: Callable):
        for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
            # SIGTERM is kill signal.
            # No SIGCHILD is not trigger singnal_handler,
            # No SIGINT is not handler ctrl+c,
            # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name=&#39;<stdout>&#39;>
            signal.signal(signame, partial(signal_handler, process, func))
    
    
    def signal_handler(process: Process, func, signum, frame):
        print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGTERM
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")
    
    
    def main():
        p1 = Process(target=run, args=("p1",))
        p1.start()
        monitor(p1, run)
        processes["case100"] = p1
        num = 0
        while True:
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            time.sleep(2)
            if num == 4:
                kill_process(processes["case100"])
            if num == 8:
                kill_process(processes["case100"])
            if num == 12:
                kill_process(processes["case100"])
            num += 1
    
    
    if __name__ == &#39;__main__&#39;:
        main()

    실행 결과는 다음과 같습니다

    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    p1 running...
    p1 running...
    kill <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>
    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    signum=17
    Launch a new process
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    kill <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>
    signum=17
    Launch a new process
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>}
    
    p3 running...
    p3 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>], count=1

    요약

    이용: 세마포어는 여러 간의 통신을 처리할 수 있습니다. 프로세스 문제.

    단점: 코드 작성이 어렵고, 작성된 코드를 이해하기 어렵습니다. 세마포어 사용법에 익숙해야 합니다. 그렇지 않으면 스스로 버그를 작성하기 쉽습니다. (숙련된 드라이버를 제외하고 모든 초보자는 주의해서 사용해야 합니다.)

    또 특별히 이해되지 않는 것은 프로세스입니다. 종료() 전송된 신호는 <code>SIGTERM이고 숫자는 15이지만, signal_handler가 처음으로 신호를 수신할 때는 숫자=17입니다. 15라는 신호를 받으면 이전 프로세스를 종료할 수 없는 문제가 발생합니다. 세마포어에 대해 잘 아시는 분들의 조언 부탁드립니다. process.terminate() 发送出信号是SIGTERM number是15,但是第一次signal_handler收到信号却是number=17,如果我要去处理15的信号,就会导致前一个进程不能kill掉的问题。欢迎有对信号量比较熟悉的大佬,前来指点迷津,不甚感谢。

    采用multiprocessing.Event 来实现配置热加载

    实现逻辑是主进程1 更新配置并发送指令。进程2启动调度任务。

    这时候当主进程1更新好配置之后,发送指令给进程2,这时候的指令就是用Event一个异步事件通知。

    直接上代码

    scheduler 函数

    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
            print(f"Got case configurations {case_configurations=}...")
    
            task_schedule_event.set() # 设置set之后, is_set 为True
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set(): # is_set 为True的话,那么任务就会一直执行
                run(case_configurations)
    
            print("Clearing all scheduling job ...")

    event_scheduler 函数

    def event_scheduler(case_config):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear() # clear之后,is_set 为False
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")

    完整代码如下

    import multiprocessing
    import time
    
    
    scheduler_notify_queue = multiprocessing.Queue()
    task_schedule_event = multiprocessing.Event()
    
    
    def run(case_configurations: str):
        print(f&#39;{case_configurations} running...&#39;)
        time.sleep(3)
    
    
    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
    
            print(f"Got case configurations {case_configurations=}...")
            task_schedule_event.set()
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set():
                run(case_configurations)
    
            print("Clearing all scheduling job ...")
    
    
    def event_scheduler(case_config: str):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear()
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")
    
    
    def main():
        scheduler_notify_queue.put(&#39;1&#39;)
        p = multiprocessing.Process(target=scheduler)
        p.start()
    
        count = 1
        print(f&#39;{count=}&#39;)
        while True:
            if count == 5:
                event_scheduler(&#39;100&#39;)
            if count == 10:
                event_scheduler(&#39;200&#39;)
            count += 1
            time.sleep(1)
    
    
    if __name__ == &#39;__main__&#39;:
        main()

    执行结果如下

    wait message...
    Got case configurations case_configurations=&#39;1&#39;...
    Schedule will start ...
    1 running...
    1 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;100&#39;...
    Schedule will start ...
    100 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;200&#39;...
    Schedule will start ...
    200 running...
    200 running...

    总结

    使用Event事件通知,代码不易出错,代码编写少,易读。相比之前信号量的方法,推荐大家多使用这种方式。

    使用多线程或协程的方式,其实和上述实现方式一致。唯一区别就是调用了不同库中,queue和 event

    multiprocessing.Event를 사용하여 구성 핫 로딩 구현🎜🎜구현 논리는 기본 프로세스 1이 구성을 업데이트하고 지침을 보내는 것입니다. 프로세스 2는 예약 작업을 시작합니다. 🎜🎜 이때 메인 프로세스 1이 구성을 업데이트한 후 프로세스 2에 명령을 보냅니다. 이때 명령은 Event를 사용하여 비동기 이벤트를 알리는 것입니다. 🎜🎜코드 바로가기🎜🎜🎜scheduler 함수🎜🎜
    # threading
    scheduler_notify_queue = queue.Queue()
    task_schedule_event = threading.Event()
    
    # async
    scheduler_notify_queue = asyncio.Queue()
    task_schedule_event = asyncio.Event()
    🎜🎜event_scheduler 함수🎜🎜rrreee🎜🎜전체 코드는 다음과 같습니다🎜🎜rrreee🎜실행 결과는 다음과 같습니다🎜rrreee🎜🎜요약 🎜🎜 🎜이벤트 활용하기 이벤트 알림을 사용하면 코드에 오류가 덜 발생하고 코드 작성이 줄어들고 읽기가 더 쉬워집니다. 기존 세마포어 방식에 비해 이 방식을 더 자주 사용하는 것이 좋습니다. 🎜🎜멀티스레딩이나 코루틴을 사용하는 것은 실제로 위의 구현 방법과 동일합니다. 유일한 차이점은 queueevent 등 다양한 라이브러리가 호출된다는 것입니다.🎜rrreee

    위 내용은 Python에서 구성 파일의 핫 로딩을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제