進程與執行緒概述:
#很多同學都聽過,現代作業系統像是Mac OS X,UNIX,Linux,Windows等,都是支援「多任務」的作業系統。 什麼叫「多任務」呢?簡單地說,就是作業系統可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕作業,這就是多任務,至少同時有3個任務正在運行。還有很多任務悄悄地在後台同時運行著,只是桌面上沒有顯示而已。 現在,多核心CPU已經非常普及了,但是,即使過去的
單核心CPU,也可以執行多任務。由於CPU執行程式碼都是順序執行的,那麼,單核心CPU是怎麼執行多任務的呢?
答案就是作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒…這樣重複執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。
真正的並行執行多任務只能在多核心CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,作業系統也會自動把很多任務輪流調度到每個核心執行。 對作業系統來說,一個任務就是一個進程(Process),例如開啟一個瀏覽器就是啟動一個瀏覽器進程,開啟一個記事本就啟動了一個記事本進程,打開兩個記事本就啟動了兩個記事本進程,打開一個Word就啟動了一個Word進程。 有些進程還不只同時做一件事,例如Word,它可以同時進行打字、拼字檢查、列印等事情。在一個進程內部,要
同時幹多件事,就需要同時運行多個“子任務”,我們把進程內的這些“子任務”稱為線程(Thread)。
由於每個行程至少要做一件事,所以,一個行程至少有
一個執行緒。當然,像Word這種複雜的進程可以有多個線程,多個線程可以同時執行,多線程的執行方式和多進程是一樣的,也是由作業系統在
多個線程之間快速切換,讓每個線程都短暫地交替運行,看起來就像同時執行一樣。當然,真正地同時執行多執行緒需要
多核心CPU才可能實現。 我們前面所寫的所有的Python程序,都是執行單任務的進程,也就是只有一個執行緒。如果我們要同時執行多個任務怎麼辦?
還有一個方法是啟動一個進程,在一個進程內啟動多個線程,這樣,多個執行緒也可以一塊執行多個任務。
。
########## ###########多重行程模式;###################多執行緒模式;############################################################# #####多行程+多執行緒模式。 ######
同時執行多個任務通常各個任務之間並不是沒有關聯的,而是需要相互通信和協調,有時,任務1必須暫停等待任務2完成後才能繼續執行,有時,任務3和任務4又不能同時執行,所以,多進程和多執行緒的程式的複雜度要遠高於我們前面寫的單進程單執行緒的程式。
因為複雜度高,調試困難,所以,不是迫不得已,我們也不想寫多任務。但是,有很多時候,沒有多工還真不行。想想在電腦上看電影,就必須由一個線程播放視頻,另一個線程播放音頻,否則,單線程實現的話就只能先把視頻播放完再播放音頻,或者先把音頻播放完再播放視頻,這顯然是不行的。
Python既支援多進程,又支援多線程,我們會討論如何編寫這兩種多任務程式。
#初識:
##要讓Python程式實現多進程(multiprocessing),我們先了解作業系統的相關知識。 Unix/Linux作業系統提供了一個fork()系統調用,它非常特殊。普通的函數調用,調用一次,返回一次,但是fork()
調用一次,返回
兩次,因為作業系統自動把當前進程(稱為父進程)複製了一份(稱為子進程),然後,分別在父進程和子進程內返回。 子程序永遠回傳0,而父行程回傳子程序的ID# 。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要呼叫getppid()
就可以拿到父行程的ID。 Python的os模組封裝了常見的系統調用,其中就包含fork
,可以在Python程式中輕鬆建立子程序:
import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid)) # Process (44587) start... # I (44587) just created a child process (44588). # I am child process (44588) and my parent is 44587.
#由於Windows沒有fork調用,上面的程式碼在Windows上無法運行。由於Mac系統是基於BSD(Unix的一種)內核,所以,在Mac下運行是沒有問題的,推薦大家用Mac學Python!有了
fork調用,一個進程在接到新任務時就可以複製出一個子進程來處理新任務,常見的Apache伺服器就是由父進程監聽端口,每當有新的http請求時,就fork出子程序來處理新的http請求。
multiprocessing模組:
如果你打算寫多進程的服務程序,Unix/Linux無疑是正確的選擇。由於Windows沒有fork調用,難道在Windows上無法用Python編寫多進程的程式?由於Python是跨平台的,自然也應該提供一個跨平台的多進程支援。 multiprocessing
模組就是跨平台版本的多進程模組。 multiprocessing
模組提供了一個Process
類別來代表一個進程對象,下面的例子演示了啟動一個子進程並等待其結束:
import os import time # 子进程要执行的代码 def run_proc(name): time.sleep(1) print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) # args里面为何要用,隔开? p.start() # 子进程启动,不加这个子进程不执行 p.join() # 等待子进程p的执行完毕后再向下执行,不加此项,主程序执行完毕,子进程依然会继续执行不受影响 print('Child process end.'), # Parent process 8428. # Run child process test (9392)... # Child process end.
<span style="font-size: 13px; color: #800000">Process实例化时执行self._args = tuple(args)操作,如果不用,隔开生成的slef._args就是一个个字母了,传入两个参数以上是就不用加,号了,如下:<br></span>
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):assert group is None, 'group argument must be None for now'count = next(_process_counter) self._identity = _current_process._identity + (count,) self._config = _current_process._config.copy() self._parent_pid = os.getpid() self._popen = None self._target = target self._args = tuple(args) a =('ers') b = tuple(a)print(b)# ('e', 'r', 's')a1 =('ers','gte') b1 = tuple(a1)print(b1)# ('ers', 'gte')Process程式碼##
###### ####Pool進程池:###############如果要啟動大量的子進程,可以用進程池的方式批次建立子進程:###### ## #######
from multiprocessing import Pool,cpu_count import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) def Bar(arg): print('-->exec done:',arg,os.getpid()) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(cpu_count()) # 获取当前cpu核数,多核cpu的情况下多进程才能实现真正的并发 for i in range(5): # p.apply_async(func=long_time_task, args=(i,), callback=Bar) #callback回调 执行完func后再执行callback 用主程序执行 p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() # !等待进程池执行完毕,不然主进程执行完毕后,进程池直接关闭 print('All subprocesses done.') # Parent process 4492. # Waiting for all subprocesses done... # Run task 0 (3108)... # Run task 1 (7936)... # Run task 2 (11236)... # Run task 3 (8284)... # Task 2 runs 0.86 seconds. # Run task 4 (11236)... # Task 0 runs 1.34 seconds. # Task 1 runs 1.49 seconds. # Task 3 runs 2.62 seconds. # Task 4 runs 1.90 seconds. # All subprocesses done.###
重点:另进程池里的进程执行完毕后,进程关闭自动销毁,不再占用内存,同理,非进程池创建的子进程,执行完毕后也是自动销毁,具体测试如下:
from multiprocessing import Pool,cpu_countimport os, time, randomdef long_time_task(name):print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time()print('Task %s runs %0.2f seconds.' % (name, (end - start)))def count_process():import psutil pids = psutil.pids() process_name = []for pid in pids: p = psutil.Process(pid) process_name.append(p.name()) # 获取进程名# process_name.append(p.num_threads()) # 获取进程的线程数# print process_nameprint len(process_name)if __name__=='__main__':print('Parent process %s.' % os.getpid()) p = Pool(4)for i in range(5): p.apply_async(long_time_task, args=(i,))print('Waiting for all subprocesses done...') count_process() # 进程池开始时进程数(包含系统其他应用进程) p.close() p.join() count_process() # 进程池关闭时进程数print('All subprocesses done.')# Parent process 8860.# Waiting for all subprocesses done...# Run task 0 (2156)...# Run task 1 (1992)...# Run task 2 (10680)...# Run task 3 (11216)...# 109 开始# Task 2 runs 0.93 seconds.# Run task 4 (10680)...# Task 1 runs 1.71 seconds.# Task 3 runs 2.01 seconds.# Task 0 runs 2.31 seconds.# Task 4 runs 2.79 seconds.# 105 结束# All subprocesses done.
代码解读:
对Pool
对象调用join()
方法会等待所有子进程执行完毕,调用join()
之前必须先调用close()
,调用close()
之后就不能继续添加新的Process
了。
请注意输出的结果,task 0
,1
,2
,3
是立刻执行的,而task 4
要等待前面某个task完成后才执行,这是因为Pool
的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool
有意设计的限制,并不是操作系统的限制。如果改成:
p = Pool(5)
就可以同时跑5个进程。
由于Pool
的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。
进程间通信:
Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据。
我们以Queue
为例,在父进程中创建两个子进程,一个往Queue
里写数据,一个从Queue
里读数据:
from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,无法等待其结束,只能强行终止: pr.terminate() # 强制关闭子进程 # Process to write: 9472 # Put A to queue... # Process to read: 3948 # Get A from queue. # Put B to queue... # Get B from queue. # Put C to queue... # Get C from queue.
在Unix/Linux下,multiprocessing
模块封装了fork()
调用,使我们不需要关注fork()
的细节。由于Windows没有fork
调用,因此,multiprocessing
需要“模拟”出fork
的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing
在Windows下调用失败了,要先考虑是不是pickle失败了。
进程间共享数据:
有时候我们不仅仅需要进程间数据传输,也需要多进程间进行数据共享,即可以使用同一全局变量;如:为何下面程序的列表输出为空?
from multiprocessing import Process, Manager import os # manager = Manager() vip_list = [] #vip_list = manager.list() def testFunc(cc): vip_list.append(cc) print 'process id:', os.getpid() if __name__ == '__main__': threads = [] for ll in range(10): t = Process(target=testFunc, args=(ll,)) t.daemon = True threads.append(t) for i in range(len(threads)): threads[i].start() for j in range(len(threads)): threads[j].join() print "------------------------" print 'process id:', os.getpid() print vip_list # process id: 9436 # process id: 11120 # process id: 10636 # process id: 1380 # process id: 10976 # process id: 10708 # process id: 2524 # process id: 9392 # process id: 10060 # process id: 8516 # ------------------------ # process id: 9836 # []
如果你了解 python 的多线程模型,GIL 问题,然后了解多线程、多进程原理,上述问题不难回答,不过如果你不知道也没关系,跑一下上面的代码你就知道是什么问题了。因为进程间内存是独立的
正如上面提到的,在进行并发编程时,最好尽可能避免使用共享状态。在使用多个进程时尤其如此。但是,如果您确实需要使用一些共享数据,那么多处理提供了两种方法。
① 共享内存:
数据可以使用值或数组存储在共享内存映射中。例如,下面的代码:
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:] # 3.1415927 # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
在创建num和arr时使用的“i”和“i”参数是数组模块使用的类型的类型:“表示双精度浮点数”,“i”表示一个已签名的整数。这些共享对象将是进程和线程安全的。为了更灵活地使用共享内存,您可以使用多处理。sharedctypes模块支持创建从共享内存中分配的任意类型的ctypes对象。
② 服务进程:
manager()返回的manager对象控制一个保存Python对象的服务器进程,并允许其他进程使用代理来操作它们。manager()返回的管理器将支持类型列<span class="pre">list</span>
, <span class="pre">dict</span>
, <span class="pre">Namespace</span>
, <span class="pre">Lock</span>
, <span class="pre">RLock</span>
, <span class="pre">Semaphore</span>
, <span class="pre">BoundedSemaphore</span>
, <span class="pre">Condition</span>
, <span class="pre">Event</span>
, <span class="pre">Queue</span>
, <span class="pre">Value</span>
and <span class="pre">Array</span>
。如下:
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': manager = Manager() d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print d print l # {0.25: None, 1: '1', '2': 2} # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务器进程管理器比使用共享内存对象更灵活,因为它们可以用来支持任意对象类型。另外,单个管理器可以通过网络上不同计算机上的进程共享。但是,它们比使用共享内存要慢。
更多-》》点击
小结
在Unix/Linux下,可以使用fork()
调用实现多进程。
要实现跨平台的多进程,可以使用multiprocessing
模块。
进程间通信是通过Queue(多进程间)
、Pipes(两个进程间)
等实现的。
补充小知识点-》父进程开辟子进程,子进程开辟子子进程,如果把子进程杀掉,子子进程会被杀死吗?
import timefrom multiprocessing import Processimport osdef count_process():import psutil pids = psutil.pids()print len(pids)def test3(): count_process()for i in range(10):print "test3 %s"%os.getpid() time.sleep(0.5)def test1():print "test1 %s"%os.getpid() p2 = Process(target=test3, name="protest2") p2.start() p2.join()if __name__ == '__main__': count_process() p1 = Process(target=test1, name="protest1") p1.start() time.sleep(2) p1.terminate() time.sleep(2) count_process()for i in range(10):print(i) time.sleep(1)# 86# test1 9500# 88# test3 3964# test3 3964# test3 3964# test3 3964# test3 3964# test3 3964# test3 3964# test3 3964# 87# 0# test3 3964# test3 3964# 1# 2# 3# 4# 5# 6# 7# 8# 9
以上是Python開發之進程與執行緒概述的詳細內容。更多資訊請關注PHP中文網其他相關文章!