この記事では、Python 同時実行 PoolExecutor について (例とともに) 紹介します。これには一定の参考値があります。必要な友人は参照してください。お役に立てば幸いです。
マルチスレッドとマルチプロセッシングを使用して、従来の同時実行要件を満たします。起動時に開始や結合などの手順を省略することはできません。複雑な要件の場合は、1 ~ 2 つのキューが必要です。
要件がますます複雑になり、優れた設計と機能レベルの抽象化がなければ、コードが増えるほどデバッグが難しくなります。
同時実行が必要だが、リアルタイム要件がそれほど高くないタスクの場合は、concurrent.futures パッケージの PoolExecutor クラスを使用して実装できます。
このパッケージは、スレッド プール エグゼキュータ ThreadPoolExecutor とプロセス プール エグゼキュータ ProcessPoolExecutor の 2 つのエグゼキュータを提供します。これら 2 つのエグゼキュータは同じ API を提供します。
プールの概念の主な目的は再利用であり、スレッドまたはプロセスをライフサイクル中に複数回使用できるようにすることです。スレッドとプロセスの作成のオーバーヘッドが軽減され、プログラムのパフォーマンスが向上します。再利用は必須のルールではありませんが、プログラマがアプリケーションでプールを使用する主な理由です。
プールには、max_workers で指定された固定数のスレッド/プロセスのみが含まれます。
タスクは executor.submit を通じてエグゼキュータのタスク キューに送信され、future オブジェクトが返されます。
Future は、一般的な同時実行設計パターンです。
A Future オブジェクトは、まだ準備ができていない (完了した) 結果を表します。この結果は、「将来」の特定の時点で準備が整った後に取得できます。
タスクはさまざまなワーカー間で実行されるようにスケジュールされます。
ただし、タスクが実行されると、実行が完了するまでワーカーは占有されることに注意してください。ワーカーが足りないと、他のタスクが待たされてしまいます。したがって、PoolExecutor はリアルタイム タスクには適していません。
import concurrent.futures import time from itertools import count number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] def evaluate_item(x): for i in count(x): # count 是无限迭代器,会一直递增。 print(f"{x} - {i}") time.sleep(0.01) if __name__ == "__main__": # 进程池 start_time_2 = time.time() # 使用 with 在离开此代码块时,自动调用 executor.shutdown(wait=true) 释放 executor 资源 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # 将 10 个任务提交给 executor,并收集 futures futures = [executor.submit(evaluate_item, item) for item in number_list] # as_completed 方法等待 futures 中的 future 完成 # 一旦某个 future 完成,as_completed 就立即返回该 future # 这个方法,使每次返回的 future,总是最先完成的 future # 而不是先等待任务 1,再等待任务 2... for future in concurrent.futures.as_completed(futures): print(future.result()) print ("Thread pool execution in " + str(time.time() - start_time_2), "seconds")
上記のコードでは、項目 1 2 3 4 5 を含む 5 つのタスクは常にすべてのワーカーを占有しますが、5 つのタスク 6 7 8 9 10 は永久に待機します。 ! !
API の詳細な説明
concurrent.futures には、API の 3 つの部分が含まれています。
PoolExecutor: つまり、2 つのエグゼキュータの API
Constructor:主なパラメータは max_workers で、スレッド プール サイズ (またはワーカーの数) を指定するために使用されます。
submit(fn, *args, **kwargs): タスク関数 fn をエグゼキュータ args に送信します。 kwargs は必須パラメータです。
結果を取得するための Future を返す
map(func, *iterables, timeout=None, chunksize=1): タスクが同じでパラメーターのみが異なる場合は、次のように使用できます。送信の代わりにこのメソッドを使用します。 iterables の各要素は、func のパラメーターのセットに対応します。
Futures のイテレータを返す
shutdown(wait=True): エグゼキュータをシャットダウンします。通常、自動的にシャットダウンするには with マネージャが使用されます。
Future: タスクが実行者に送信されると、Future が返されます。
future.result(timout=None): 最も一般的に使用されるメソッドは、タスクの結果を返します。タスクがまだ終了していない場合、このメソッドは永久に待機します。
timeout はタイムアウト期間を指定します。None の場合、タイムアウト制限はありません。
Exception(timeout=None): タスクによってスローされた例外を示します。 result() と同様に、タスクが終了するまで待機します。
cancel(): このタスクをキャンセルします
add_done_callback(fn): future が完了すると、fn(future) が実行されます。
running(): 実行中かどうか
done(): 未来が終了したかどうか、ブール値
...詳細については、公式ドキュメントを参照してください
実用的な関数を備えたモジュール
concurrent.futures.as_completed(fs, timeout=None): fs (futures iterable) の Future が完了するまで待ちます
Once a future in fs が完了すると、この関数はすぐに未来を返します。
このメソッドでは、毎回返される Future が常に最初に完了するようになります。最初にタスク 1 を待ってからタスク 2 を待つ代わりに...
通常は、この関数を as_completed(fs): の将来のために使用します。
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED): return_when で指定された何かが発生するまで待つか、timeout
return_when には 3 つのオプションがあります: ALL_COMPLETED (fs すべての futures in fs が完了している)、FIRST__COMPLETED (fs 内のすべての future が完了している)、FIRST_EXCEPTION (タスクが例外をスローする)
Future Design Pattern
ここでの PoolExecutor の特徴は、The Future デザインを使用していることです。 pattern はタスクの実行と結果の取得を非同期処理にします。
まず、submit/map を通じてタスクをタスク キューに入れました。その後、タスクの実行が開始されました。その後、必要なときに、future または add_done_callback(fn) を介して直接結果を取得します。
ここでのタスクの実行は新しいワーカーで行われます。メイン プロセス/スレッドはブロックされないため、メイン スレッドは他のことを行うことができます。このアプローチは非同期プログラミングと呼ばれます。
Offscreen
concurrent.futures は multiprocessing.pool に基づいて実装されているため、実際にはスレッド/プロセス プールを直接使用するよりも少し遅くなります。ただし、より便利で簡潔な API が提供されます。
以上がPython 同時実行における PoolExecutor の概要 (例付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。