首頁 >後端開發 >Python教學 >Python並發之PoolExecutor的介紹(附範例)

Python並發之PoolExecutor的介紹(附範例)

不言
不言轉載
2019-03-16 09:50:213094瀏覽

這篇文章帶給大家的內容是關於Python並發之PoolExecutor的介紹(附範例),有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。

使用多執行緒(threading)和多進程(multiprocessing)完成常規的並發需求,在啟動的時候 start、join 等步驟不能省,複雜的需要還要用 1-2 個佇列。
隨著需求越來越複雜,如果沒有良好的設計和抽象這部分的功能層次,程式碼量越多調試的難度就越大。

對於需要並發執行、但是對即時性要求不高的任務,我們可以使用 concurrent.futures 套件中的 PoolExecutor 類別來實現。

這個套件提供了兩個執行器:執行緒池執行器 ThreadPoolExecutor 和進程池執行器 ProcessPoolExecutor,兩個執行器提供相同的 API。

池的概念主要目的是為了重複使用:讓執行緒或進程在生命週期內可以多次使用。它減少了創建創建線程和進程的開銷,提高了程式效能。重複使用不是必須的規則,但它是程式設計師在應用中使用池的主要原因。

池,只有固定個數的執行緒/進程,透過 max_workers 指定。

任務透過 executor.submit 提交到 executor 的任務佇列,傳回一個 future 物件。

Future 是一種常見的並發設計模式。

一個Future物件代表了一些尚未就緒(完成)的結果,在「將來」的某個時間就緒了之後就可以得到這個結果。

任務被調度到各個 workers 中執行。

但要注意,一個任務一旦執行,在執行完畢前,就會一直佔用該 worker!如果 workers 不夠用,其他的任務會一直等待!因此 PoolExecutor 不適合即時任務。

import concurrent.futures
import time
from itertools import count

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaluate_item(x):
    for i in count(x):  # count 是无限迭代器,会一直递增。
        print(f"{x} - {i}")
        time.sleep(0.01)


if __name__ == "__main__":
        # 进程池
        start_time_2 = time.time()

        # 使用 with 在离开此代码块时,自动调用 executor.shutdown(wait=true) 释放 executor 资源
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                # 将 10 个任务提交给 executor,并收集 futures
                futures = [executor.submit(evaluate_item, item) for item in number_list]

                # as_completed 方法等待 futures 中的 future 完成
                # 一旦某个 future 完成,as_completed 就立即返回该 future
                # 这个方法,使每次返回的 future,总是最先完成的 future
                # 而不是先等待任务 1,再等待任务 2...
                for future in concurrent.futures.as_completed(futures):
                        print(future.result())
        print ("Thread pool execution in " + str(time.time() - start_time_2), "seconds")

上面的程式碼中,item 為 1 2 3 4 5 的五個任務會一直佔用所有的 workers,而 6 7 8 9 10 這五個任務會永遠等待! ! !

API 詳細說明

concurrent.futures 包含三個部分的API:

PoolExecutor:也就是兩個執行器的API

##建構子:主要的參數是max_workers,用來指定執行緒池大小(或說workers 個數)

submit(fn, *args, **kwargs):將任務函數fn 提交到執行器,args 和kwargs 就是fn 所需的參數。

傳回一個future,用於取得結果

map(func, *iterables, timeout=None, chunksize=1):當任務是同一個,只有參數不同時,可以用這個方法代替submit。 iterables 的每個元素對應 func 的一組參數。

傳回一個 futures 的迭代器

shutdown(wait=True):關閉執行器,一般都會使用 with 管理員自動關閉。

Future:任務提交給執行器後,會傳回一個 future

future.result(timout=None):最常用的方法,傳回任務的結果。如果任務尚未結束,這個方法會一直等待!

timeout 指定逾時時間,為 None 時沒有逾時限制。

exception(timeout=None):給出任務拋出的例外。和 result() 一樣,也會等待任務結束。

cancel():取消此任務

add_done_callback(fn):future 完成後,會執行 fn(future)。

running():是否正在執行

done():future 是否已經結束了,boolean

...詳見官方文件

##模組帶有的實用函數

concurrent.futures.as_completed(fs, timeout=None):等待fs (futures iterable)中的future 完成

#一旦fs 中的某future 完成了,這個函數就立即回傳該future。

這個方法,使每次回傳的 future,總是最先完成的 future。而不是先等待任務 1,再等待任務 2...

常透過 for future in as_completed(fs): 使用此函數。

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):一直等待,直到return_when 所指定的事發生,或timeout

return_when 有三個選項:ALL_COMPLETED(或timeout

return_when 有三個選項:ALL_COMPLETED(fsfsfs(fsfs)中的futures 全部完成),FIRST__COMPLETED(fs 中任意一個future 完成)還有FIRST_EXCEPTION(某任務拋出異常)

Future 設計模式

這裡的PoolExecutor 的特點,在於它使用了Future 設計模式,使任務的執行,與結果的獲取,變成一個非同步的流程。

我們先透過 submit/map 將任務放入任務佇列,這時任務就已經開始執行了!然後我們在需要的時候,透過 future 取得結果,或直接 add_done_callback(fn)。

這裡任務的執行是在新的 workers 中的,主行程/執行緒不會阻塞,因此主執行緒可以做其他的事。這種方式被稱為非同步程式設計。

畫外

concurrent.futures 基於 multiprocessing.pool 實現,因此實際上它比直接使用 執行緒/進程 的 Pool 要慢一點。但它提供了更方便簡潔的 API。

以上是Python並發之PoolExecutor的介紹(附範例)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:cnblogs.com。如有侵權,請聯絡admin@php.cn刪除