ホームページ  >  記事  >  バックエンド開発  >  Pythonコンカレントフューチャーズ

Pythonコンカレントフューチャーズ

Linda Hamilton
Linda Hamiltonオリジナル
2024-11-04 04:13:29358ブラウズ

python concurrent.futures

未来

Future は、計算の結果または計算中に発生したエラーのいずれかを保持できるコンテナーです。 Future が作成されると、PENDING 状態で開始されます。ライブラリは、おそらくテスト目的を除いて、このオブジェクトが手動で作成されることを意図していません。

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

PENDING ステータスは、ユーザーによって要求された計算がスレッド プールに登録され、キューに配置されていますが、実行のためにまだどのスレッドにも取得されていないことを示します。空きスレッドがキューからタスク (コールバック) を取得すると、Future は RUNNING 状態に移行します。将来は PENDING 状態にある場合にのみキャンセルできます。したがって、PENDING 状態と RUNNING 状態の間には、要求された計算をキャンセルできる時間枠があります。

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

スレッド プールで要求された操作は、計算された値で完了するか、エラーが発生する可能性があります。結果に関係なく、フューチャーは FINISHED 状態に遷移します。結果またはエラーは、対応するフィールドに保存されます。

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

計算の結果を取得するには、result メソッドを使用します。計算がまだ完了していない場合、このメソッドは、計算が完了するか待機がタイムアウトになるまで、(結果が呼び出された) 現在のスレッドをブロックします。

計算がエラーなく正常に完了すると、result メソッドは計算された値を返します。

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

計算中に例外が発生した場合、result はその例外を発生させます。

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

待機中にメソッドがタイムアウトすると、TimeoutError が発生します。

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

キャンセルされた計算の結果を取得しようとすると、CancelledError が発生します。

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

待ち戦略

開発プロセスでは、スレッド プールで N 回の計算を実行し、その完了を待つ必要があることがよくあります。これを実現するために、ライブラリは wait 関数を提供します。いくつかの待機戦略があります: FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED。

すべての待機戦略に共通するのは、wait メソッドに渡された先物がすでに完了している場合、選択された戦略に関係なく、渡された先物のコレクションが返されるということです。エラー、結果、キャンセルなど、どのように完了したかは関係ありません。

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED 戦略

ALL_COMPLETED 戦略は、渡されたすべての Future の完了を待つこと、またはその時点までに完了した Future のコレクション (不完全な可能性があります) でタイムアウト後に終了することを保証します。

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

FIRST_COMPLETED

FIRST_COMPLETED 戦略は、タイムアウトの場合に、少なくとも 1 つの完了したフューチャーを含むコレクション、または空のコレクションが返されることを保証します。 この戦略は、返されるコレクションに複数の要素を含めることができないことを意味するものではありません

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

FIRST_EXCEPTION

FIRST_EXCEPTION 戦略は、計算の 1 つがエラーで終了した場合に待機を中断します。例外が発生しない場合、動作は ALL_COMPLETED フューチャーと同じです。

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

スレッドプールエグゼキュータ

オブジェクトはスレッド プールの作成を担当します。このオブジェクトと対話するための主なメソッドは、Submit メソッドです。スレッドプールに計算を登録できます。応答として、Future オブジェクトが返されます。これは、計算のステータスを監視し、最終結果を取得するために使用されます。

プロパティ

  • 新しいスレッドは必要な場合にのみ作成されます。
    • 計算が要求されたときに少なくとも 1 つの空きスレッドがある場合、新しいスレッドは作成されません
    • 計算がリクエストされたときに空きスレッドがない場合は、maxWorkers 制限に達していない限り、新しいスレッドが作成されます。
    • 空きスレッドがなく、maxWorkers の制限に達した場合、計算はキューに入れられ、次に利用可能なスレッドが処理します
  • デフォルトでは、計算ニーズに割り当てられるスレッドの最大数は、論理プロセッサ コアの数と等しくなります
  • スレッドは一度作成されると、負荷が低い場合でも破棄されません

以上がPythonコンカレントフューチャーズの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。