首頁  >  文章  >  後端開發  >  使用自訂 Django 命令自動重新載入 Celery 工作線程

使用自訂 Django 命令自動重新載入 Celery 工作線程

WBOY
WBOY原創
2024-07-22 09:40:111299瀏覽

Automatically reload Celery workers with a custom Django command

Celery 之前有一個 --autoreload 標誌,現已刪除。然而,Django 在其manage.py runserver 指令中內建了自動重新載入功能。 Celery 工作執行緒中缺乏自動重新加載會造成令人困惑的開發體驗:更新 Python 程式碼會導致 Django 伺服器使用當前程式碼重新加載,但伺服器觸發的任何任務都將在 Celery 工作執行緒中執行過時的程式碼。

這篇文章將向您展示如何建立一個自訂的 manage.py runworker 命令,該命令在開發過程中自動重新載入 Celery 工作執行緒。這個指令將模仿 runserver,我們將看看 Django 的自動重新載入是如何在幕後工作的。

在我們開始之前

這篇文章假設您有一個已經安裝了 Celery 的 Django 應用程式(指南)。它還假設您了解 Django 中的項目和應用程式之間的差異。

所有原始碼和文件連結均適用於發佈時(2024 年 7 月)目前版本的 Django 和 Celery。如果您在遙遠的將來閱讀本文,事情可能會改變。

最後,主專案目錄將在貼文的範例中命名為 my_project。

解決方案:自訂命令

我們將建立一個名為 runworker 的自訂管理.py 指令。由於 Django 透過其 runsever 命令提供自動重新加載,因此我們將使用 runserver 的原始程式碼作為自訂命令的基礎。

您可以透過在專案的任何應用程式中建立 management/commands/ 目錄來在 Django 中建立命令。建立目錄後,您可以在該目錄 (docs) 中放置一個帶有您要建立的命令名稱的 Python 檔案。

假設您的專案有一個名為 polls 的應用程序,我們將在 polls/management/commands/runworker.py 中建立一個檔案並添加以下程式碼:

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

重要提示: 請務必將 my_project 的所有實例替換為您的 Django 專案的名稱。

如果您想複製並貼上此程式碼並繼續編程,您可以安全地停在此處,而無需閱讀本文的其餘部分。這是一個優雅的解決方案,將在您開發 Django 和 Celery 專案時為您提供良好的服務。但是,如果您想了解更多有關其工作原理的信息,請繼續閱讀。

它是如何工作的(可選)

我不會逐行查看此程式碼,而是按主題討論最有趣的部分。如果您還不熟悉 Django 自訂命令,您可能需要在繼續之前查看文件。

自動裝彈

這部分感覺最神奇。在指令的handle()方法體內,呼叫了Django的內部autoreload.run_with_reloader()。它接受一個回調函數,每次專案中的 Python 檔案發生變更時都會執行該函數。 實際上是如何運作的?

讓我們來看看 autoreload.run_with_reloader() 函數原始碼的簡化版本。簡化的函數會重寫、內聯和刪除程式碼,使其操作更加清晰。

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

當manage.py runworker在命令列中執行時,它會先呼叫handle()方法,該方法會呼叫run_with_reloader()。

在 run_with_reloader() 內部,它將檢查名為 RUN_MAIN 的環境變數是否具有「true」值。當函數第一次被呼叫時,RUN_MAIN 應該沒有值。

當 RUN_MAIN 未設定為「true」時,run_with_reloader() 將進入循環。在迴圈內,它將啟動一個子進程,重新執行傳入的manage.py [command_name],然後等待該子進程退出。如果子進程退出並傳回程式碼 3,則循環的下一次迭代將啟動一個新的子進程並等待。這個循環將一直運行,直到子進程返回不為 3 的退出代碼(或直到使用者使用 ctrl + c 退出)。一旦得到非3的返回碼,就會完全退出程式。

產生的子程序再次執行manage.py指令(在我們的例子中是manage.py runworker),並且該指令將再次呼叫run_with_reloader()。這次,RUN_MAIN 將被設定為“true”,因為該命令在子進程中運行。

現在 run_with_reloader() 知道它位於子進程中,它將獲得一個監視檔案變更的重新載入器,將提供的回呼函數放入執行緒中,並將其傳遞給開始監視變更的重新載入器。

當重新載入器偵測到檔案變更時,它會執行 sys.exit(3)。這將退出子流程,從而觸發生成子流程的程式碼的下一次循環迭代。反過來,會啟動一個使用更新版本程式碼的新子流程。

系統檢查和遷移

預設情況下,Django 指令在執行其handle() 方法之前執行系統檢查。但是,對於 runserver 和我們的自訂 runworker 命令,我們希望延遲執行這些命令,直到進入我們提供給 run_with_reloader() 的回呼中。在我們的例子中,這是我們的 run_worker() 方法。這使我們能夠運行自動重新載入的命令,同時修復損壞的系統檢查。

為了延遲執行系統檢查,需要將requires_system_checks屬性的值設為空列表,並透過在run_worker()主體中呼叫self.check()來執行檢查。與 runserver 一樣,我們的自訂 runworker 命令也會檢查是否所有遷移都已執行,如果有待處理的遷移,它會顯示警告。

因為我們已經在 run_worker() 方法中執行 Django 的系統檢查,所以我們透過向 Celery 傳遞 --skip-checks 標誌來停用系統檢查,以防止重複工作。

所有與系統檢查和遷移相關的程式碼都是直接從 runserver 命令原始碼中提取的。

celery_app.worker_main()

我們的實作使用 celery_app.worker_main() 直接從 Python 啟動 Celery 工作程序,而不是向 Celery 發動攻擊。

on_worker_init()

此程式碼在工作進程初始化時執行,顯示日期和時間、Django 版本以及退出命令。它是根據 runserver 啟動時顯示的資訊建模的。

其他 runserver 樣板

以下行也從 runserver 原始碼中刪除:

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_exception()

日誌等級

我們的自訂命令具有可配置的日誌級別,以防開發人員希望在不修改程式碼的情況下從 CLI 調整設定。

更進一步

我研究了 Django 和 Celery 的原始碼來建立這個實現,並且有很多擴展它的機會。您可以配置該命令以接受更多 Celery 的工作參數。或者,您可以建立一個自訂的 manage.py 命令,它會自動重新載入任何 shell 命令,就像 David Browne 在本要點中所做的那樣。

如果您覺得本文有用,請隨時留下按讚或留言。感謝您的閱讀。

以上是使用自訂 Django 命令自動重新載入 Celery 工作線程的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
上一篇:Python 函數 4下一篇:Python 函數 4