Heim > Artikel > Backend-Entwicklung > Einführung in PoolExecutor in Python-Parallelität (mit Beispielen)
Der Inhalt dieses Artikels ist eine Einführung in PoolExecutor in Python-Parallelität (mit Beispielen). Ich hoffe, dass er für Sie hilfreich ist.
Verwenden Sie Multi-Threading und Multi-Processing, um reguläre Parallelitätsanforderungen zu erfüllen. Schritte wie Start und Join können beim Start nicht weggelassen werden. Für komplexe Anforderungen sind 1-2 Warteschlangen erforderlich.
Da die Anforderungen immer komplexer werden und kein gutes Design und eine Abstraktion auf Funktionsebene vorhanden sind, wird das Debuggen umso schwieriger, je mehr Code vorhanden ist.
Für Aufgaben, die eine gleichzeitige Ausführung erfordern, aber keine hohen Echtzeitanforderungen stellen, können wir die Klasse PoolExecutor im Paket concurrent.futures verwenden, um sie zu implementieren.
Dieses Paket stellt zwei Executoren bereit: den Thread-Pool-Executor ThreadPoolExecutor und den Prozesspool-Executor ProcessPoolExecutor. Beide Executoren stellen dieselbe API bereit.
Der Hauptzweck des Pool-Konzepts ist die Wiederverwendung: Threads oder Prozesse können während ihres Lebenszyklus mehrmals verwendet werden. Es reduziert den Aufwand für die Erstellung von Threads und Prozessen und verbessert die Programmleistung. Die Wiederverwendung ist keine zwingende Regel, aber sie ist der Hauptgrund, warum Programmierer Pools in ihren Anwendungen verwenden.
Der Pool verfügt nur über eine feste Anzahl von Threads/Prozessen, die durch max_workers angegeben wird.
Die Aufgabe wird über executor.submit an die Aufgabenwarteschlange des Ausführenden übermittelt und ein zukünftiges Objekt wird zurückgegeben.
Future ist ein gängiges Parallelitätsentwurfsmuster.
Ein zukünftiges Objekt stellt einige Ergebnisse dar, die noch nicht fertig (abgeschlossen) sind. Dieses Ergebnis kann erhalten werden, nachdem es zu einem bestimmten Zeitpunkt in der „Zukunft“ fertig ist.
Aufgaben werden verschiedenen Mitarbeitern zur Ausführung zugewiesen.
Bitte beachten Sie jedoch, dass der Arbeiter nach der Ausführung einer Aufgabe beschäftigt ist, bis die Ausführung abgeschlossen ist! Wenn nicht genügend Arbeitskräfte vorhanden sind, warten andere Aufgaben weiter! Daher ist PoolExecutor nicht für Echtzeitaufgaben geeignet.
import concurrent.futures import time from itertools import count number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] def evaluate_item(x): for i in count(x): # count 是无限迭代器,会一直递增。 print(f"{x} - {i}") time.sleep(0.01) if __name__ == "__main__": # 进程池 start_time_2 = time.time() # 使用 with 在离开此代码块时,自动调用 executor.shutdown(wait=true) 释放 executor 资源 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # 将 10 个任务提交给 executor,并收集 futures futures = [executor.submit(evaluate_item, item) for item in number_list] # as_completed 方法等待 futures 中的 future 完成 # 一旦某个 future 完成,as_completed 就立即返回该 future # 这个方法,使每次返回的 future,总是最先完成的 future # 而不是先等待任务 1,再等待任务 2... for future in concurrent.futures.as_completed(futures): print(future.result()) print ("Thread pool execution in " + str(time.time() - start_time_2), "seconds")
Im obigen Code belegen die fünf Aufgaben mit den Elementen 1 2 3 4 5 immer alle Arbeiter, während die fünf Aufgaben 6 7 8 9 10 ewig warten! ! !
Detaillierte API-Beschreibung
concurrent.futures enthält drei Teile der API:
PoolExecutor: Das heißt, die API der beiden Executoren
Konstruktor: Der Hauptparameter ist max_workers, der verwendet wird, um die Thread-Pool-Größe (oder die Anzahl der Worker) anzugeben.
submit(fn, *args, **kwargs): Senden Sie die Task-Funktion fn an den Executor, args und kwargs sind fn erforderliche Parameter.
Gibt eine Zukunft zum Erhalten von Ergebnissen zurück
map(func, *iterables, timeout=None, chunksize=1): Wenn die Aufgabe dieselbe ist und nur die Parameter unterschiedlich sind, können Sie verwenden diese Methode statt einreichen. Jedes Element von Iterables entspricht einem Satz von Parametern von func.
Gibt einen Iterator von Futures zurück
shutdown(wait=True): Fahren Sie den Executor herunter, normalerweise mit dem Manager, um ihn automatisch herunterzufahren.
Future: Nachdem die Aufgabe an den Ausführenden übermittelt wurde, wird eine Zukunft zurückgegeben.
future.result(timout=None): Die am häufigsten verwendete Methode gibt das Ergebnis der Aufgabe zurück. Wenn die Aufgabe noch nicht beendet ist, wartet diese Methode ewig!
timeout gibt das Timeout an. Wenn es „None“ ist, gibt es kein Timeout-Limit.
Exception(timeout=None): Gibt die von der Aufgabe ausgelöste Ausnahme an. Wie result() wartet es auch auf das Ende der Aufgabe.
cancel(): Diese Aufgabe abbrechen
add_done_callback(fn): Nachdem die Zukunft abgeschlossen ist, wird fn(future) ausgeführt.
running(): ob es läuft
done(): ob die Zukunft beendet ist, boolean
...Einzelheiten finden Sie in der offiziellen Dokumentation
Modul Mit Dienstprogrammfunktion
concurrent.futures.as_completed(fs, timeout=None): Warten Sie, bis die Zukunft in fs (futures iterable) abgeschlossen ist
Einmal eine Zukunft in fs ist abgeschlossen, dies Die Funktion gibt die Zukunft sofort zurück.
Diese Methode sorgt dafür, dass die Zukunft, die jedes Mal zurückgegeben wird, immer die erste ist, die abgeschlossen wird. Anstatt zuerst auf Aufgabe 1 und dann auf Aufgabe 2 zu warten...
Verwenden Sie diese Funktion normalerweise bis für die Zukunft in as_completed(fs):.
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED): Warten, bis etwas passiert, das durch return_when angegeben wird, oder Timeout
return_when hat drei Optionen: ALL_COMPLETED (fs Alle Futures in fs abgeschlossen sind), FIRST__COMPLETED (jede Zukunft in fs ist abgeschlossen) und FIRST_EXCEPTION (eine Aufgabe löst eine Ausnahme aus)
Future Design Pattern
Das Merkmal von PoolExecutor ist hier, dass es das Future-Design verwendet Das Muster macht die Aufgabenausführung und Ergebniserfassung zu einem asynchronen Prozess.
Wir stellen die Aufgabe zunächst über „submit/map“ in die Aufgabenwarteschlange und dann beginnt die Ausführung der Aufgabe! Wenn wir es dann brauchen, erhalten wir das Ergebnis über Future oder direkt über add_done_callback(fn).
Die Ausführung dieser Aufgabe erfolgt in neuen Workern. Der Hauptprozess/Thread wird nicht blockiert, sodass der Hauptthread andere Dinge tun kann. Dieser Ansatz wird als asynchrone Programmierung bezeichnet.
Offscreen
concurrent.futures wird basierend auf multiprocessing.pool implementiert, ist also tatsächlich etwas langsamer als die direkte Verwendung des Thread-/Prozesspools. Es bietet jedoch eine bequemere und übersichtlichere API.
Das obige ist der detaillierte Inhalt vonEinführung in PoolExecutor in Python-Parallelität (mit Beispielen). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!