ホームページ >バックエンド開発 >Python チュートリアル >PythonのThreadPoolExecutorのスレッドプール問題を解決する方法
Python
にはすでに threading
モジュールがあるのに、なぜスレッド プールが必要なのでしょうか?プールですか?
クローラーを例にとると、同時にクロールされるスレッドの数を制御する必要があります。この例では、20 個のスレッドが作成され、同時に実行できるスレッドは 3 つだけですが、 20 個のスレッドすべてを作成して破棄する必要があります。作成にはシステム リソースが必要です。より良い解決策はありますか?
実際には、必要なスレッドは 3 つだけです。各スレッドにはタスクが割り当てられ、残りのタスクはキューに入れられて待機します。スレッドがタスクを完了すると、キューに入れられたタスクをこのスレッドが実行を継続できるように配置できます。 . .
これはスレッド プールの考え方です (もちろんそれほど単純ではありません) が、スレッド プールを自分で完璧に書くのは難しく、複雑な状況ではスレッドの同期も考慮する必要があります。デッドロックが発生しやすくなります。
Python3.2
以降、標準ライブラリは concurrent.futures
モジュールを提供します。これは、ThreadPoolExecutor
と ProcessPoolExecutor を提供します。
この 2 つのクラスは、threading
と multiprocessing
(ここでの主な焦点はスレッド プールです) のさらなる抽象化を実現します。これらは、スレッドの自動スケジュールを支援するだけでなく、次のことも実行します。
メインスレッドは、特定のスレッド(またはタスク)のステータスと戻り値を取得できます。
スレッドが完了すると、メインスレッドはすぐにそれを知ることができます。
マルチスレッドおよびマルチプロセスのコーディングインターフェイスを一貫させます。
from concurrent.futures import ThreadPoolExecutor import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞 task1 = executor.submit(get_html, (3)) task2 = executor.submit(get_html, (2)) # done方法用于判定某个任务是否完成 print(task1.done()) # cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功 print(task2.cancel()) time.sleep(4) print(task1.done()) # result方法可以获取task的执行结果 print(task1.result()) # 执行结果 # False # 表明task1未执行完成 # False # 表明task2取消失败,因为已经放入了线程池中 # get page 2s finished # get page 3s finished # True # 由于在get page 3s finished之后才打印,所以此时task1必然完成了 # 3 # 得到task1的任务返回值
ThreadPoolExecutor がインスタンスを構築するときに、max_workers パラメーターを渡して、同時に実行できるスレッドの最大数を設定します。スレッドプール内。
submit 関数を使用して、スレッドが実行する必要があるタスク (関数名とパラメーター) をスレッド プールに送信し、タスクのハンドル (ファイルや図面と同様) を返します。 ) はブロックしていませんが、すぐに戻ります。
submit 関数によって返されたタスク ハンドルを通じて、done() メソッドを使用してタスクが終了したかどうかを判断できます。上記の例からわかるように、タスクには 2 秒の遅延があるため、タスク 1 がサブミットされた直後はタスク 1 が完了していないと判断されますが、4 秒の遅延後にタスク 1 は完了したと判断されます。
送信されたタスクをキャンセルするには、cancel() メソッドを使用します。タスクがスレッド プールですでに実行されている場合、キャンセルすることはできません。この例では、スレッド プール サイズが 2 に設定されており、タスクがすでに実行されているため、キャンセルは失敗します。スレッド プールのサイズを 1 に変更すると、タスク 1 が最初にサブミットされ、タスク 2 はキュー内で待機したままになりますが、この時点では正常にキャンセルできます。
result() メソッドを使用してタスクの戻り値を取得します。内部コードを確認すると、このメソッドがブロックしていることがわかりました。
上記ではタスクが完了したかどうかを判断する方法が提供されていますが、メインスレッドで常に判断できるとは限りません。
特定のタスクが終了したことがわかっているとき、各タスクが終了したかどうかを常に判断するのではなく、結果を取得することがあります。
これは、as_completed
メソッドを使用して、すべてのタスクの結果を一度に取得する方法です。
from concurrent.futures import ThreadPoolExecutor, as_completed import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 并不是真的url all_task = [executor.submit(get_html, (url)) for url in urls] for future in as_completed(all_task): data = future.result() print("in main: get page {}s success".format(data)) # 执行结果 # get page 2s finished # in main: get page 2s success # get page 3s finished # in main: get page 3s success # get page 4s finished # in main: get page 4s success
as_completed()
このメソッドはジェネレーターです。タスクが完了していない場合はブロックされます。特定のタスクが完了すると、yield
がタスクを実行します。 for ループの下のステートメントを実行し、すべてのタスクが完了するまでブロックを続けることができます。
結果から、最初に完了したタスク が最初にメインスレッド に通知することもわかります。
上記の as_completed
メソッドに加えて、executor.map
メソッドも使用できますが、少し異なります。
from concurrent.futures import ThreadPoolExecutor import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 并不是真的url for data in executor.map(get_html, urls): print("in main: get page {}s success".format(data)) # 执行结果 # get page 2s finished # get page 3s finished # in main: get page 3s success # in main: get page 2s success # get page 4s finished # in main: get page 4s success
事前に submit
メソッドを使用せず、map
メソッドを使用します。map
メソッドは python## と同じです。 # 標準ライブラリの ##map
意味は同じで、シーケンス内の各要素は同じ関数で実行されます。 上記のコードは、
の各要素に対して get_html
関数を実行し、各スレッド プールを割り当てます。実行結果は上記の as_completed
メソッドの結果とは異なります。出力順序は
urls## の順序と同じです。 #list (2 秒の場合でも) タスクが最初に実行されて完了した場合は、3 秒のタスクが最初に出力され、次に 2 秒のタスクが出力されます。 wait
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 并不是真的url all_task = [executor.submit(get_html, (url)) for url in urls] wait(all_task, return_when=ALL_COMPLETED) print("main") # 执行结果 # get page 2s finished # get page 3s finished # get page 4s finished # main
wait
メソッドは、待機中のタスク シーケンス、タイムアウト時間、待機条件の 3 つのパラメーターを受け取ります。
待機条件return_when
デフォルトは
で、すべてのタスクが終了するまで待機することを示します。 実行結果を見ると、すべてのタスクが実際に完了しており、メインスレッドが
main
を出力していることがわかります。
待機条件を FIRST_COMPLETED
に設定することもできます。これは、最初のタスクが完了すると待機が停止することを意味します。
ソースコード分析
は将来のオブジェクトを意味し、 オブジェクトとして理解できます。将来的には、非同期プログラミングの基礎となる操作
が完了しました。 <p>スレッド プール <code>submit()
の後、この future
オブジェクトが返されます。返された時点ではタスクは完了していませんが、将来的には完了します。
はタスクの戻りコンテナとも呼ばれ、タスクの結果とステータスを保存します。
その後ThreadPoolExecutor
このオブジェクトは内部的にどのように動作するのでしょうか?
以下は、ThreadPoolExecutor
のコードの一部の簡単な紹介です:
init
このメソッドで最も重要なことは、タスク キューとスレッド コレクションが他のメソッドで必要になることです。
submit
、_base.Future()## という 2 つの重要なオブジェクトがあります。 #そして
_WorkItem() オブジェクト、
_WorkItem() オブジェクトはタスクの実行と
future オブジェクトの設定、そして最後に
future## の設定を担当します。 # オブジェクトは Return になります。プロセス全体がブロックせずにすぐに戻ることがわかります。
3.adjust_thread_count メソッド
#4._WorkItem オブジェクト
self.future.set_result(result) です。
5. スレッド実行関数 -- _worker
これは、スレッド プールが主にキューからスレッドを作成するときに指定される関数エントリです。順番に以上がPythonのThreadPoolExecutorのスレッドプール問題を解決する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。