Heim  >  Artikel  >  Backend-Entwicklung  >  Python concurrent.futures

Python concurrent.futures

Linda Hamilton
Linda HamiltonOriginal
2024-11-04 04:13:29358Durchsuche

python concurrent.futures

Zukunft

Future ist ein Container, der entweder das Ergebnis einer Berechnung oder einen Fehler, der während dieser Berechnung aufgetreten ist, enthalten kann. Wenn eine Zukunft erstellt wird, beginnt sie im Status PENDING. Die Bibliothek beabsichtigt nicht, dieses Objekt manuell zu erstellen, außer vielleicht zu Testzwecken.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

Der Status PENDING zeigt an, dass eine vom Benutzer angeforderte Berechnung im Thread-Pool registriert und in eine Warteschlange gestellt wurde, aber noch von keinem Thread zur Ausführung übernommen wurde. Sobald ein freier Thread die Aufgabe (Callback) aus der Warteschlange übernimmt, wechselt der Future in den RUNNING-Status. Ein Future kann nur storniert werden, solange er sich im Status PENDING befindet. Daher gibt es zwischen den Zuständen PENDING und RUNNING ein Zeitfenster, in dem die angeforderte Berechnung abgebrochen werden kann.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

Eine angeforderte Operation im Thread-Pool kann entweder mit einem berechneten Wert abgeschlossen werden oder zu einem Fehler führen. Unabhängig vom Ergebnis geht die Zukunft in den Zustand FERTIG über. Das Ergebnis bzw. der Fehler wird dann in den entsprechenden Feldern gespeichert.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

Um das Ergebnis einer Berechnung abzurufen, wird die Ergebnismethode verwendet. Wenn die Berechnung noch nicht abgeschlossen ist, blockiert diese Methode den aktuellen Thread (von dem Ergebnis aufgerufen wurde), bis die Berechnung abgeschlossen ist oder die Wartezeit abgelaufen ist.

Wenn die Berechnung erfolgreich und ohne Fehler abgeschlossen wird, gibt die Ergebnismethode den berechneten Wert zurück.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

Wenn während der Berechnung eine Ausnahme aufgetreten ist, wird das Ergebnis diese Ausnahme auslösen.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

Wenn die Methode während des Wartens abläuft, wird ein TimeoutError ausgelöst.

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

Der Versuch, das Ergebnis einer abgebrochenen Berechnung abzurufen, löst einen CancelledError aus.

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

Wartestrategie

Im Entwicklungsprozess ist es durchaus üblich, N Berechnungen in einem Thread-Pool auszuführen und auf deren Abschluss zu warten. Um dies zu erreichen, stellt die Bibliothek eine Wartefunktion bereit. Es gibt mehrere Wartestrategien: FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.

Allen Wartestrategien ist gemeinsam, dass, wenn die an die Wartemethode übergebenen Futures bereits abgeschlossen sind, die Sammlung der übergebenen Futures unabhängig von der gewählten Strategie zurückgegeben wird. Es spielt keine Rolle, wie sie abgeschlossen wurden, ob mit einem Fehler, einem Ergebnis oder ob sie abgebrochen wurden.

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED-Strategie

Die ALL_COMPLETED-Strategie garantiert das Warten auf den Abschluss aller übergebenen Futures oder das Beenden nach einer Zeitüberschreitung mit einer Sammlung der bis zu diesem Zeitpunkt abgeschlossenen Futures, die möglicherweise unvollständig ist.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

FIRST_COMPLETED

Die FIRST_COMPLETED-Strategie garantiert die Rückgabe einer Sammlung mit mindestens einem abgeschlossenen Future oder einer leeren Sammlung im Falle eines Timeouts. Diese Strategie bedeutet NICHT, dass die zurückgegebene Sammlung nicht mehrere Elemente enthalten kann.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

FIRST_EXCEPTION

Die FIRST_EXCEPTION-Strategie unterbricht die Wartezeit, wenn eine der Berechnungen mit einem Fehler endet. Wenn keine Ausnahmen auftreten, ist das Verhalten identisch mit dem ALL_COMPLETED-Future.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

ThreadPoolExecutor

Das Objekt ist für die Erstellung eines Thread-Pools verantwortlich. Die Hauptmethode für die Interaktion mit diesem Objekt ist die Submit-Methode. Es ermöglicht die Registrierung einer Berechnung im Thread-Pool. Als Antwort wird ein Future-Objekt zurückgegeben, das verwendet wird, um den Status der Berechnung zu überwachen und das Endergebnis zu erhalten.

Eigenschaften

  • Neue Threads werden NUR bei Bedarf erstellt:
    • Wenn zum Zeitpunkt der Anforderung einer Berechnung mindestens ein freier Thread vorhanden ist, wird kein neuer Thread erstellt
    • Wenn bei der Anforderung einer Berechnung keine freien Threads vorhanden sind, wird ein neuer Thread erstellt, sofern das maxWorkers-Limit nicht erreicht wurde.
    • Wenn keine freien Threads vorhanden sind und das maxWorkers-Limit erreicht wurde, wird die Berechnung in eine Warteschlange gestellt und vom nächsten verfügbaren Thread übernommen
  • Die maximale Anzahl der standardmäßig für Rechenanforderungen zugewiesenen Threads entspricht der Anzahl der logischen Prozessorkerne
  • Ein einmal erstellter Thread wird auch bei geringer Last nicht zerstört

Das obige ist der detaillierte Inhalt vonPython concurrent.futures. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn