ホームページ >よくある問題 >Python スレッド プールとその原理と用途

Python スレッド プールとその原理と用途

zbt
zbtオリジナル
2023-06-20 16:44:251730ブラウズ

システムによる新しいスレッドの開始にはオペレーティング システムとの対話が含まれるため、コストが比較的高くなります。この場合、スレッド プールを使用するとパフォーマンスが大幅に向上します。特にプログラムが有効期間の短い多数のスレッドを作成する必要がある場合は、スレッド プールの使用を検討する必要があります。スレッド プールは、システムの起動時に多数のアイドル スレッドを作成します。プログラムがスレッド プールに関数を送信している限り、スレッド プールはその関数を実行するためにアイドル スレッドを開始します。関数が実行されると、スレッドは終了しませんが、再びスレッド プールに戻り、次の関数の実行を待機してアイドル状態になります。

Python スレッド プールとその原理と用途

#システムによる新しいスレッドの開始にはオペレーティング システムとの対話が含まれるため、コストが比較的高くなります。この場合、スレッド プールを使用するとパフォーマンスが大幅に向上します。特にプログラムが有効期間の短い多数のスレッドを作成する必要がある場合は、スレッド プールの使用を検討する必要があります。

スレッド プールは、システムの起動時に多数のアイドル スレッドを作成します。プログラムがスレッド プールに関数を送信している限り、スレッド プールはその関数を実行するためにアイドル スレッドを開始します。関数の実行が終了しても、スレッドは終了せず、スレッド プールに戻ってアイドル状態になり、次の関数の実行を待ちます。

さらに、スレッド プールを使用すると、システム内の同時スレッドの数を効果的に制御できます。システムに多数の同時スレッドが含まれる場合、システムのパフォーマンスが急激に低下し、Python が失敗することもあります。 インタプリタはクラッシュし、スレッド プールの最大スレッド数パラメータにより、システム内の同時スレッド数がこの数を超えないように制御できます。

スレッド プールの使用

スレッド プールの基本クラスは、concurrent.futures モジュールの Executor です。Executor には 2 つのサブクラスがあります。 ThreadPoolExecutor と ProcessPoolExecutor。ThreadPoolExecutor はスレッド プールの作成に使用されます。 ProcessPoolExecutor は、プロセス プールを作成するために使用されます。

スレッド プール/プロセス プールを使用して同時プログラミングを管理する場合は、対応するタスク関数をスレッド プール/プロセス プールに送信するだけで、残りはスレッド プール/プロセス プールが処理します。

Exectuor には、次の共通メソッドが用意されています。

submit(fn, *args, **kwargs): fn 関数をスレッド プールに送信します。 *args は fn 関数に渡されるパラメータを表します。 *kwargs パラメータがキーワード引数の形式で fn 関数に渡されることを示します。

map(func, *iterables, timeout=None, chunksize=1): この関数はグローバル関数map(func, *iterables) ですが、この関数は複数のスレッドを開始して、反復可能オブジェクトのマップ処理を非同期で即座に実行します。

shutdown(wait=True): スレッド プールを閉じます。

プログラムがタスク関数をスレッド プールに送信した後、submit メソッドは Future オブジェクトを返します。 このクラスは主にスレッドタスク関数の戻り値を取得するために使用されます。スレッドタスクは新しいスレッドで非同期に実行されるため、スレッドによって実行される関数は「将来完了する」タスクに相当するため、Pythonは 表現する未来。

実は、Java のマルチスレッド プログラミングにも Future があり、ここでの Future は Java の Future に似ています。

Future には次のメソッドが用意されています。

cancel(): Future によって表されるスレッド タスクをキャンセルします。タスクが実行中でキャンセルできない場合、メソッドは False を返します。それ以外の場合、プログラムはタスクをキャンセルして戻ります。 真実。

canceled(): Future で表されるスレッド タスクが正常にキャンセルされたかどうかを返します。

running(): Future で表されるスレッド タスクが実行中であり、キャンセルできない場合、このメソッドは True を返します。

done(): Funture によって表されるスレッド タスクが正常にキャンセルまたは完了した場合、このメソッドは True を返します。

result(timeout=None): Future で表されるスレッド タスクによって返される最終結果を取得します。もし未来なら 示されているスレッド タスクが完了していない場合、このメソッドは現在のスレッドをブロックします。タイムアウト パラメーターはブロックする最大秒数を指定します。

Exception(timeout=None): Future で表されるスレッド タスクによって引き起こされた例外を取得します。タスクが例外なく正常に完了すると、メソッドは戻ります。 なし。

add_done_callback(fn): この Future で表されるスレッド タスクに「コールバック関数」を登録します。タスクが正常に完了すると、プログラムは自動的に fn をトリガーします。 関数。

スレッド プールを使い切った後、スレッド プールの shutdown() メソッドを呼び出す必要があります。これにより、スレッド プールのシャットダウン シーケンスが開始されます。 shutdown() を呼び出す メソッドの後のスレッド プールは新しいタスクを受信しなくなりますが、以前に送信されたすべてのタスクは完了します。スレッド プール内のすべてのタスクが実行されると、スレッド プール内のすべてのスレッドが終了します。

スレッド プールを使用してスレッド タスクを実行する手順は次のとおりです。

ThreadPoolExecutor クラスのコンストラクターを呼び出して、スレッド プールを作成します。

通常の関数をスレッドタスクとして定義します。

ThreadPoolExecutor オブジェクトの submit() メソッドを呼び出して、スレッド タスクを送信します。

タスクを送信したくない場合は、ThreadPoolExecutor オブジェクトの shutdown() メソッドを呼び出して、スレッド プールをシャットダウンします。

次のプログラムは、スレッド プールを使用してスレッド タスクを実行する方法を示しています。

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()

上記のプログラムでは、コードの 13 行目で 2 つのスレッドを含むスレッド プールが作成されます。次の 2 行のコードでは、action() を追加するだけです。 関数はスレッド プールに送信され、スレッド プールは、action() 関数を実行するスレッドを開始する役割を果たします。スレッドを開始するこの方法は洗練されており、より効率的です。

プログラムが action() 関数をスレッド プールに送信すると、submit() メソッドはタスクに対応する Future オブジェクトを返し、プログラムは即座に futurel を決定します。 done() メソッドは False を返します (この時点ではタスクが完了していないことを示します)。次に、メイン プログラムは 3 秒間一時停止し、future2 の Done() を決定します。 この時点でタスクが完了している場合、このメソッドは True を返します。

プログラムは最終的に、Future の result() メソッドを通じて 2 つの非同期タスクによって返された結果を取得します。

読者はこのコードを自分で実行して結果を確認できますが、ここでは示しません。

プログラムが Future の result() メソッドを使用して結果を取得する場合、タイムアウトが指定されていない場合、このメソッドは現在のスレッドをブロックします。 パラメータを指定すると、Future で表されるタスクが返されるまで、現在のスレッドはブロックされたままになります。

実行結果の取得

前のプログラムは、スレッド タスクの戻り値を取得するために Future の result() メソッドを呼び出しましたが、このメソッドは現在のメイン スレッドをブロックし、次のまで待機するだけです。資金処理タスクが完了しました。 , result() メソッドのブロックが解除されます。

プログラムがスレッドをブロックするために result() メソッドを直接呼び出したくない場合は、Future の add_done_callback() を渡すことができます。 コールバック関数を追加する方法。コールバック関数は fn(future) のようになります。スレッドタスクが完了すると、プログラムは自動的にコールバック関数をトリガーし、対応する Future を転送します。 オブジェクトはパラメータとしてコールバック関数に渡されます。

次のプログラムは、add_done_callback() メソッドを使用して、スレッド タスクの戻り値を取得します:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
def get_result(future):
print(future.result())
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数
future2.add_done_callback(get_result)
print('--------------')

上記のメイン プログラムは、同じコールバック関数を future1 と future2 にそれぞれ追加します。スレッド タスクで使用されます。終了時に戻り値を取得します。

メイン プログラムのコードの最後の行では、水平線が出力されます。プログラムはfuture1とfuture2のresult()を直接呼び出していないため メソッドを使用するため、メインスレッドはブロックされず、出力メインスレッドによって印刷された水平線をすぐに確認できます。次に、2 つの新しいスレッドが同時に実行されていることがわかります。スレッド タスクが完了すると、get_result() 関数がトリガーされ、スレッド タスクの戻り値が出力されます。

さらに、スレッド プールはコンテキスト管理プロトコルを実装しているため、プログラムは ステートメントを使用してスレッド プールを管理することで、上記のプログラムに示すようにスレッド プールを手動で閉じる必要がなくなります。

さらに、Exectuor はマップ (func, *iterables, timeout=None, chunksize=1) も提供します。 このメソッドの機能はグローバル関数の map() に似ていますが、異なる点は、スレッド プールの map() メソッドが iterables の要素ごとにスレッドを開始して func を並行して実行することです。 関数。このメソッドは、len(iterables) スレッドを開始し、各スレッドの実行結果を収集するのと同じです。

たとえば、次のプログラムは、Executor の map() メソッドを使用してスレッドを開始し、スレッド タスクの戻り値を収集します。

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
# 使用线程执行map计算
# 后面元组有3个元素,因此程序启动3条线程来执行action函数
results = pool.map(action, (50, 100, 150))
print('--------------')
for r in results:
print(r)

上記のプログラムは、map() メソッドを使用して、 3 つのスレッドを開始します (プログラムのスレッド プールには 4 つのスレッドが含まれます) 2 つのスレッドのみを含むスレッド プールを使用し続ける場合、この時点で 1 つのタスクが待機状態になり、スレッドが解放されて実行できるようになる前に、タスクの 1 つが完了するまで待つ必要があります。 )、マップ() メソッドの戻り値は、各スレッド タスクの戻り結果を収集します。

上記のプログラムを実行すると、3 つのスレッドの同時実行の結果も確認でき、最後に、結果を通じて 3 つのスレッドのタスクの戻り結果を確認できます。

上記のプログラムからわかるように、map() メソッドを使用してスレッドを開始し、スレッドの実行結果を収集すると、コードが単純になるだけでなく、プログラムがアクションを実行するという利点もあります。 () 同時に ただし、最後に収集される action() 関数の実行結果は、渡されたパラメーターの結果と一致しています。つまり、上記の結果の最初の要素は action(50) です。 2 番目の要素は action(100) の結果であり、3 番目の要素は action(150) の結果です。

以上がPython スレッド プールとその原理と用途の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。