首頁 >後端開發 >Python教學 >python中的Queue與多進程

python中的Queue與多進程

高洛峰
高洛峰原創
2017-02-25 10:10:021418瀏覽

最近接觸一個項目,要在多個虛擬機器中運行任務,參考別人之前項目的程式碼,採用了多進程來處理,於是上網查了查python中的多進程

一、先說說Queue(佇列物件)

Queue是python中的標準函式庫,可以直接import 引用,之前學習的時候有聽過著名的「先吃先拉”與“後吃先吐”,其實就是這裡說的隊列,隊列的構造的時候可以定義它的容量,別吃撐了,吃多了,就會報錯,構造的時候不寫或者寫個小於1的數則表示無限多

import Queue

q = Queue.Queue(10)

向佇列中放值(put)

q.put('yang')

#q.put(4)

q.put(['yan','xing'])

在佇列中取值get()

預設的佇列是先進先出的

>>> q.get()
'yang'
>>> q.get()
4
>>> q .get()
['yan', 'xing']

當一個隊列為空的時候如果再用get取則會堵塞,所以取隊列的時候一般是用到

get_nowait()方法,這種方法在向一個空隊列取值的時候會拋一個Empty異常

所以更常用的方法是先判斷一個隊列是否為空,如果不為空則取值

佇列中常用的方法

Queue.qsize() 傳回佇列的大小
Queue.empty() 如果佇列為空,返回True,反之False
Queue.full() 如果佇列滿了,返回True,反之False
Queue.get([block[, timeout]]) 取得佇列,timeout等待時間
Queue.get_nowait () 相當Queue.get(False)
非阻塞Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)

二、multiprocessing中使用子程序概念

from multiprocessing import Process

可以用Process來建構子程序

p = Process(target =fun,args=(args))

再透過p.start()來啟動子進程

再透過p.join()方法使得子進程運行結束後再執行父進程

from multiprocessing import Process
import os
 
# 子进程要执行的代码
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'

python中的Queue與多進程

#三、在multiprocessing中使用pool

##如果需要多個子進程時可以考慮使用進程池(pool)來管理

from multiprocessing import Pool

from multiprocessing import Pool
import os, time
 
def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'

#pool建立子程序的方法與Process不同,是透過

p.apply_async(func,args=(args))實現,一個池子裡能同時運行的任務是取決你電腦的cpu數量,如我的電腦現在是有4個cpu,那會子進程task0,task1,task2,task3可以同時啟動,task4則在之前的一個某個進程結束後才開始

python中的Queue與多進程

上面的程式運行後的結果其實是按照上圖1,2,3分開進行的,先打印1,3秒後打印2,再3秒後打印3

代碼中的p. close()是關掉進程池子,是不再在裡面加入進程了,對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),呼叫close()之後就不能繼續增加新的Process了。

當時也可以是實例pool的時候給它定義一個行程的多少

如果上面的程式碼中p=Pool(5)那麼所有的子程式就可以同時進行

三、多個子進程間的通訊

多個子進程間的通訊就要採用第一步中說到的Queue,例如有以下的需求,一個子進程向佇列中寫數據,另外一個程序從佇列中取數據,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # 启动子进程pw,写入:
 pw.start() 
 # 等待pw结束:
 pw.join()
 # 启动子进程pr,读取:
 pr.start()
 pr.join()
 # pr进程里是死循环,无法等待其结束,只能强行终止:
 print
 print '所有数据都写入并且读完'


##關於上面程式碼的幾個有趣的問題

if __name__=='__main__': 
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

如果main函數寫成上面的樣本,本來我想要的是將會得到一個佇列,將其作為參數傳入進程池子裡的每個子進程,但是卻得到

RuntimeError: Queue objects should only be shared between processes through inheritance

#的錯誤,查了下,大意是隊列物件不能在父進程與子進程間通信,這個如果想要使用進程池中使用佇列則要使用multiprocess的Manager類別

if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 time.sleep(0.5)
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

這樣這個佇列物件就可以在父進程與子進程間通信,不用池則不需要Manager,以後再擴展multiprocess中的Manager類別吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk
 
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
 
# 写数据进程执行的代码:
def write(q,lock):
 lock.acquire() #加上锁
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value  
  q.put(value)  
 lock.release() #释放锁 
 
# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break
 
if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 lock = manager.Lock() #初始化一把锁
 p = Pool()
 pw = p.apply_async(write,args=(q,lock)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

更多python中的Queue與多進程相关文章请关注PHP中文网!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn