Heim >Backend-Entwicklung >Python-Tutorial >Gewährleistung einer fairen Verarbeitung mit Sellerie – Teil I

Gewährleistung einer fairen Verarbeitung mit Sellerie – Teil I

Mary-Kate Olsen
Mary-Kate OlsenOriginal
2024-11-16 09:10:03667Durchsuche

Ensuring Fair Processing with Celery — Part I

Wenn Sie mit Python vertraut sind, haben Sie wahrscheinlich schon einmal von Sellerie gehört. Es ist oft die erste Wahl für die asynchrone Bearbeitung von Aufgaben, wie z. B. Bildverarbeitung oder E-Mail-Versand.

Als ich mit einigen Leuten sprach, bemerkte ich, dass viele Entwickler Celery zunächst beeindruckend finden, aber mit zunehmender Größe und Komplexität ihrer Projekte lässt ihre Begeisterung nach. Während einige aus berechtigten Gründen von Sellerie Abstand nehmen, erforschen andere seinen Kern vielleicht einfach nicht tief genug, um ihn an ihre Bedürfnisse anzupassen.

In diesem Blog möchte ich einen der Gründe diskutieren, warum einige Entwickler nach Alternativen suchen oder sogar benutzerdefinierte Hintergrund-Worker-Frameworks erstellen: faire Verarbeitung. In Umgebungen, in denen Benutzer/Mieter Aufgaben unterschiedlicher Größe einreichen, kann das Risiko, dass sich die hohe Arbeitsbelastung eines Mieters auf andere auswirkt, zu Engpässen und Frustration führen.

Ich führe Sie durch Strategien zur Implementierung einer fairen Verarbeitung in Celery und stelle eine ausgewogene Aufgabenverteilung sicher, sodass kein einzelner Mieter Ihre Ressourcen dominieren kann.

Das Problem

Lassen Sie uns auf eine häufige Herausforderung eingehen, mit der mandantenfähige Anwendungen konfrontiert sind, insbesondere solche, die die Stapelverarbeitung verarbeiten. Stellen Sie sich vor, Sie verfügen über ein System, in dem Benutzer ihre Bildverarbeitungsaufgaben in eine Warteschlange stellen können, sodass sie ihre verarbeiteten Bilder nach einer kurzen Wartezeit erhalten. Durch dieses Setup bleibt Ihre API nicht nur reaktionsfähig, sondern Sie können Ihre Mitarbeiter auch nach Bedarf skalieren, um die Last effizient zu bewältigen.

Alles läuft reibungslos – bis ein Mieter beschließt, eine riesige Menge Bilder zur Verarbeitung einzusenden. Sie haben mehrere Worker im Einsatz und diese können sich sogar automatisch skalieren, um einer erhöhten Nachfrage gerecht zu werden, sodass Sie sich auf Ihre Infrastruktur verlassen können. Das Problem beginnt jedoch, wenn andere Mieter versuchen, kleinere Stapel – vielleicht nur ein paar Bilder – in die Warteschlange zu stellen und plötzlich mit langen Wartezeiten ohne Aktualisierungen konfrontiert werden. Bevor Sie es wissen, strömen Support-Tickets herein und Benutzer beschweren sich darüber, dass Ihr Dienst langsam ist oder gar nicht reagiert.

Dieses Szenario kommt nur allzu häufig vor, da Celery Aufgaben standardmäßig in der Reihenfolge ihres Eingangs verarbeitet. Wenn ein Mieter Ihre Mitarbeiter mit einer enormen Menge an Aufgaben überfordert, reichen selbst die besten automatischen Skalierungsstrategien möglicherweise nicht aus, um Verzögerungen für andere Mieter zu verhindern. Infolgedessen erleben diese Benutzer möglicherweise Serviceniveaus, die hinter den versprochenen oder erwarteten Werten zurückbleiben.

Mengenbegrenzung bei Sellerie

Eine wirksame Strategie zur Gewährleistung einer fairen Verarbeitung ist die Einführung von Ratenbegrenzungen. Damit können Sie die Anzahl der Aufgaben steuern, die jeder Mieter innerhalb eines bestimmten Zeitraums einreichen kann. Dies verhindert, dass ein einzelner Mieter Ihre Arbeitskräfte monopolisiert und stellt sicher, dass alle Mieter eine faire Chance haben, ihre Aufgaben zu bearbeiten.

Celery verfügt über eine integrierte Funktionalität zur Ratenbegrenzung auf Aufgabenebene:

# app.py
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")

@app.task(rate_limit="10/m") # Limit to 10 tasks per minute
def process_data(data):
    print(f"Processing data: {data}")

# Call the task
if __name__ == "__main__":
    for i in range(20):
        process_data.delay(f"data_{i}")

Sie können den Worker ausführen, indem Sie Folgendes ausführen:

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

Führen Sie nun das app.py-Skript aus, um 20 Aufgaben auszulösen:

python app.py

Wenn Sie es schaffen, es lokal auszuführen, werden Sie feststellen, dass es zwischen den einzelnen Aufgaben eine Verzögerung gibt, um sicherzustellen, dass die Ratenbegrenzung durchgesetzt wird. Jetzt denken Sie wahrscheinlich, dass uns das bei unserem Problem nicht wirklich weiterhilft, und da haben Sie vollkommen Recht. Diese integrierte Ratenbegrenzung von Celery ist nützlich für Szenarien, in denen unsere Aufgabe möglicherweise Anrufe an externe Dienste mit strengen Ratenbegrenzungen umfasst.

Dieses Beispiel zeigt, dass die integrierte Funktion für komplexe Szenarien möglicherweise zu einfach ist. Wir können diese Einschränkung jedoch überwinden, indem wir das Framework von Celery genauer untersuchen. Sehen wir uns an, wie wir ein angemessenes Ratenlimit mit automatischer Wiederholung pro Mandant einrichten können.

Wir werden Redis verwenden, um das Tariflimit pro Mieter zu verfolgen. Redis ist eine beliebte Datenbank und ein beliebter Broker für Celery, also nutzen wir diese Komponente, die sich wahrscheinlich bereits in Ihrem Stack befindet.

Lassen Sie uns ein paar Bibliotheken importieren:

import time
import redis
from celery import Celery, Task

Jetzt implementieren wir eine benutzerdefinierte Basisaufgabenklasse für unsere ratenbegrenzte Aufgabe:

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)

class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        # Set default rate limit
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)

Diese benutzerdefinierte Klasse verfolgt die Anzahl der Aufgaben, die von einem bestimmten Mandanten mithilfe von Redis ausgelöst werden, und legt eine TTL von 10 Sekunden fest. Wenn das Ratenlimit überschritten wird, wird die Aufgabe innerhalb von 10 Sekunden erneut versucht. Grundsätzlich liegt unser Standardratenlimit bei 10 Aufgaben innerhalb von 10 Sekunden.

Lassen Sie uns eine Beispielaufgabe definieren, die die Verarbeitung emuliert:

@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

Hier haben wir eine Prozessaufgabe definiert und Sie sehen, dass ich das custom_rate_limit auf Aufgabenebene ändern kann. Wenn wir kein custom_rate_limit angeben, wird der Standardwert 10 zugewiesen. Jetzt hat sich unser Ratenlimit auf 5 Aufgaben innerhalb von 10 Sekunden geändert.

Lassen Sie uns nun einige Aufgaben für verschiedene Mieter auslösen:

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

Wir definieren 20 Aufgaben für die Mieter-ID 1 und 10 Aufgaben für die Mieter-ID 2.

Unser vollständiger Code würde also so aussehen:

# app.py
import time
import redis
from celery import Celery, Task

app = Celery(
    "app",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=False,
)

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)


class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)


@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

Lassen Sie uns unseren Arbeiter ausführen:

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

Führen Sie nun das app.py-Skript aus, um die Aufgaben auszulösen:

python app.py

Wie Sie sehen können, verarbeitet der Worker 5 Aufgaben des ersten Mandanten und richtet einen Wiederholungsversuch für alle anderen Aufgaben ein. Anschließend übernimmt es 5 Aufgaben des zweiten Mandanten und richtet einen Wiederholungsversuch für die anderen Aufgaben ein, und es geht weiter.

Mit diesem Ansatz können Sie ein Ratenlimit pro Mandant definieren. Wie Sie jedoch in unserem Beispiel sehen können, führt eine zu strenge Ratenbegrenzung bei einer Aufgabe, die sehr schnell ausgeführt wird, dazu, dass der Arbeiter eine Zeit lang nichts tut. Die Feinabstimmung der Ratenbegrenzungsparameter ist von entscheidender Bedeutung und hängt von der spezifischen Aufgabe und dem Volumen ab. Zögern Sie nicht zu experimentieren, bis Sie die optimale Balance gefunden haben.

Abschluss

Wir haben untersucht, wie die Standardaufgabenverarbeitung von Celery zu Ungerechtigkeit in Umgebungen mit mehreren Mandanten führen kann und wie Ratenbegrenzung zur Behebung dieses Problems beitragen kann. Durch die Implementierung mandantenspezifischer Ratenbegrenzungen können wir verhindern, dass ein einzelner Mandant Ressourcen monopolisiert, und eine gerechtere Verteilung der Rechenleistung sicherstellen.

Dieser Ansatz bietet eine solide Grundlage für eine faire Verarbeitung von Sellerie. Es gibt jedoch auch andere Techniken, die es wert sind, untersucht zu werden, um die Aufgabenbearbeitung in mandantenfähigen Anwendungen weiter zu optimieren. Obwohl ich ursprünglich geplant hatte, alles in einem Beitrag abzudecken, erweist sich dieses Thema als ziemlich umfangreich! Um Klarheit zu gewährleisten und diesen Artikel fokussiert zu halten, habe ich beschlossen, ihn in zwei Teile zu teilen.

Im nächsten Teil dieser Serie befassen wir uns mit Aufgabenprioritäten als einem weiteren Mechanismus zur Verbesserung von Fairness und Effizienz. Mit diesem Ansatz können Sie Aufgaben anhand unterschiedlicher Kriterien unterschiedliche Prioritätsstufen zuweisen und so sicherstellen, dass kritische Aufgaben auch in Zeiten hoher Nachfrage zeitnah bearbeitet werden.

Bleiben Sie gespannt auf die nächste Folge!

Das obige ist der detaillierte Inhalt vonGewährleistung einer fairen Verarbeitung mit Sellerie – Teil I. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn