ホームページ  >  記事  >  バックエンド開発  >  カスタム Django コマンドを使用して Celery ワーカーを自動的にリロードする

カスタム Django コマンドを使用して Celery ワーカーを自動的にリロードする

WBOY
WBOYオリジナル
2024-07-22 09:40:111299ブラウズ

Automatically reload Celery workers with a custom Django command

Celery には以前は --autoreload フラグがありましたが、現在は削除されています。ただし、Django には、manage.py runserver コマンドに自動リロード機能が組み込まれています。 Celery ワーカーに自動リロードがないため、開発エクスペリエンスが混乱します。Python コードを更新すると、Django サーバーは現在のコードでリロードされますが、サーバーが起動するタスクはすべて、Celery ワーカーで古いコードを実行します。

この投稿では、開発中に Celery ワーカーを自動的にリロードするカスタム manage.py runworker コマンドを構築する方法を説明します。このコマンドは runserver をモデルにして、Django の自動リロードが内部でどのように機能するかを見ていきます。

始める前に

この投稿は、Celery がすでにインストールされている Django アプリがあることを前提としています (ガイド)。また、Django のプロジェクトとアプリケーションの違いを理解していることも前提としています。

ソース コードおよびドキュメントへのすべてのリンクは、発行時 (2024 年 7 月) の現在のバージョンの Django および Celery のものになります。あなたが遠い将来にこれを読んでいるなら、状況は変わっているかもしれません。

最後に、投稿の例では、メイン プロジェクト ディレクトリの名前は my_project になります。

解決策: カスタム コマンド

runworker という名前のカスタム manage.py コマンドを作成します。 Django は、runserver コマンドを介して自動リロードを提供するため、runserver のソース コードをカスタム コマンドの基礎として使用します。

プロジェクトのアプリケーション内に manage/commands/ ディレクトリを作成することで、Django でコマンドを作成できます。ディレクトリが作成されたら、そのディレクトリ内に作成したいコマンドの名前を含む Python ファイルを置くことができます (docs)。

プロジェクトにpollsという名前のアプリケーションがあると仮定して、polls/management/commands/runworker.pyにファイルを作成し、次のコードを追加します。

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

重要: my_project のすべてのインスタンスを必ず Django プロジェクトの名前に置き換えてください。

このコードをコピーして貼り付けてプログラミングを続行したい場合は、この投稿の残りを読まなくても、ここで安全に終了できます。これは、Django & Celery プロジェクトを開発する際に役立つ洗練されたソリューションです。ただし、その仕組みについてさらに詳しく知りたい場合は、読み続けてください。

仕組み (オプション)

このコードを 1 行ずつレビューするのではなく、トピックごとに最も興味深い部分について説明します。 Django カスタム コマンドにまだ慣れていない場合は、続行する前にドキュメントを確認してください。

自動リロード

この部分が最も魔法のように感じられます。コマンドの handle() メソッドの本体内に、Django の内部 autoreload.run_with_reloader() への呼び出しがあります。プロジェクト内で Python ファイルが変更されるたびに実行されるコールバック関数を受け入れます。 実際はどのように機能しますか?

autoreload.run_with_reloader() 関数のソース コードの簡略化されたバージョンを見てみましょう。簡略化された関数は、コードの書き換え、インライン化、削除を行い、その動作を明確にします。

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

manage.py runworker がコマンドラインで実行されると、最初に handle() メソッドが呼び出され、その後 run_with_reloader() が呼び出されます。

run_with_reloader() 内で、RUN_MAIN という環境変数の値が「true」かどうかを確認します。関数が最初に呼び出されるとき、RUN_MAIN には値がありません。

RUN_MAIN が "true" に設定されていない場合、run_with_reloader() はループに入ります。ループ内で、渡された manage.py [command_name] を再実行するサブプロセスを開始し、そのサブプロセスが終了するのを待ちます。サブプロセスが戻りコード 3 で終了した場合、ループの次の反復で新しいサブプロセスが開始され、待機します。このループは、サブプロセスが 3 以外の終了コードを返すまで (またはユーザーが ctrl + c で終了するまで) 実行されます。 3 以外のリターン コードを取得すると、プログラムは完全に終了します。

生成されたサブプロセスは、manage.py コマンド (この場合は manage.py runworker) を再度実行し、コマンドは再度 run_with_reloader() を呼び出します。今回は、コマンドがサブプロセスで実行されているため、RUN_MAIN は「true」に設定されます。

run_with_reloader() は自分がサブプロセス内にあることを認識したので、ファイルの変更を監視するリローダーを取得し、提供されたコールバック関数をスレッドに配置して、変更の監視を開始するリローダーに渡します。

リローダーはファイルの変更を検出すると、sys.exit(3) を実行します。これによりサブプロセスが終了し、サブプロセスを生成したコードからループの次の反復がトリガーされます。次に、更新されたバージョンのコードを使用する新しいサブプロセスが起動されます。

システムのチェックと移行

デフォルトでは、Django コマンドは handle() メソッドを実行する前にシステム チェックを実行します。ただし、runserver とカスタム runworker コマンドの場合は、run_with_reloader() に提供するコールバック内に入るまで、これらの実行を延期する必要があります。私たちの場合、これは run_worker() メソッドです。これにより、壊れたシステム チェックを修正しながら、自動リロードでコマンドを実行できます。

システム チェックの実行を延期するには、requires_system_checks 属性の値を空のリストに設定し、run_worker() の本体で self.check() を呼び出してチェックを実行します。 runserver と同様に、カスタム runworker コマンドもすべての移行が実行されたかどうかを確認し、保留中の移行がある場合は警告を表示します。

run_worker() メソッド内で Django のシステム チェックをすでに実行しているため、重複した作業を防ぐために --skip-checks フラグを渡して Celery のシステム チェックを無効にします。

システム チェックと移行に関連するコードはすべて、runserver コマンドのソース コードから直接取得されました。

celery_app.worker_main()

私たちの実装では、Celery にシェルアウトするのではなく、celery_app.worker_main() を使用して Python から直接 Celery ワーカーを起動します。

on_worker_init()

このコードはワーカーが初期化されるときに実行され、日付と時刻、Django のバージョン、終了するコマンドが表示されます。これは、runserver の起動時に表示される情報をモデルにしています。

その他のランサーバー定型文

次の行も runserver ソースから引用されました:

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_Exception()

ログレベル

開発者がコードを変更せずに CLI から設定を調整したい場合に備えて、カスタム コマンドには構成可能なログ レベルがあります。

さらに進む

私はこの実装を構築するために Django と Celery のソース コードを徹底的に調べましたが、拡張する機会はたくさんあります。 Celery のワーカー引数をさらに受け入れるようにコマンドを構成できます。あるいは、David Browne がこの Gist で行ったように、任意の シェル コマンドを自動的にリロードするカスタム manage.py コマンドを作成することもできます。

これが役に立ったと思われた場合は、お気軽に「いいね!」またはコメントを残してください。読んでいただきありがとうございます。

以上がカスタム Django コマンドを使用して Celery ワーカーを自動的にリロードするの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
前の記事:Python 関数 4次の記事:Python 関数 4