Python 中的多線程其實不是真正的多線程,如果想要充分地使用多核心 CPU 的資源,在 Python 中大部分情況都需要使用多進程。 Python 提供了非常好用的多進程套件 multiprocessing,只需要定義一個函數,Python 會完成其他所有事情。借助這個套件,可以輕鬆完成從單一進程到並發執行的轉換。 multiprocessing 支援子進程、通訊和共享資料、執行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等元件。
1、類別Process
建立流程的類別:Process([group [, target [, name [, args [, kwargs]]]]])
target 表示呼叫物件
args 表示呼叫物件的位置參數元組
#kwargs表示呼叫物件的字典
name為別名
group實質上不使用
下面看一個創建函數並將其作為多個進程的例子:
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import multiprocessing import time def worker(interval, name): print(name + '【start】') time.sleep(interval) print(name + '【end】') if __name__ == "__main__": p1 = multiprocessing.Process(target=worker, args=(2, '两点水1')) p2 = multiprocessing.Process(target=worker, args=(3, '两点水2')) p3 = multiprocessing.Process(target=worker, args=(4, '两点水3')) p1.start() p2.start() p3.start() print("The number of CPU is:" + str(multiprocessing.cpu_count())) for p in multiprocessing.active_children(): print("child p.name:" + p.name + "\tp.id" + str(p.pid)) print("END!!!!!!!!!!!!!!!!!")
輸出的結果:
多進程輸出結果
2、把進程創建成類
當然我們也可以把進程創建成一個類,如下面的例子,當進程p 呼叫start() 時,自動調用run() 方法。
# -*- coding: UTF-8 -*- import multiprocessing import time class ClockProcess(multiprocessing.Process): def __init__(self, interval): multiprocessing.Process.__init__(self) self.interval = interval def run(self): n = 5 while n > 0: print("当前时间: {0}".format(time.ctime())) time.sleep(self.interval) n -= 1 if __name__ == '__main__': p = ClockProcess(3) p.start()
輸出結果如下:
建立進程類別
#3、daemon 屬性
想知道daemon 屬性有什麼用,看下面兩個例子吧,一個加了daemon 屬性,一個沒有加,對比輸出的結果:
沒有加deamon 屬性的例子:
# -*- coding: UTF-8 -*- import multiprocessing import time def worker(interval): print('工作开始时间:{0}'.format(time.ctime())) time.sleep(interval) print('工作结果时间:{0}'.format(time.ctime())) if __name__ == '__main__': p = multiprocessing.Process(target=worker, args=(3,)) p.start() print('【EMD】')
輸出結果:
【EMD】 工作开始时间:Mon Oct 9 17:47:06 2017 工作结果时间:Mon Oct 9 17:47:09 2017
在上面範例中,進程p 新增daemon 屬性:
# -*- coding: UTF-8 -*- import multiprocessing import time def worker(interval): print('工作开始时间:{0}'.format(time.ctime())) time.sleep(interval) print('工作结果时间:{0}'.format(time.ctime())) if __name__ == '__main__': p = multiprocessing.Process(target=worker, args=(3,)) p.daemon = True p.start() print('【EMD】')
輸出結果:
【EMD】
根據輸出結果可見,如果在子進程中新增了daemon 屬性,那麼當主進程結束的時候,子進程也會跟著結束。所以沒有列印子進程的資訊。
4、join 方法
結合上面的範例繼續,如果我們想要讓子執行緒執行完該怎麼做呢?
那麼我們可以用到 join 方法,join 方法的主要作用是:阻塞目前進程,直到呼叫 join 方法的那個進程執行完,然後再繼續執行目前進程。
因此看下加了join 方法的例子:
import multiprocessing import time def worker(interval): print('工作开始时间:{0}'.format(time.ctime())) time.sleep(interval) print('工作结果时间:{0}'.format(time.ctime())) if __name__ == '__main__': p = multiprocessing.Process(target=worker, args=(3,)) p.daemon = True p.start() p.join() print('【EMD】')
輸出的結果:
工作开始时间:Tue Oct 10 11:30:08 2017 工作结果时间:Tue Oct 10 11:30:11 2017 【EMD】
5、Pool
如果需要很多的子進程,我們就需要一個一個的去創建嗎?
當然不用,我們可以使用進程池的方法批次建立子進程。
範例如下:
# -*- coding: UTF-8 -*- from multiprocessing import Pool import os, time, random def long_time_task(name): print('进程的名称:{0} ;进程的PID: {1} '.format(name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('进程 {0} 运行了 {1} 秒'.format(name, (end - start))) if __name__ == '__main__': print('主进程的 PID:{0}'.format(os.getpid())) p = Pool(4) for i in range(6): p.apply_async(long_time_task, args=(i,)) p.close() # 等待所有子进程结束后在关闭主进程 p.join() print('【End】')
輸出的結果如下:
主进程的 PID:7256 进程的名称:0 ;进程的PID: 1492 进程的名称:1 ;进程的PID: 12232 进程的名称:2 ;进程的PID: 4332 进程的名称:3 ;进程的PID: 11604 进程 2 运行了 0.6500370502471924 秒 进程的名称:4 ;进程的PID: 4332 进程 1 运行了 1.0830621719360352 秒 进程的名称:5 ;进程的PID: 12232 进程 5 运行了 0.029001712799072266 秒 进程 4 运行了 0.9720554351806641 秒 进程 0 运行了 2.3181326389312744 秒 进程 3 运行了 2.5331451892852783 秒 【End】
這裡有一點要注意: Pool 物件呼叫join() 方法會等待所有子程序執行完畢,呼叫join() 之前必須先呼叫close() ,呼叫close() 之後就不能繼續加入新的Process 了。
請注意輸出的結果,子進程0,1,2,3是立刻執行的,而子進程4 要等待前面某個子進程完成後才執行,這是因為Pool 的預設大小在我的電腦上是4,因此,最多同時執行4 個行程。這是 Pool 有意設計的限制,並不是作業系統的限制。如果改成:
p = Pool(5)
就可以同時跑 5 個進程。
6、進程間通訊
Process 之間一定是需要通訊的,作業系統提供了許多機制來實現進程間的通訊。 Python 的 multiprocessing 模組包裝了底層的機制,提供了Queue、Pipes 等多種方式來交換資料。
以 Queue 為例,在父進程中建立兩個子進程,一個往 Queue 寫數據,一個從 Queue 讀取資料:
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- from multiprocessing import Process, Queue import os, time, random def write(q): # 写数据进程 print('写进程的PID:{0}'.format(os.getpid())) for value in ['两点水', '三点水', '四点水']: print('写进 Queue 的值为:{0}'.format(value)) q.put(value) time.sleep(random.random()) def read(q): # 读取数据进程 print('读进程的PID:{0}'.format(os.getpid())) while True: value = q.get(True) print('从 Queue 读取的值为:{0}'.format(value)) if __name__ == '__main__': # 父进程创建 Queue,并传给各个子进程 q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程 pw pw.start() # 启动子进程pr pr.start() # 等待pw结束: pw.join() # pr 进程里是死循环,无法等待其结束,只能强行终止 pr.terminate()
輸出的結果為:
读进程的PID:13208 写进程的PID:10864 写进 Queue 的值为:两点水 从 Queue 读取的值为:两点水 写进 Queue 的值为:三点水 从 Queue 读取的值为:三点水 写进 Queue 的值为:四点水 从 Queue 读取的值为:四点水下一節