首頁 >常見問題 >Python執行緒池及其原理和使用

Python執行緒池及其原理和使用

zbt
zbt原創
2023-06-20 16:44:251729瀏覽

系統啟動一個新執行緒的成本是比較高的,因為它涉及與作業系統的互動。在這種情況下,使用執行緒池可以很好地提升效能,尤其是當程式中需要建立大量生存期很短暫的執行緒時,更應該考慮使用執行緒池。線程池在系統啟動時即創建大量空閒的線程,程式只要將函數提交給線程池,線程池就會啟動一個空閒的線程來執行它。當函數執行結束後該執行緒並不會死亡,而是再次回到執行緒池中變成空閒狀態等待執行下一個函數。

Python執行緒池及其原理和使用

系統啟動新執行緒的成本是比較高的,因為它涉及與作業系統的互動。在這種情況下,使用執行緒池可以很好地提升效能,尤其是當程式中需要建立大量生存期很短暫的執行緒時,更應該考慮使用執行緒池。

線程池在系統啟動時即創建大量空閒的線程,程式只要將函數提交給線程池,線程池就會啟動一個空閒的線程來執行它。當函數執行結束後,該執行緒並不會死亡,而是再次回到執行緒池中變成空閒狀態,等待下一個函數。

此外,使用執行緒池可以有效地控制系統中並發執行緒的數量。當系統中包含有大量的並發線程時,會導致系統效能急劇下降,甚至導致 Python 解釋器崩潰,而線程池的最大線程數參數可以控制系統中並發線程的數量不超過此數。

線程池的使用

線程池的基類是 concurrent.futures 模組中的 Executor,Executor 提供了兩個子類,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用來建立執行緒池,而 ProcessPoolExecutor 用於建立進程池。

如果使用線程池/進程池來管理並發編程,那麼只要將相應的 task 函數提交給線程池/進程池,剩下的事情就由線程池/進程池來搞定。

Exectuor 提供如下常用方法:

submit(fn, *args, **kwargs):將 fn 函數提交給執行緒池。 *args 代表傳給 fn 函數的參數,*kwargs 代表以關鍵字參數的形式為 fn 函數傳入參數。

map(func, *iterables, timeout=None, chunksize=1):此函數類似全域函數 map(func, *iterables),只是函數將會啟動多個線程,以非同步方式立即對 iterables 執行 map 處理。

shutdown(wait=True):關閉執行緒池。

程式將 task 函數提交(submit)給執行緒池後,submit 方法會傳回一個 Future 對象,Future 類別主要用於取得線程任務函數的返回值。由於線程任務會在新線程中以非同步方式執行,因此,線程執行的函數相當於一個「將來完成」的任務,所以 Python 使用 Future 來代表。

實際上,在 Java 的多執行緒程式設計中同樣有 Future,此處的 Future 與 Java 的 Future 大同小異。

Future 提供以下方法:

cancel():取消該 Future 代表的執行緒任務。如果該任務正在執行,不可取消,則該方法傳回 False;否則,程式會取消該任務,並傳回 True。

cancelled():傳回 Future 代表的執行緒任務是否成功取消。

running():如果該 Future 代表的執行緒任務正在執行、不可被取消,則該方法傳回 True。

done():如果該 Funture 代表的執行緒任務被成功取消或執行完成,則該方法傳回 True。

result(timeout=None):取得該 Future 代表的執行緒任務最後回傳的結果。如果 Future 代表的線程任務尚未完成,該方法將會阻塞當前線程,其中 timeout 參數指定最多阻塞多少秒。

exception(timeout=None):取得該 Future 代表的執行緒任務所引發的例外狀況。如果該任務成功完成,沒有異常,則該方法返回 None。

add_done_callback(fn):為該 Future 代表的執行緒任務註冊一個“回調函數”,當該任務成功完成時,程式會自動觸發該 fn 函數。

在用完一個執行緒池後,應該呼叫該執行緒池的 shutdown() 方法,該方法會啟動執行緒池的關閉序列。呼叫 shutdown() 方法後的執行緒池不再接收新任務,但會將先前所有的已提交任務執行完成。當執行緒池中的所有任務都執行完成後,該執行緒池中的所有執行緒都會死亡。

使用執行緒池來執行執行緒任務的步驟如下:

呼叫 ThreadPoolExecutor 類別的建構器建立一個執行緒池。

定義一個普通函數作為執行緒任務。

呼叫 ThreadPoolExecutor 物件的 submit() 方法來提交執行緒任務。

當不想提交任何任務時,呼叫 ThreadPoolExecutor 物件的 shutdown() 方法來關閉執行緒池。

下面程式示範如何使用執行緒池來執行執行緒任務:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()

上面程式中,第 13 行程式碼建立了一個包含兩個執行緒的執行緒池,接下來的兩行程式碼只要將 action() 函數提交(submit)給執行緒池,該執行緒池就會負責啟動執行緒來執行 action() 函數。這種啟動線程的方法既優雅,又具有更高的效率。

當程式把 action() 函數提交給執行緒池時,submit() 方法會傳回該任務所對應的 Future 對象,程式立即判斷 futurel 的 done() 方法,此方法將會傳回 False(表示此時該任務尚未完成)。接下來主程式暫停 3 秒,然後判斷 future2 的 done() 方法,如果此時該任務已經完成,那麼該方法將會傳回 True。

程式最後透過 Future 的 result() 方法來取得兩個非同步任務傳回的結果。

讀者可以自己執行此程式碼查看運行結果,這裡不再示範。

當程式使用 Future 的 result() 方法來取得結果時,方法會阻塞目前線程,如果沒有指定 timeout 參數,目前執行緒將一直處於阻塞狀態,直到 Future 代表的任務返回。

取得執行結果

前面程式呼叫了Future 的result() 方法來取得執行緒任務的運回值,但該方法會阻塞目前主線程,只有等到錢程任務完成後,result() 方法的阻塞才會解除。

如果程式不希望直接呼叫 result() 方法阻塞線程,則可透過 Future 的 add_done_callback() 方法來加入回調函數,該回呼函數形如 fn(future)。當執行緒任務完成後,程式會自動觸發該回呼函數,並將對應的 Future 物件作為參數傳給該回呼函數。

下面程式使用add_done_callback() 方法來取得執行緒任務的回傳值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
def get_result(future):
print(future.result())
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数
future2.add_done_callback(get_result)
print('--------------')

上面主程式分別為future1、future2 新增了同一個回呼函數,該回呼函數會在執行緒任務結束時取得其傳回值。

主程式的最後一行程式碼列印了一條橫線。由於程式並未直接呼叫 future1、future2 的 result() 方法,因此主執行緒不會被阻塞,可以立即看到輸出主執行緒列印出的橫線。接下來將會看到兩個新執行緒並發執行,當執行緒任務執行完成後,get_result() 函數被觸發,輸出線程任務的回傳值。

另外,由於執行緒池實作了上下文管理協定(Context Manage Protocol),因此,程式可以使用 with 語句來管理執行緒池,這樣即可避免手動關閉執行緒池,如上面的程式所示。

此外,Exectuor 也提供了一個 map(func, *iterables, timeout=None, chunksize=1) 方法,該方法的功能類似於全域函數 map(),差異在於線程池的 map() 方法會為 iterables 的每個元素啟動一個線程,以並發方式執行 func 函數。這種方式相當於啟動 len(iterables) 個線程,井收集每個線程的執行結果。

例如,如下程式使用Executor 的map() 方法來啟動執行緒,並收集執行緒任務的回傳值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
# 使用线程执行map计算
# 后面元组有3个元素,因此程序启动3条线程来执行action函数
results = pool.map(action, (50, 100, 150))
print('--------------')
for r in results:
print(r)

上面程式使用map() 方法來啟動3 個執行緒(該程式的執行緒池包含4 個線程,如果繼續使用只包含兩個線程的線程池,此時將有一個任務處於等待狀態,必須等其中一個任務完成,線程空閒出來才會獲得執行的機會),map() 方法的回傳值將會收集每個執行緒任務的回傳結果。

執行上面程序,同樣可以看到 3 個執行緒並發執行的結果,最後透過 results 可以看到 3 個執行緒任務的回傳結果。

透過上面程式可以看出,使用 map() 方法來啟動線程,並收集線程的執行結果,不僅具有程式碼簡單的優點,而且雖然程式會以並發方式來執行 action() 函數,但最後收集到的 action() 函數的執行結果,依然與傳入參數的結果一致。也就是說,上面 results 的第一個元素是 action(50) 的結果,第二個元素是 action(100) 的結果,第三個元素是 action(150) 的結果。

以上是Python執行緒池及其原理和使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn