首頁 >後端開發 >Python教學 >python中多進程的詳細介紹(程式碼範例)

python中多進程的詳細介紹(程式碼範例)

不言
不言原創
2018-08-29 10:25:011840瀏覽

這篇文章帶給大家的內容是關於python中多進程的詳細介紹(程式碼範例),有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。

本節講學習Python的多進程。

一、多進程和多執行緒比較

多進程Multiprocessing 和多執行緒threading 類似, 他們都是在python 中用來並行運算的. 不過既然有了threading, 為什麼Python 還要出一個multiprocessing 呢? 原因很簡單, 就是用來彌補threading 的一些劣勢, 比如在threading 教程中提到的GIL.

使用multiprocessing 也非常簡單, 如果對threading 有一定了解的朋友, 你們的享受時間就到了. 因為python 把multiprocessing 和threading 的使用方法做的幾乎差不多. 這樣我們就更容易上手. 也更容易發揮你電腦多核心系統的威力了!

二、新增進程Process

import multiprocessing as mp
import threading as td

def job(a,d):
    print('aaaaa')

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()

從上面的使用比較程式碼可以看出,執行緒和進程的使用方法相似。

使用

在運用時需要新增上一個定義main函數的語句

if __name__=='__main__':

完整的應用程式碼:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_test.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

def job(a, d):
    print a, d

if __name__ == '__main__':
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()

執行環境要在terminal環境下,可能其他的編輯工具會出現運行結束後沒有列印結果,在terminal中的運行後打印的結果為:

➜  baseLearn python ./process/process_test.py
1 2
➜  baseLearn

三、存儲程序輸出Queue

Queue的功能是將每個核或執行緒的運算結果放在隊中, 等到每個執行緒或核運行完畢後再從佇列中取出結果, 繼續載入運算。原因很簡單, 多執行緒呼叫的函數不能有傳回值, 所以使用Queue儲存多個執行緒運算的結果

#process_queue.py

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_queue.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

# 定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果
def job(q):
    res = 0
    for i in range(1000):
        res += i + i**2 + i**3
    q.put(res)   #queue

if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))

    # 分别启动、连接两个线程
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 上面是分两批处理的,所以这里分两批输出,将结果分别保存
    res1 = q.get()
    res2 = q.get()

    print res1,res2

列印輸出結果:

➜ python ./process/process_queue.py
249833583000 249833583000

四、進程池

進程池就是我們將要運行的東西,放到池子裡,Python會自行解決多進程的問題

1、導入多進程模組

首先import multiprocessing 和定義job()

import multiprocessing as mp

def job(x):
    return x*x

2、進程池Pool ()和map()

然後我們定義一個Pool

pool = mp.Pool()

有了池子之後,就可以讓池子對應某一個函數,我們向池子裡丟資料,池子就會傳回函數傳回的值。 Pool和先前的Process的不同點是丟向Pool的函數有回傳值,而Process沒有回傳值

接下來用map()取得結果,在map()中需要放入函數和需要迭代運算的值,然後它會自動分配給CPU核,回傳結果

res = pool.map(job, range(10))

讓我們來運行一下

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

完成程式碼:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_queue.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

def job(x):
    return x*x  # 注意这里的函数有return返回值

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

執行結果:

➜  baseLearn python ./process/process_pool.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

3、自訂核數量

我們怎麼知道Pool是否真的呼叫了多個核心呢?我們可以把迭代次數增大些,然後打開CPU負載看下CPU運行情況

打開CPU負載(Mac):活動監視器> CPU > CPU負載(單擊即可)

Pool預設大小是CPU的核數,我們也可以透過在Pool中傳入processes參數即可自訂所需的核數

def multicore():
    pool = mp.Pool(processes=3) # 定义CPU核数量为3
    res = pool.map(job, range(10))
    print(res)

4、apply_async()

Pool除了map()外,還有可以回傳結果的方式,那就是apply_async() .

apply_async()只能傳遞一個值,它只會放入一個核進行運算,但是傳入值時要注意是可迭代的,所以在傳入值後需要加逗號, 同時需要用get()方法取得回傳值

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get())

運行結果;

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async()

總結

  • Pool預設呼叫是CPU的核數,傳入processes參數可自訂CPU核數

  • map() 放入迭代參數,傳回多個結果

  • apply_async()只能放入一組參數,並傳回一個結果,如果想得到map()的效果需要透過迭代

五、共享記憶體shared memory

這節我們學習如何定義共享記憶體。 只有用共享記憶體才能讓CPU之間有交流

Shared Value

我們可以透過使用Value資料儲存在一個共享的記憶體表中。

import multiprocessing as mp

value1 = mp.Value('i', 0) 
value2 = mp.Value('d', 3.14)

其中di參數用來設定資料類型的,d表示一個雙精浮點型別double,i表示一個帶符號的整數

Type code C Type Python Type Minimum size in bytes
'b' signed char int 1
'B' unsigned char int 1
'u' Py_UNICODE Unicode character 2
'h' signed short int 2
'H' unsigned short int 2
'i' signed int int 2
'I' unsigned int int 2
'l' signed long int 4
'L' unsigned long int 4
'q' signed long long int 8
'Q' unsigned long long int 8
'f' float float 4
'd' double float 8

Shared Array

在Python的 mutiprocessing 中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据

array = mp.Array('i', [1, 2, 3, 4])

这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。 我们会在后一节举例说明这两种的使用方法.

错误形式

array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list

"""
TypeError: an integer is required
"""

六、进程锁Lock

不加进程锁

让我们看看没有加进程锁时会产生什么样的结果。

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_no_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.5) # 暂停0.5秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value)

def multicore():
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 4)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。

结果打印:

➜  baseLearn python ./process/process_no_lock.py
1
5
9
9
13
13
17
17
18
18
➜  baseLearn

我们可以看到,进程1和进程2在相互着使用共享内存v

加进程锁

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

首先需要定义一个进程锁

 l = mp.Lock() # 定义一个进程锁

然后将进程锁的信息传入各个进程中

p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
p2 = mp.Process(target=job, args=(v,3,l))

job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # v.value获取共享内存
        print(v.value)
    l.release() # 释放

全部代码:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.5) # 暂停0.5秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value)
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入
    p2 = mp.Process(target=job, args=(v, 4, l)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

运行一下,让我们看看是否还会出现抢占资源的情况:

结果打印:

➜  baseLearn python ./process/process_lock.py
1
2
3
4
5
9
13
17
21
25

显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

相关推荐:

python 多进程通信模块

Python守护进程(多线程开发)

以上是python中多進程的詳細介紹(程式碼範例)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn