Maison >développement back-end >Tutoriel Python >File d'attente et multi-processus en python

File d'attente et multi-processus en python

高洛峰
高洛峰original
2017-02-25 10:10:021432parcourir

Je suis récemment tombé sur un projet qui nécessitait l'exécution de tâches sur plusieurs machines virtuelles. J'ai fait référence au code des projets précédents d'autres personnes et j'ai utilisé le multi-processus pour le gérer, j'ai donc vérifié en ligne à propos du multi-processus en python

1. Parlons d'abord de Queue (objet file d'attente)

Queue est la bibliothèque standard en python, qui peut être directement importée et citée lorsque j'étudiais auparavant. , j'ai entendu dire que les fameux "manger d'abord, tirer d'abord" et "manger d'abord, vomir d'abord", sont en fait la file d'attente mentionnée ici. Lors de la construction de la file d'attente, vous pouvez définir sa capacité. Ne mangez pas trop. Si vous mangez trop , une erreur sera signalée. Ne l'écrivez pas ou écrivez-le inférieur à 1 lors de sa construction. Le nombre signifie infini

file d'attente d'importation

q =. Queue.Queue(10)

mis dans la file d'attente Value(put)

q.put('yang')

q.put(4)

q.put(['yan','xing'])

Obtenir la valeur dans la file d'attente ()

La file d'attente par défaut est premier entré, premier sorti

>>> q.get()
'yang'
>>> ; q.get()
4
>>> q .get()
['yan', 'xing']

Quand une file d'attente est vide , si vous utilisez get pour le récupérer, il sera bloqué, il est donc généralement utilisé lors de la récupération de la file d'attente

méthode get_nowait(), cette méthode lancera une exception vide lors de l'obtention d'une valeur d'une file d'attente vide.

La méthode la plus courante consiste donc à déterminer d'abord si une file d'attente est vide, sinon si elle est vide, prenez la valeur

Méthodes couramment utilisées dans les files d'attente

Queue.qsize() renvoie la taille de la file d'attente
Queue.empty() Si la file d'attente est vide, renvoie True, sinon False
Queue.full() Si la file d'attente est pleine, renvoie True , sinon False
Queue.get([block[, timeout]]) Récupère la file d'attente, délai d'attente
Queue.get_nowait () Équivalent à Queue.get(False)
Queue.put non bloquant (item) Écrire dans la file d'attente, délai d'attente
Queue.put_nowait(item) Équivalent à Queue.put(item, False)

Utilisation du concept de sous-processus en multitraitement

2. 🎜>

à partir d'un processus d'importation multitraitement

Vous pouvez construire un sous-processus via Process

p = Process(target =fun,args=(args))

Utilisez ensuite p.start() pour démarrer le processus enfant

Utilisez ensuite la méthode p.join() pour que le processus enfant termine son exécution avant d'exécuter le processus parent

from multiprocessing import Process
import os
 
# 子进程要执行的代码
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'

File dattente et multi-processus en python

3. Utiliser pool en multitraitement

Si vous avez besoin de plusieurs processus enfants, vous pouvez envisager utiliser un pool de processus (pool) pour gérer

à partir du pool d'importation multitraitement

from multiprocessing import Pool
import os, time
 
def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'

La méthode du pool la création de sous-processus est différente de Process. Elle est implémentée via

p.apply_async(func,args=(args)). ordinateur. , par exemple, mon ordinateur dispose maintenant de 4 processeurs, puis les sous-processus task0, task1, task2 et task3 peuvent être démarrés en même temps, et la tâche 4 démarrera après la fin du processus précédent File dattente et multi-processus en python

Le résultat après l'exécution du programme ci-dessus est en fait effectué séparément selon 1, 2 et 3 dans l'image ci-dessus. Imprimez d'abord 1, puis imprimez 2 après 3 secondes, puis imprimez 3 après 3 secondes.

le code p.close() in consiste à fermer le pool de processus et à ne plus y ajouter de processus. L'appel de la méthode join() sur l'objet Pool attendra que tous les processus enfants terminent leur exécution. doit être appelé avant d'appeler join(). Après avoir appelé close(), vous ne pouvez pas continuer à ajouter de nouveaux processus.

Vous pouvez également définir le nombre de processus pour un pool d'instances à ce moment-là

Si p=Pool(5) dans le code ci-dessus, alors tous les processus enfants peuvent être traités en même temps

3. Communication entre plusieurs sous-processus

La communication entre plusieurs sous-processus nécessite l'utilisation de la file d'attente mentionnée dans la première étape. l'exigence suivante, un sous-processus écrit les données dans la file d'attente, et un autre processus prend les données de la file d'attente,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # 启动子进程pw,写入:
 pw.start() 
 # 等待pw结束:
 pw.join()
 # 启动子进程pr,读取:
 pr.start()
 pr.join()
 # pr进程里是死循环,无法等待其结束,只能强行终止:
 print
 print '所有数据都写入并且读完'


4. Plusieurs questions intéressantes sur le code ci-dessus

if __name__=='__main__': 
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

Si la fonction principale est écrite comme l'exemple ci-dessus, ce que je voulais à l'origine, c'est que j'obtienne une file d'attente, que je la transmette comme paramètre à chaque processus enfant du pool de processus, mais j'ai obtenu

RuntimeError : les objets de file d'attente ne doivent être partagés qu'entre les processus par héritage

erreur, coché, l'idée générale est que l'objet file d'attente ne peut pas communiquer entre le processus parent et le processus enfant. Si vous souhaitez utiliser la file d'attente dans le pool de processus, vous devez utiliser la classe Manager de multiprocess

if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 time.sleep(0.5)
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

De cette façon, cet objet file d'attente peut communiquer entre le processus parent et le processus enfant. Si vous n'utilisez pas de pool, vous n'avez pas besoin de Manager. . Vous pourrez étendre la classe Manager en multiprocessus à l'avenir.

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk
 
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
 
# 写数据进程执行的代码:
def write(q,lock):
 lock.acquire() #加上锁
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value  
  q.put(value)  
 lock.release() #释放锁 
 
# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break
 
if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 lock = manager.Lock() #初始化一把锁
 p = Pool()
 pw = p.apply_async(write,args=(q,lock)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

更多File dattente et multi-processus en python相关文章请关注PHP中文网!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn