ホームページ  >  記事  >  バックエンド開発  >  Python で構成ファイルのホットロードを実装する方法

Python で構成ファイルのホットロードを実装する方法

WBOY
WBOY転載
2023-05-07 18:31:201736ブラウズ

    背景

    最近の作業要件により、構成ホット リロード機能を実装するために、既存のプロジェクトに新しい機能を追加する必要があります。いわゆる構成ホットリロードとは、サービスが構成更新メッセージを受信した後、サービスを再起動せずに最新の構成を使用してタスクを実行できることを意味します。

    実装方法

    以下では、マルチプロセス、マルチスレッド、およびコルーチンのメソッドを使用して、構成のホットリロードを実装します。

    複数のプロセスを使用して構成のホット ロードを実装する

    コード実装で複数のプロセスを使用する場合、メイン プロセス 1 が構成を更新して命令を送信し、タスク呼び出しはプロセス 2 になります。構成ホットロードを実装しますか? 毛織物?

    信号セマフォを使用してホット ロードを実装する

    Python で構成ファイルのホットロードを実装する方法

    メイン プロセスが構成更新メッセージを受信するとき (構成読み取りはどのように構成更新メッセージを受信しますか? ?ここではまだ説明しません)、メインプロセスはサブプロセス 1 に kill シグナルを送信し、サブプロセス 1 は kill シグナルを受信した後に終了し、その後、信号処理関数が最新の構成ファイルを使用して新しいプロセスを開始します。 . ミッションを続行します。

    #main 関数

    def main():
        # 启动一个进程执行任务
        p1 = Process(target=run, args=("p1",))
        p1.start()
    
        monitor(p1, run) # 注册信号
        processes["case100"] = p1 #将进程pid保存
        num = 0 
        while True: # 模拟获取配置更新
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            sleep(2)
            if num == 4:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 8:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 12:
                kill_process(processes["case100"]) # kill 当前进程
            num += 1

    signal_handler 関数

    def signal_handler(process: Process, func, signum, frame):
        # print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGCHILD 
            # 这个循环是为了忽略SIGTERM发出的信号,避免抢占了主进程发出的SIGCHILD
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
    
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")

    完全なコードは次のとおりです

    #! /usr/local/bin/python3.8
    from multiprocessing import Process
    from typing import Dict
    import signal
    from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
    import multiprocessing
    from multiprocessing import Process
    from typing import Callable
    from data import processes
    import sys
    from functools import partial
    import time
    
    processes: Dict[str, Process] = {}
    counts = 2
    
    
    def run(process: Process):
        while True:
            print(f"{process} running...")
            time.sleep(1)
    
    
    def kill_process(process: Process):
        print(f"kill {process}")
        process.terminate()
    
    
    def monitor(process: Process, func: Callable):
        for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
            # SIGTERM is kill signal.
            # No SIGCHILD is not trigger singnal_handler,
            # No SIGINT is not handler ctrl+c,
            # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name=&#39;<stdout>&#39;>
            signal.signal(signame, partial(signal_handler, process, func))
    
    
    def signal_handler(process: Process, func, signum, frame):
        print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGTERM
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")
    
    
    def main():
        p1 = Process(target=run, args=("p1",))
        p1.start()
        monitor(p1, run)
        processes["case100"] = p1
        num = 0
        while True:
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            time.sleep(2)
            if num == 4:
                kill_process(processes["case100"])
            if num == 8:
                kill_process(processes["case100"])
            if num == 12:
                kill_process(processes["case100"])
            num += 1
    
    
    if __name__ == &#39;__main__&#39;:
        main()

    実行結果は次のとおりです

    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    p1 running...
    p1 running...
    kill <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>
    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    signum=17
    Launch a new process
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    kill <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>
    signum=17
    Launch a new process
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>}
    
    p3 running...
    p3 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>], count=1

    #概要

    利点: セマフォを使用すると、複数のプロセス間の通信の問題に対処できます。

    欠点: コードを書くのが難しく、書かれたコードを理解するのが難しい。セマフォの使用法に精通している必要がありますが、そうでないと、自分でバグを作成するのが簡単です (経験豊富なドライバーを除き、すべての初心者は注意してセマフォを使用する必要があります)。 ##process.terminate()

    送信されたシグナルは

    SIGTERM

    数値は 15 ですが、

    signal_handler が初めてシグナルを受信したときは数値 = 17 です。シグナルが 15 の場合、前のプロセスを強制終了できないという問題が発生します。セマフォに詳しい方、アドバイスをいただければ幸いです。 multiprocessing.Event を使用して構成のホット ロードを実装する実装ロジックは、メイン プロセス 1 が構成を更新し、指示を送信することです。プロセス 2 はスケジュール タスクを開始します。

    このとき、メインプロセス1は構成を更新した後、プロセス2に指示を送ります。このときの指示はEventを使って非同期イベントを通知することです。

    コードに直接移動します

    スケジューラー関数

    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
            print(f"Got case configurations {case_configurations=}...")
    
            task_schedule_event.set() # 设置set之后, is_set 为True
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set(): # is_set 为True的话,那么任务就会一直执行
                run(case_configurations)
    
            print("Clearing all scheduling job ...")

    event_scheduler関数

    def event_scheduler(case_config):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear() # clear之后,is_set 为False
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")

    完全なコード 実行結果は次のとおりです

    import multiprocessing
    import time
    
    
    scheduler_notify_queue = multiprocessing.Queue()
    task_schedule_event = multiprocessing.Event()
    
    
    def run(case_configurations: str):
        print(f&#39;{case_configurations} running...&#39;)
        time.sleep(3)
    
    
    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
    
            print(f"Got case configurations {case_configurations=}...")
            task_schedule_event.set()
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set():
                run(case_configurations)
    
            print("Clearing all scheduling job ...")
    
    
    def event_scheduler(case_config: str):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear()
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")
    
    
    def main():
        scheduler_notify_queue.put(&#39;1&#39;)
        p = multiprocessing.Process(target=scheduler)
        p.start()
    
        count = 1
        print(f&#39;{count=}&#39;)
        while True:
            if count == 5:
                event_scheduler(&#39;100&#39;)
            if count == 10:
                event_scheduler(&#39;200&#39;)
            count += 1
            time.sleep(1)
    
    
    if __name__ == &#39;__main__&#39;:
        main()

    実行結果は次のとおりです

    wait message...
    Got case configurations case_configurations=&#39;1&#39;...
    Schedule will start ...
    1 running...
    1 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;100&#39;...
    Schedule will start ...
    100 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;200&#39;...
    Schedule will start ...
    200 running...
    200 running...
    概要

    Event イベント通知を使用すると、コードはエラーが発生しにくく、記述されるコードの量も少なくなり、読みやすくなります。以前のセマフォ方式と比較して、この方式をより頻繁に使用することをお勧めします。

    マルチスレッドやコルーチンを利用する場合も、実際には上記の実装方法と同じです。唯一の違いは、queue

    event

    .

    # threading
    scheduler_notify_queue = queue.Queue()
    task_schedule_event = threading.Event()
    
    # async
    scheduler_notify_queue = asyncio.Queue()
    task_schedule_event = asyncio.Event()
    という異なるライブラリが呼び出されることです。

    以上がPython で構成ファイルのホットロードを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。