首頁 >後端開發 >Python教學 >python 並發.futures

python 並發.futures

Linda Hamilton
Linda Hamilton原創
2024-11-04 04:13:29458瀏覽

python concurrent.futures

未來

Future 是一個容器,可以保存計算結果或計算期間發生的錯誤。建立 future 時,它以 PENDING 狀態開始。該庫不打算手動建立此對象,除非出於測試目的。

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

PENDING 狀態表示使用者請求的計算已註冊到執行緒池中並放入佇列中,但尚未被任何執行緒拾取執行。一旦空閒執行緒從佇列中取得任務(回呼),Future 就會轉換為 RUNNING 狀態。 Future 只能在處於 PENDING 狀態時被取消。因此,在 PENDING 和 RUNNING 狀態之間存在一個時間窗口,在此期間可以取消請求的計算。

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

執行緒池中請求的操作可以完成計算值或導致錯誤。無論結果如何,未來都會過渡到 FINISHED 狀態。然後結果或錯誤將儲存在相應的欄位中。

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

要檢索計算結果,請使用 result 方法。如果計算尚未完成,此方法將阻塞目前執行緒(從中呼叫結果),直到計算完成或等待逾時。

如果計算成功完成且沒有錯誤,則 result 方法會傳回計算值。

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

如果計算過程中發生異常,結果將引發此異常。

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

如果方法在等待時逾時,則會引發 TimeoutError。

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

嘗試取得已取消的計算結果將引發 CancelledError。

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

等待策略

在開發過程中,需要在執行緒池上執行N次運算並等待其完成是很常見的。為了實現這一點,該庫提供了等待函數。有幾個等待策略:FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED。

所有等待策略的共同點是,如果傳遞給 wait 方法的 future 已經完成,則無論選擇何種策略,都會傳回傳遞的 future 的集合。無論它們是如何完成的,無論是有錯誤、結果還是被取消,都無關緊要。

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED 策略

ALL_COMPLETED 策略保證等待所有傳遞的 future 完成,或在逾時後退出,並收集截至該時刻完成的 future,這可能是不完整的。

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

FIRST_COMPLETED

FIRST_COMPLETED 策略保證會傳回至少有一個已完成的 future 的集合,或在逾時的情況下傳回空集合。 此策略並不表示傳回的集合不能包含多個元素

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

FIRST_EXCEPTION

如果其中一個計算完成時發生錯誤,FIRST_EXCEPTION 策略會中斷等待。如果沒有發生異常,行為與 ALL_COMPLETED 未來相同。

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

執行緒池執行器

該物件負責建立執行緒池。與該物件互動的主要方法是 Submit 方法。它允許在線程池中註冊計算。作為回應,傳回一個 Future 對象,用於監控計算狀態並取得最終結果。

屬性

  • 僅根據需要建立新執行緒:
    • 如果請求計算時至少有一個空閒線程,則不會建立新線程
    • 如果請求計算時沒有空閒線程,則在未達到 maxWorkers 限制的情況下建立一個新線程。
    • 如果沒有空閒線程並且已達到 maxWorkers 限制,則計算將放入隊列中,並由下一個可用線程進行
  • 預設為運算需求分配的最大執行緒數等於邏輯處理器核心數
  • 線程一旦創建,即使在低負載的情況下也不會被銷毀

以上是python 並發.futures的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn