Heim >Datenbank >Redis >Detaillierte Erläuterung der asynchronen Aufgabenverarbeitung in Redis

Detaillierte Erläuterung der asynchronen Aufgabenverarbeitung in Redis

王林
王林Original
2023-06-20 08:26:531741Durchsuche

Mit der Weiterentwicklung von Webanwendungen wird die Notwendigkeit einer asynchronen Aufgabenverarbeitung immer wichtiger, da wir sicherstellen müssen, dass Benutzer die Anwendung weiterhin verwenden können, bevor sie die Aufgabe abschließen. In diesem Fall kann mit Ausnahme der asynchronen Aufgabenverarbeitung keine parallele Verarbeitung mehrerer Aufgaben erreicht werden. Daher ist es häufig erforderlich, einige Tools zur Verarbeitung asynchroner Aufgaben zu verwenden, unter denen Redis ein sehr nützliches Tool ist.

Redis ist eine leistungsstarke In-Memory-Datenbank, mit der Daten schnell gespeichert, gelesen und verwaltet werden können. Sein Hauptzweck ist die Implementierung von Caching und Messaging, es kann jedoch auch zur Verarbeitung asynchroner Aufgaben verwendet werden. Redis verfügt über integrierte Warteschlangen- und Veröffentlichungs-/Abonnementfunktionen, was es zu einem sehr nützlichen Tool für die asynchrone Aufgabenverarbeitung macht.

In diesem Artikel stellen wir vor, wie Sie mit Redis die asynchrone Aufgabenverarbeitung implementieren.

  1. Herstellen einer Redis-Verbindung

Zuerst müssen wir einen Redis-Client verwenden, um eine Verbindung mit dem Redis-Server herzustellen. Es kann jeder Client verwendet werden, der Redis-Verbindungen unterstützt. Pythons Redis-Py ist eine sehr gute Wahl. Bitte stellen Sie sicher, dass Sie redis-py global installieren:

pip install redis

Als nächstes können Sie mit dem folgenden Befehl eine Redis-Verbindung herstellen:

import redis

redis_conn = redis.Redis(host='localhost', port=6379, db=0)

Hier haben wir eine Redis-Verbindungsinstanz namens redis_conn erstellt, die eine Verbindung zum lokalen Redis-Server (Host) herstellt = 'localhost'), die Portnummer ist 6379 (Port=6379) und die Datenbanknummer 0 wird verwendet (db=0).

  1. Redis Queue

Redis Queue (RQ) ist eine Python-Bibliothek, die Redis als Backend verwendet, um eine verteilte Aufgabenwarteschlange zu implementieren. RQ basiert auf den Befehlen lpush und rpop von Redis und bietet daher eine sehr gute Leistung.

Installieren Sie RQ und Redis:

pip install rq redis
  1. Synchronische Aufgaben

In einer synchronen Aufgabe führt der Hauptthread den gesamten Code aus und wartet auf den Abschluss der Aufgabe. Hier ist der Beispielcode für eine synchronisierte Aufgabe:

import time

def task():
    # 等待5秒
    time.sleep(5)
    print('Task complete')

print('Starting task')
task()
print('Task ended')

Im obigen Beispiel haben wir eine Funktion namens „Task“ definiert, die 5 Sekunden wartet und dann „Aufgabe abgeschlossen“ ausgibt. Dann rufen wir diese Aufgabe im Hauptthread auf, geben „Aufgabe wird gestartet“ aus, warten 5 Sekunden und geben „Aufgabe beendet“ aus.

Dieser Ansatz funktioniert für kurzlebige Aufgaben, aber bei lang andauernden Aufgaben ist der Benutzer sehr unzufrieden, da er die Anwendung nicht verwenden kann.

Jetzt wollen wir sehen, wie man diese Aufgabe in eine asynchrone Aufgabe umwandelt.

  1. Asynchrone Aufgaben

Die Idee, eine Aufgabe in eine asynchrone Aufgabe umzuwandeln, besteht darin, die Aufgabe in einem separaten Thread oder Prozess auszuführen und mit der Ausführung anderen Codes im Hauptthread fortzufahren. Auf diese Weise kann der Benutzer die Anwendung weiterhin verwenden, während Aufgaben im Hintergrund ausgeführt werden.

In Python können Sie Threads oder Prozesse verwenden, um Hintergrundaufgaben auszuführen. Wenn jedoch mehrere Aufgaben ausgeführt werden, kann sich die Anzahl der Threads und Prozesse erhöhen und es können auch Probleme wie Deadlocks und Synchronisierungsprobleme auftreten.

Die Verwendung von Redis kann dieses Problem lösen, da Redis über eine integrierte Warteschlangenstruktur verfügt, mit der wir diese Probleme vermeiden können. Die Grundidee der Implementierung asynchroner Aufgaben in Redis besteht darin, eine Aufgabenwarteschlange zu erstellen und Aufgaben zur Warteschlange hinzuzufügen. Erstellen Sie dann einen separaten Task-Executor, um die Tasks in der Warteschlange abzurufen und auszuführen.

Da Redis eine In-Memory-Datenbank ist, können Sie sie zum Speichern aller Warteschlangendaten verwenden. Auf diese Weise können wir den Aufgabenstatus in Redis speichern und müssen keine Threads oder Prozesse zur Bearbeitung von Aufgaben verwenden.

Das Folgende ist ein Beispielcode für eine asynchrone Aufgabe:

from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

def task():
    # 等待5秒
    time.sleep(5)
    print('Task complete')

print('Starting task')
job = q.enqueue(task)
print('Task started')

Im obigen Code erstellen wir zunächst eine Redis-Warteschlange mit dem Namen q und definieren dann eine Funktion mit dem Namen task. Wenn wir eine Aufgabe im Hauptthread aufrufen, fügen wir die Aufgabe mithilfe der Enqueue-Methode des Warteschlangenobjekts zur Warteschlange hinzu. Diese Methode gibt ein Aufgabenobjekt namens job zurück, das die Aufgabe in der Warteschlange darstellt. Dann geben wir „Aufgabe gestartet“ aus und der Warteschlangenausführer holt sich die Aufgabe im Hintergrund und führt sie aus.

  1. Aufgaben überwachen

Im vorherigen Beispiel könnten wir das Jobobjekt verwenden, um den Aufgabenstatus zu überwachen und die Ergebnisse abzurufen. Hier ist der Beispielcode zum Überwachen einer Aufgabe:

from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

def task():
    # 等待5秒
    time.sleep(5)
    return 'Task complete'

print('Starting task')
job = q.enqueue(task)
print('Task started')

# 检查任务状态并获取结果
while job.result is None:
    print('Task still processing')
    time.sleep(1)

print('Task complete: {}'.format(job.result))

Im obigen Code prüfen wir die Ergebniseigenschaft der Aufgabe, bis sie nicht mehr leer ist. Anschließend geben wir „Aufgabe abgeschlossen:“ plus das Ergebnis des Aufgabenobjekts aus.

  1. Verwenden von Publish/Subscribe

Redis unterstützt auch ein Publish/Subscribe-Modell (Pub/Sub), was es zu einem sehr nützlichen Messaging-Tool macht. In diesem Modell veröffentlicht ein Herausgeber Nachrichten zu einem Thema, und Abonnenten abonnieren das Thema und erhalten alle Nachrichten zu diesem Thema.

Nehmen wir als Beispiel eine asynchrone Aufgabe, um die Implementierung mithilfe des Publish/Subscribe-Modells zu veranschaulichen.

Zuerst müssen wir für jede Aufgabe eine eindeutige ID erstellen und die Aufgabe zur Warteschlange hinzufügen. Anschließend veröffentlichen wir die Aufgaben-ID im Thema. Der Task-Executor abonniert das Thema und wenn eine Task-ID empfangen wird, ruft er die Task ab und führt sie aus.

Hier ist der Beispielcode zum Implementieren einer asynchronen Aufgabe mithilfe des Publish/Subscribe-Modells:

from rq import Queue
from redis import Redis
import uuid

redis_conn = Redis()
q = Queue(connection=redis_conn)

# 订阅任务主题并执行任务
def worker():
    while True:
        _, job_id = redis_conn.blpop('tasks')
        job = q.fetch_job(job_id.decode('utf-8'))
        job.perform()

# 发布任务并将其ID添加到队列中
def enqueue_task():
    job = q.enqueue(task)
    redis_conn.rpush('tasks', job.id)

def task():
    # 等待5秒
    time.sleep(5)
    return 'Task complete'

print('Starting workers')
for i in range(3):
    # 创建3个工作线程
    threading.Thread(target=worker).start()

print('Enqueueing task')
enqueue_task()
print('Task enqueued')

Im obigen Code definieren wir zunächst einen Task-Executor namens Worker, der kontinuierlich eine Schleife durchführt und geplante Aufgaben aus der Warteschlangen-ID abbricht. Wenn es die Aufgaben-ID erhält, verwendet es die Methode fetch_job, um das Aufgabenobjekt abzurufen und auszuführen.

Wir definieren außerdem eine Funktion namens enqueue_task, die eine asynchrone Aufgabe namens Job erstellt und ihre ID zur Warteschlange hinzufügt. Anschließend rufen wir diese Funktion im Hauptthread auf und veröffentlichen die Aufgaben-ID in einem Thema namens „Aufgaben“. Der Task-Ausführer ruft die Task ab und führt sie aus, wenn er die Task-ID erhält.

  1. Zusammenfassung

In diesem Artikel haben wir vorgestellt, wie man Redis zur Implementierung der asynchronen Aufgabenverarbeitung verwendet. Wir haben Warteschlangen, ein Publish/Subscribe-Modell und die RQ-Bibliothek in Python verwendet und gezeigt, wie man Aufgaben in den asynchronen Modus umwandelt und asynchrone Aufgaben verwendet, um Benutzererfahrungsprobleme zu lösen. Redis ist bei der Bearbeitung asynchroner Aufgaben sehr nützlich, da es integrierte Warteschlangen- und Veröffentlichungs-/Abonnementfunktionen mit sehr guter Leistung bietet. Wenn Sie Ihre Webanwendung reaktionsfähig machen und eine asynchrone Aufgabenverarbeitung implementieren möchten, ist Redis eine gute Wahl.

Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung der asynchronen Aufgabenverarbeitung in Redis. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn