首頁  >  文章  >  後端開發  >  詳細了解Python進程池與進程鎖

詳細了解Python進程池與進程鎖

WBOY
WBOY轉載
2022-05-10 18:11:232609瀏覽

本篇文章為大家帶來了關於python的相關知識,其中主要介紹了關於進程池與進程鎖的相關問題,包括進程池的創建模組,進程池函數等等內容,下面一起來看一下,希望對大家有幫助。

詳細了解Python進程池與進程鎖

推薦學習:python影片教學

#進程池

什麼是進程池

上一章節關於進程的問題我們提到過,進程創建太多的情況下就會對資源消耗過大。為了避免這種情況,我們就需要固定進程的數量,這時候就需要進程池的幫助。

我們可以認為進程池就是一個池子,在這個池子裡提前創建好一定數量的進程。見下圖:


詳細了解Python進程池與進程鎖


例如這個紅色矩形陣列就代表一個行程池子,在這個池子裡有6個行程。這6個進程會伴隨進程池一起被創建,不僅如此,我們在學習物件導向的生命週期的時候曾經說過,每個實例化物件在使用完成之後都會被記憶體管家回收。

我們的流程也會伴隨著創建與關閉的過程而被記憶體管家回收,每一個都是如此,創建於關閉進程的過程也會消耗一定的效能。而進程池中的進程當被創建之後就不會被關閉,可以一直被重複使用,從而避免了創建於關閉的資源消耗,也避免了創建於關閉的反复操作提高了效率。

當然,當我們執行完程式進程池關閉的時候,進程也隨之關閉。

當我們有任務需要被執行的時候,會判斷目前的進程池當中有沒有空閒的進程(所謂空閒的進程其實就是進程池中沒有執行任務的進程)。有進程處於空閒狀態的情況下,任務會找到進程執行該任務。如果目前進程池中的進程都處於非空閒狀態,則任務就會進入等待狀態,直到進程池中有進程處於空閒狀態才會進出進程池從而執行該任務。

這就是進程池的作用。

進程池的建立模組- multiprocessing

#建立進程池函數- Pool

##函數名稱介紹參數#傳回值PoolPool #進程池的建立

Processcount進程池物件

#Pool功能介紹:透過呼叫"multiprocessing" 模組的"Pool" 函數來幫助我們建立"進程池物件" ,它有一個參數"Processcount" (一個整數),代表我們在這個進程池中建立幾個進程。

進程池的常用方法函數名稱介紹參數傳回值
當創建了進程池物件之後,我們要對它進程操作,讓我們來看看都有哪些常用方法(函數)。
apply_async 任務加入進程池(非同步) func,args
close 關閉進程池
#join######等待進程池任務結束### ###無######無#############
  • apply_async 函數:它的功能是將任務加入到進程池中,並且是透過非同步實現的。 非同步 這個知識我們還沒有學習,先不用關心它到底是什麼意思。它有兩個參數:func 與agrs , func 是加入進程池中工作的函數;args 是一個元組,代表著簽一個函數的參數,這和我們創建並使用一個進程是完全一致的。
  • close 函數:當我們使用完進程池之後,透過呼叫 close 函數可以關閉進程池。它沒有任何的參數,也沒有任何的回傳值。
  • join 函數:它和我們上一章學習的 建立進程的 join 函數中方法是一致的。只有進程池中的任務全部執行完畢之後,才會執行後續的任務。不過一般它會伴隨著進程池的關閉(close 函數)才會被使用。

apply_async 函數演示案例

#接下裡我們在 Pycharm 中建立一個腳本,練習一下關於進程池的使用方法。

  • 定義一個函數,列印輸出該函數每次被執行的次數與該次數的行程編號
  • 定義進程池的數量,每一次的執行行程數量最多為該進程池設定的進程數

範例程式碼如下:

# coding:utf-8import osimport timeimport multiprocessingdef work(count):    
# 定义一个 work 函数,打印输出 每次执行的次数 与 该次数的进程号
    print('\'work\' 函数 第 {} 次执行,进程号为 {}'.format(count, os.getpid()))
    time.sleep(3)
    # print('********')if __name__ == '__main__':
    pool = multiprocessing.Pool(3)      
    # 定义进程池的进程数量,同一时间每次执行最多3个进程
    for i in range(21):
        pool.apply_async(func=work, args=(i,))      
        # 传入的参数是元组,因为我们只有一个 i 参数,所以我们要写成 args=(i,)

    time.sleep(15)      
    # 这里的休眠时间是必须要加上的,否则我们的进程池还未运行,主进程就已经运行结束,对应的进程池也会关闭。

運行結果如下:


詳細了解Python進程池與進程鎖

從上圖中我們可以看到每一次都是一次性運行三個進程,每一個進程的進程號是不一樣的,但仔細看會發現存在相同的進程號,這說明進程池的進程號在重複利用。這證明我們上文介紹的內容,進程池中的進程不會被關閉,可以重複使用。

而且我們還可以看到每隔3秒都會執行3個進程,原因是我們的進程池中只有3個進程;雖然我們的for 循環 中有21 個任務,work 函數會被執行21次,但由於我們的進程池中只有3個進程。所以執行了3個任務之後(休眠3秒),後面的任務等待進程池中的進程處於空閒狀態之後才會繼續執行。

同樣的,進程號在順序上回出現一定的區別,原因是因為我們使用的是一種 非同步 的方法(非同步即非同步)。這就導致 work 函數 一起執行的三個任務會被打亂順序,這也是為什麼我們的進程號出現順序不一致的原因。 (更多的非同步知識我們會在非同步的章節進行詳細介紹

#進程池的原理: 上述腳本的案例證實了我們進程池關於進程的限制,只有當我們進程池中的進程處於空閒狀態的時候才會將進程池外等待的任務丟到進程池中工作。


 close 函數與join 函數示範

在上文的腳本中, 我們使用time.sleep(15) 幫助我們將主進程阻塞15秒鐘再次退出,所以給了我們進程池足夠的時間完成我們的work() 函數的循環任務。

如果沒有 time.sleep(15) 這句話又怎麼辦呢,其實這裡就可以使用進程的 join 函數了。不過上文我們也提到過,進程的 join() 函數一般都會伴隨進程池的關閉(close 函數)來使用。接下來,我們就將上文腳本中的 time.sleep(15) 替換成 join() 函數試試看。

範例程式碼如下:

# coding:utf-8import osimport timeimport multiprocessingdef work(count):    
# 定义一个 work 函数,打印输出 每次执行的次数 与 该次数的进程号
    print('\'work\' 函数 第 {} 次执行,进程号为 {}'.format(count, os.getpid()))
    time.sleep(3)
    # print('********')if __name__ == '__main__':
    pool = multiprocessing.Pool(3)      # 定义进程池的进程数量,同一时间每次执行最多3个进程
    for i in range(21):
        pool.apply_async(func=work, args=(i,))      
        # 传入的参数是元组,因为我们只有一个 i 参数,所以我们要写成 args=(i,)

    # time.sleep(15) 
    pool.close()
    pool.join()

運行結果如下:


詳細了解Python進程池與進程鎖

#從上面的動圖我們可以看出,work() 函數的任務與進程池中的進程與使用time.sleep(15)的運行結果一致。

PS:如果我們的主程序會一直執行,就不會退出。那我們並不需要加入 close() 與 join() 函數 ,可以讓進程池一直啟動著,直到有任務進來就會執行。

在後面學習 WEB 開發之後,不退出主行程進行工作是家常便飯。還有一些需要長期執行的任務也不會關閉,但如果只有一次性執行的腳本,就需要添加close() 與join() 函數 來保證進程池的任務全部完成之後主進程再退出。當然,如果主進程關閉了,就不會再接受新的任務了,也就代表了進程池的終結。


接下來再看一個例子,在 work 函數 加入一個 return。

這裡大家可能會有一個疑問,在上一章節針對進程的知識點明明說的是進程無法取得返回值,那麼這裡的work() 函數增加的return 又有什麼意義呢?

其实不然,在我们的使用进程池的 apply_async 方法时,是通过异步的方式实现的,而异步是可以获取返回值的。针对上述脚本,我们在 for循环中针对每一个异步 apply_async 添加一个变量名,从而获取返回值。

示例代码如下:

# coding:utf-8import osimport timeimport multiprocessingdef work(count):    # 定义一个 work 函数,打印输出 每次执行的次数 与 该次数的进程号
    print('\'work\' 函数 第 {} 次执行,进程号为 {}'.format(count, os.getpid()))
    time.sleep(3)
    return '\'work\' 函数 result 返回值为:{}, 进程ID为:{}'.format(count, os.getpid())if __name__ == '__main__':
    pool = multiprocessing.Pool(3)      # 定义进程池的进程数量,同一时间每次执行最多3个进程
    results = []
    for i in range(21):
        result = pool.apply_async(func=work, args=(i,))      # 传入的参数是元组,因为我们只有一个 i 参数,所以我们要写成 args=(i,)
        results.append(result)

    for result in results:
        print(result.get())     # 可以通过这个方式返回 apply_async 的返回值,
                                # 通过这种方式也不再需要 使用 close()、join() 函数就可以正常执行。

    # time.sleep(15)      # 这里的休眠时间是必须要加上的,否则我们的进程池还未运行,主进程就已经运行结束,对应的进程池也会关闭。
    # pool.close()
    # pool.join()

运行结果如下:


詳細了解Python進程池與進程鎖

从运行结果可以看出,首先 work() 函数被线程池的线程执行了一遍,当第一组任务执行完毕紧接着执行第二次线程池任务的时候,打印输出了 apply_async 的返回值,证明返回值被成功的返回了。然后继续下一组的任务…

这些都是主要依赖于 异步 ,关于 异步 的更多知识会在 异步 的章节进行详细的介绍。


进程锁

 进程锁的概念

锁:大家都知道,我们可以给一个大门上锁。

结合这个场景来举一个例子:比如现在有多个进程同时冲向一个 "大门" ,当前门内是没有 "人"的(其实就是进程),锁也没有锁上。当有一个进程进去之后并且把 “门” 锁上了,这时候门外的那些进程是进不来的。在门内的 “人” ,可以在 “门” 内做任何事情且不会被干扰。当它出来之后,会解开门锁。这时候又有一个 “人” 进去了门内,并且重复这样的操作,这就是 进程锁。它可以让锁后面的工作只能被一个任务来处理,只有它解锁之后下一个任务才会进入,这就是 “锁” 的概念。

进程锁 就是仅针对于 进程 有效的锁,当进程的任务开始之后,就会被上一把 “锁”;与之对应的是 线程锁 ,它们的原理几乎是一样的。

进程锁的加锁与解锁

进程锁的使用方法:

通过 multiprocessing 导入 Manager 类

from multiprocessing import Manager

然后实例化 Manager

manager = Manager()

再然后通过实例化后的 manager 调用 它的 Lock() 函数

lock = manager.Lock()

接下来,就需要操作这个 lock 对象的函数

函数名 介绍 参数 返回值
acquire 上锁
release 解锁(开锁)

代码示例如下:

# coding:utf-8import osimport timeimport multiprocessingdef work(count, lock):    # 定义一个 work 函数,打印输出 每次执行的次数 与 该次数的进程号,增加线程锁。
    lock.acquire()        # 上锁
    print('\'work\' 函数 第 {} 次执行,进程号为 {}'.format(count, os.getpid()))
    time.sleep(3)
    lock.release()        # 解锁
    return '\'work\' 函数 result 返回值为:{}, 进程ID为:{}'.format(count, os.getpid())if __name__ == '__main__':
    pool = multiprocessing.Pool(3)      # 定义进程池的进程数量,同一时间每次执行最多3个进程
    manager = multiprocessing.Manager()
    lock = manager.Lock()
    results = []
    for i in range(21):
        result = pool.apply_async(func=work, args=(i, lock))      # 传入的参数是元组,因为我们只有一个 i 参数,所以我们要写成 args=(i,)
        # results.append(result)


    # time.sleep(15)      # 这里的休眠时间是必须要加上的,否则我们的进程池还未运行,主进程就已经运行结束,对应的进程池也会关闭。
    pool.close()
    pool.join()

执行结果如下:


詳細了解Python進程池與進程鎖

从上图中,可以看到每一次只有一个任务会被执行。由于每一个进程会被阻塞 3秒钟,所以我们的进程执行的非常慢。这是因为每一个进程进入到 work() 函数中,都会执行 上锁、阻塞3秒、解锁 的过程,这样就完成了一个进程的工作。下一个进程任务开始,重复这个过程… 这就是 进程锁的概念


其实进程锁还有很多种方法,在 multiprocessing 中有一个直接使用的锁,就是 ``from multiprocessing import Lock。这个Lock的锁使用和我们刚刚介绍的Manager` 的锁的使用有所区别。(这里不做详细介绍,感兴趣的话可以自行拓展一下。)

的使用可以让我们对某个任务 在同一时间只能对一个进程进行开发,但是 锁也不可以乱用 。因为如果某些原因造成 锁没有正常解开 ,就会造成死锁的现象,这样就无法再进行操作了。

因为 锁如果解不开 ,后面的任务也就没有办法继续执行任务,所以使用锁一定要谨慎。

推荐学习:python视频教程

以上是詳細了解Python進程池與進程鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:csdn.net。如有侵權,請聯絡admin@php.cn刪除