ホームページ  >  記事  >  バックエンド開発  >  PythonのThreadPoolExecutorのスレッドプール問題を解決する方法

PythonのThreadPoolExecutorのスレッドプール問題を解決する方法

WBOY
WBOY転載
2023-04-28 22:40:201519ブラウズ

    コンセプト

    Python にはすでに threading モジュールがあるのに、なぜスレッド プールが必要なのでしょうか?プールですか?

    クローラーを例にとると、同時にクロールされるスレッドの数を制御する必要があります。この例では、20 個のスレッドが作成され、同時に実行できるスレッドは 3 つだけですが、 20 個のスレッドすべてを作成して破棄する必要があります。作成にはシステム リソースが必要です。より良い解決策はありますか?

    実際には、必要なスレッドは 3 つだけです。各スレッドにはタスクが割り当てられ、残りのタスクはキューに入れられて待機します。スレッドがタスクを完了すると、キューに入れられたタスクをこのスレッドが実行を継続できるように配置できます。 . .

    これはスレッド プールの考え方です (もちろんそれほど単純ではありません) が、スレッド プールを自分で完璧に書くのは難しく、複雑な状況ではスレッドの同期も考慮する必要があります。デッドロックが発生しやすくなります。

    Python3.2 以降、標準ライブラリは concurrent.futures モジュールを提供します。これは、ThreadPoolExecutorProcessPoolExecutor を提供します。 この 2 つのクラスは、threadingmultiprocessing (ここでの主な焦点はスレッド プールです) のさらなる抽象化を実現します。これらは、スレッドの自動スケジュールを支援するだけでなく、次のことも実行します。

    • メインスレッドは、特定のスレッド(またはタスク)のステータスと戻り値を取得できます。

    • スレッドが完了すると、メインスレッドはすぐにそれを知ることができます。

    • マルチスレッドおよびマルチプロセスのコーディングインターフェイスを一貫させます。

    簡単な使用法

    from concurrent.futures import ThreadPoolExecutor
    import time
     
    # 参数times用来模拟网络请求的时间
    def get_html(times):
        time.sleep(times)
        print("get page {}s finished".format(times))
        return times
     
    executor = ThreadPoolExecutor(max_workers=2)
    # 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞
    task1 = executor.submit(get_html, (3))
    task2 = executor.submit(get_html, (2))
    # done方法用于判定某个任务是否完成
    print(task1.done())
    # cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功
    print(task2.cancel())
    time.sleep(4)
    print(task1.done())
    # result方法可以获取task的执行结果
    print(task1.result())
     
    # 执行结果
    # False  # 表明task1未执行完成
    # False  # 表明task2取消失败,因为已经放入了线程池中
    # get page 2s finished
    # get page 3s finished
    # True  # 由于在get page 3s finished之后才打印,所以此时task1必然完成了
    # 3     # 得到task1的任务返回值

    ThreadPoolExecutor がインスタンスを構築するときに、max_workers パラメーターを渡して、同時に実行できるスレッドの最大数を設定します。スレッドプール内。

    submit 関数を使用して、スレッドが実行する必要があるタスク (関数名とパラメーター) をスレッド プールに送信し、タスクのハンドル (ファイルや図面と同様) を返します。 ) はブロックしていませんが、すぐに戻ります。

    submit 関数によって返されたタスク ハンドルを通じて、done() メソッドを使用してタスクが終了したかどうかを判断できます。上記の例からわかるように、タスクには 2 秒の遅延があるため、タスク 1 がサブミットされた直後はタスク 1 が完了していないと判断されますが、4 秒の遅延後にタスク 1 は完了したと判断されます。

    送信されたタスクをキャンセルするには、cancel() メソッドを使用します。タスクがスレッド プールですでに実行されている場合、キャンセルすることはできません。この例では、スレッド プール サイズが 2 に設定されており、タスクがすでに実行されているため、キャンセルは失敗します。スレッド プールのサイズを 1 に変更すると、タスク 1 が最初にサブミットされ、タスク 2 はキュー内で待機したままになりますが、この時点では正常にキャンセルできます。

    result() メソッドを使用してタスクの戻り値を取得します。内部コードを確認すると、このメソッドがブロックしていることがわかりました。

    as_completed

    上記ではタスクが完了したかどうかを判断する方法が提供されていますが、メインスレッドで常に判断できるとは限りません。

    特定のタスクが終了したことがわかっているとき、各タスクが終了したかどうかを常に判断するのではなく、結果を取得することがあります。

    これは、as_completed メソッドを使用して、すべてのタスクの結果を一度に取得する方法です。

    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
     
    # 参数times用来模拟网络请求的时间
    def get_html(times):
        time.sleep(times)
        print("get page {}s finished".format(times))
        return times
     
    executor = ThreadPoolExecutor(max_workers=2)
    urls = [3, 2, 4] # 并不是真的url
    all_task = [executor.submit(get_html, (url)) for url in urls]
     
    for future in as_completed(all_task):
        data = future.result()
        print("in main: get page {}s success".format(data))
     
    # 执行结果
    # get page 2s finished
    # in main: get page 2s success
    # get page 3s finished
    # in main: get page 3s success
    # get page 4s finished
    # in main: get page 4s success

    as_completed()このメソッドはジェネレーターです。タスクが完了していない場合はブロックされます。特定のタスクが完了すると、yield がタスクを実行します。 for ループの下のステートメントを実行し、すべてのタスクが完了するまでブロックを続けることができます。

    結果から、最初に完了したタスク が最初にメインスレッド に通知することもわかります。

    map

    上記の as_completed メソッドに加えて、executor.map メソッドも使用できますが、少し異なります。

    from concurrent.futures import ThreadPoolExecutor
    import time
     
    # 参数times用来模拟网络请求的时间
    def get_html(times):
        time.sleep(times)
        print("get page {}s finished".format(times))
        return times
     
    executor = ThreadPoolExecutor(max_workers=2)
    urls = [3, 2, 4] # 并不是真的url
     
    for data in executor.map(get_html, urls):
        print("in main: get page {}s success".format(data))
    # 执行结果
    # get page 2s finished
    # get page 3s finished
    # in main: get page 3s success
    # in main: get page 2s success
    # get page 4s finished
    # in main: get page 4s success

    事前に submit メソッドを使用せず、map メソッドを使用します。map メソッドは python## と同じです。 # 標準ライブラリの ##map意味は同じで、シーケンス内の各要素は同じ関数で実行されます。 上記のコードは、

    urls

    の各要素に対して get_html 関数を実行し、各スレッド プールを割り当てます。実行結果は上記の as_completed メソッドの結果とは異なります。出力順序は urls## の順序と同じです。 #list (2 秒の場合でも) タスクが最初に実行されて完了した場合は、3 秒のタスクが最初に出力され、次に 2 秒のタスクが出力されます。 wait

    wait

    メソッドを使用すると、設定された要件が満たされるまでメインスレッドをブロックできます。

    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
    import time
     
    # 参数times用来模拟网络请求的时间
    def get_html(times):
        time.sleep(times)
        print("get page {}s finished".format(times))
        return times
     
    executor = ThreadPoolExecutor(max_workers=2)
    urls = [3, 2, 4] # 并不是真的url
    all_task = [executor.submit(get_html, (url)) for url in urls]
    wait(all_task, return_when=ALL_COMPLETED)
    print("main")
    # 执行结果 
    # get page 2s finished
    # get page 3s finished
    # get page 4s finished
    # main

    waitメソッドは、待機中のタスク シーケンス、タイムアウト時間、待機条件の 3 つのパラメーターを受け取ります。

    待機条件return_whenデフォルトは

    ALL_COMPLETED

    で、すべてのタスクが終了するまで待機することを示します。 実行結果を見ると、すべてのタスクが実際に完了しており、メインスレッドが main を出力していることがわかります。

    待機条件を FIRST_COMPLETED に設定することもできます。これは、最初のタスクが完了すると待機が停止することを意味します。

    ソースコード分析

    cocurrent.future

    モジュール内の

    future

    は将来のオブジェクトを意味し、 オブジェクトとして理解できます。将来的には、非同期プログラミングの基礎となる操作 が完了しました。 <p>スレッド プール <code>submit() の後、この future オブジェクトが返されます。返された時点ではタスクは完了していませんが、将来的には完了します。

    はタスクの戻りコンテナとも呼ばれ、タスクの結果とステータスを保存します。

    その後ThreadPoolExecutorこのオブジェクトは内部的にどのように動作するのでしょうか?

    以下は、ThreadPoolExecutor のコードの一部の簡単な紹介です:

    1.init メソッド

    initこのメソッドで最も重要なことは、タスク キューとスレッド コレクションが他のメソッドで必要になることです。

    PythonのThreadPoolExecutorのスレッドプール問題を解決する方法

    2. 送信メソッドには、

    submit_base.Future()## という 2 つの重要なオブジェクトがあります。 #そして _WorkItem() オブジェクト、_WorkItem() オブジェクトはタスクの実行と future オブジェクトの設定、そして最後に future## の設定を担当します。 # オブジェクトは Return になります。プロセス全体がブロックせずにすぐに戻ることがわかります。

    PythonのThreadPoolExecutorのスレッドプール問題を解決する方法3.adjust_thread_count メソッド

    このメソッドの意味は理解しやすく、主に指定された数のスレッドを作成します。ただし、実装は少しわかりにくく、例えば、スレッド実行関数のweakref.refには、後述する弱参照などの概念が含まれています。

    #4._WorkItem オブジェクトPythonのThreadPoolExecutorのスレッドプール問題を解決する方法

    _WorkItem

    オブジェクトの役割は、タスクを実行し、結果を設定することです。ここでの主な複雑さは、

    self.future.set_result(result) です。

    5. スレッド実行関数 -- _workerPythonのThreadPoolExecutorのスレッドプール問題を解決する方法

    これは、スレッド プールが主にキューからスレッドを作成するときに指定される関数エントリです。順番に

    task

    を取り出して実行しますが、関数の最初のパラメータがまだよくわかりません。それは後回しにしておきます。

    以上がPythonのThreadPoolExecutorのスレッドプール問題を解決する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。