Heim >Backend-Entwicklung >Python-Tutorial >Laden Sie Celery-Worker automatisch mit einem benutzerdefinierten Django-Befehl neu

Laden Sie Celery-Worker automatisch mit einem benutzerdefinierten Django-Befehl neu

WBOY
WBOYOriginal
2024-07-22 09:40:111334Durchsuche

Automatically reload Celery workers with a custom Django command

Celery hatte zuvor ein --autoreload-Flag, das inzwischen entfernt wurde. Django verfügt jedoch über ein integriertes automatisches Neuladen in seinen runserver-Befehl manage.py. Das Fehlen eines automatischen Neuladens in Celery-Workern führt zu einer verwirrenden Entwicklungserfahrung: Durch die Aktualisierung des Python-Codes wird der Django-Server mit dem aktuellen Code neu geladen, aber alle Aufgaben, die der Server auslöst, führen veralteten Code im Celery-Worker aus.

In diesem Beitrag erfahren Sie, wie Sie einen benutzerdefinierten manage.py-Runworker-Befehl erstellen, der Celery-Worker während der Entwicklung automatisch neu lädt. Der Befehl wird dem Runserver nachempfunden sein und wir werden einen Blick darauf werfen, wie Djangos automatisches Neuladen unter der Haube funktioniert.

Bevor wir beginnen

In diesem Beitrag wird davon ausgegangen, dass Sie eine Django-App haben, auf der Celery bereits installiert ist (Anleitung). Es setzt außerdem voraus, dass Sie die Unterschiede zwischen Projekten und Anwendungen in Django verstehen.

Alle Links zum Quellcode und zur Dokumentation beziehen sich auf die aktuellen Versionen von Django und Celery zum Zeitpunkt der Veröffentlichung (Juli 2024). Wenn Sie dies in ferner Zukunft lesen, haben sich die Dinge möglicherweise geändert.

Schließlich wird das Hauptprojektverzeichnis in den Beispielen des Beitrags my_project genannt.

Lösung: ein benutzerdefinierter Befehl

Wir werden einen benutzerdefinierten manage.py-Befehl namens runworker erstellen. Da Django über seinen Runsever-Befehl ein automatisches Neuladen ermöglicht, verwenden wir den Quellcode von Runserver als Grundlage für unseren benutzerdefinierten Befehl.

Sie können einen Befehl in Django erstellen, indem Sie in einer beliebigen Anwendung Ihres Projekts ein Verzeichnis „management/commands/“ erstellen. Sobald die Verzeichnisse erstellt wurden, können Sie eine Python-Datei mit dem Namen des Befehls, den Sie erstellen möchten, in diesem Verzeichnis ablegen (docs).

Angenommen, Ihr Projekt verfügt über eine Anwendung namens polls, erstellen wir eine Datei unter polls/management/commands/runworker.py und fügen den folgenden Code hinzu:

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

WICHTIG: Ersetzen Sie unbedingt alle Instanzen von my_project durch den Namen Ihres Django-Projekts.

Wenn Sie diesen Code kopieren und einfügen und mit der Programmierung fortfahren möchten, können Sie hier getrost aufhören, ohne den Rest dieses Beitrags zu lesen. Dies ist eine elegante Lösung, die Ihnen bei der Entwicklung Ihres Django & Celery-Projekts gute Dienste leisten wird. Wenn Sie jedoch mehr über die Funktionsweise erfahren möchten, lesen Sie weiter.

Wie es funktioniert (optional)

Anstatt diesen Code Zeile für Zeile durchzugehen, werde ich die interessantesten Teile thematisch besprechen. Wenn Sie mit den benutzerdefinierten Django-Befehlen noch nicht vertraut sind, sollten Sie die Dokumentation lesen, bevor Sie fortfahren.

Automatisches Nachladen

Dieser Teil fühlt sich am magischsten an. Im Hauptteil der handle()-Methode des Befehls gibt es einen Aufruf von Djangos internem autoreload.run_with_reloader(). Es akzeptiert eine Rückruffunktion, die jedes Mal ausgeführt wird, wenn eine Python-Datei im Projekt geändert wird. Wie funktioniert das eigentlich?

Werfen wir einen Blick auf eine vereinfachte Version des Quellcodes der Funktion autoreload.run_with_reloader(). Die vereinfachte Funktion schreibt Code neu, fügt ihn ein und löscht ihn, um Klarheit über seine Funktionsweise zu schaffen.

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

Wenn manage.py runworker in der Befehlszeile ausgeführt wird, ruft es zuerst die Methode handle() auf, die run_with_reloader() aufruft.

In run_with_reloader() wird geprüft, ob eine Umgebungsvariable namens RUN_MAIN den Wert „true“ hat. Beim ersten Aufruf der Funktion sollte RUN_MAIN keinen Wert haben.

Wenn RUN_MAIN nicht auf „true“ gesetzt ist, tritt run_with_reloader() in eine Schleife ein. Innerhalb der Schleife wird ein Unterprozess gestartet, der die übergebene Datei manage.py [Befehlsname] erneut ausführt, und dann darauf warten, dass dieser Unterprozess beendet wird. Wenn der Unterprozess mit Rückkehrcode 3 beendet wird, startet die nächste Iteration der Schleife einen neuen Unterprozess und wartet. Die Schleife wird ausgeführt, bis ein Unterprozess einen Exit-Code zurückgibt, der nicht 3 ist (oder bis der Benutzer mit Strg + C beendet wird). Sobald es einen Rückkehrcode ungleich 3 erhält, wird das Programm vollständig beendet.

Der erzeugte Unterprozess führt den Befehl manage.py erneut aus (in unserem Fall manage.py runworker), und der Befehl ruft erneut run_with_reloader() auf. Dieses Mal wird RUN_MAIN auf „true“ gesetzt, da der Befehl in einem Unterprozess ausgeführt wird.

Da run_with_reloader() nun weiß, dass es sich in einem Unterprozess befindet, ruft es einen Reloader ab, der auf Dateiänderungen überwacht, fügt die bereitgestellte Rückruffunktion in einen Thread ein und übergibt sie an den Reloader, der mit der Suche nach Änderungen beginnt.

Wenn ein Reloader eine Dateiänderung erkennt, führt er sys.exit(3) aus. Dadurch wird der Unterprozess beendet, was die nächste Iteration der Schleife aus dem Code auslöst, der den Unterprozess erzeugt hat. Im Gegenzug wird ein neuer Unterprozess gestartet, der eine aktualisierte Version des Codes verwendet.

Systemprüfungen und Migrationen

Standardmäßig führen Django-Befehle Systemprüfungen durch, bevor sie ihre handle()-Methode ausführen. Im Fall von runserver und unserem benutzerdefinierten runworker-Befehl möchten wir jedoch deren Ausführung verschieben, bis wir uns innerhalb des Rückrufs befinden, den wir run_with_reloader() bereitstellen. In unserem Fall ist dies unsere run_worker()-Methode. Dadurch können wir den Befehl mit automatischem Neuladen ausführen und gleichzeitig fehlerhafte Systemprüfungen beheben.

Um die Ausführung der Systemprüfungen zu verschieben, wird der Wert des Attributs require_system_checks auf eine leere Liste gesetzt und die Prüfungen werden durch Aufruf von self.check() im Hauptteil von run_worker() durchgeführt. Wie runserver prüft auch unser benutzerdefinierter runworker-Befehl, ob alle Migrationen ausgeführt wurden, und zeigt eine Warnung an, wenn Migrationen ausstehen.

Da wir die Systemprüfungen von Django bereits innerhalb der run_worker()-Methode durchführen, deaktivieren wir die Systemprüfungen in Celery, indem wir ihr das Flag --skip-checks übergeben, um doppelte Arbeit zu verhindern.

Der gesamte Code, der sich auf Systemprüfungen und Migrationen bezieht, wurde direkt aus dem Quellcode des Runserver-Befehls übernommen.

celery_app.worker_main()

Unsere Implementierung startet den Celery-Worker direkt aus Python mithilfe von celery_app.worker_main(), anstatt ihn an Celery zu senden.

on_worker_init()

Dieser Code wird ausgeführt, wenn der Worker initialisiert wird, und zeigt Datum und Uhrzeit, die Django-Version und den Befehl zum Beenden an. Es ist den Informationen nachempfunden, die beim Booten des Runservers angezeigt werden.

Andere Runserver-Boilerplate

Die folgenden Zeilen wurden ebenfalls aus der Runserver-Quelle entfernt:

  • suppressed_base_arguments = {"--verbosity", "--traceback"🎜>
  • autoreload.raise_last_Exception()
Protokollebene

Unser benutzerdefinierter Befehl verfügt über eine konfigurierbare Protokollebene für den Fall, dass der Entwickler die Einstellung über die CLI anpassen möchte, ohne den Code zu ändern.

Weiter gehen

Ich habe den Quellcode für Django & Celery durchforstet, um diese Implementierung zu erstellen, und es gibt viele Möglichkeiten, sie zu erweitern. Sie können den Befehl so konfigurieren, dass er mehr Worker-Argumente von Celery akzeptiert. Alternativ könnten Sie einen benutzerdefinierten manage.py-Befehl erstellen, der

jeden Shell-Befehl automatisch neu lädt, wie es David Browne in diesem Gist. getan hat

Wenn Sie dies nützlich fanden, hinterlassen Sie gerne ein „Gefällt mir“ oder einen Kommentar. Danke fürs Lesen.

Das obige ist der detaillierte Inhalt vonLaden Sie Celery-Worker automatisch mit einem benutzerdefinierten Django-Befehl neu. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Vorheriger Artikel:Python-Funktionen 4Nächster Artikel:Python-Funktionen 4