Maison  >  Article  >  développement back-end  >  Exemple de code pour multi-processus et pool de processus (bibliothèque de traitement) en python

Exemple de code pour multi-processus et pool de processus (bibliothèque de traitement) en python

零下一度
零下一度original
2017-06-16 11:13:102086parcourir

Cet article présente principalement l'explication détaillée du pool multi-processus et de processus de Python (bibliothèque Processing), qui est d'une grande valeur pratique. Les amis qui en ont besoin peuvent s'y référer

Environnement : win7. +python2.7

J'ai toujours voulu apprendre le multi-processus ou le multi-threading, mais avant de regarder quelques connaissances de base et une simple introduction, je ne comprenais pas comment l'appliquer , jusqu'à ce que je voie il y a quelque temps un projet de robot d'exploration sur github qui impliquait plusieurs processus et du contenu lié au multithread, tout en lisant les points de connaissances liés à Baidu, notez maintenant certains points de connaissances pertinents et certaines applications pour un enregistrement

. Tout d'abord, parlons de ce qu'est un processus : un processus est un programme sur l'ordinateur. Une fois qu'une activité est exécutée, lorsqu'un programme est exécuté, un processus est démarré. Le processus est divisé en processus système et processus utilisateur. comme le processus est utilisé pour exécuter diverses fonctions du système d'exploitation, il s'agit d'un processus système et ils sont en cours d'exécution. Le système d'exploitation lui-même et tous les processus démarrés par vous sont des processus utilisateur. Un processus est l'unité par laquelle le système d'exploitation alloue des ressources.

Intuitivement parlant, le nom d'utilisateur marqué système dans le gestionnaire de tâches est le processus système, et celui marqué administrateur est le processus utilisateur. De plus, net est netro et le service lcacal est un service local. sur le processus Vous pouvez encyclopédie, vous devez économiser quelques efforts ici, sinon vous ne pourrez pas le reprendre.

1. Utilisation simple du multi-processus

<.>Comme le montre la figure, le multitraitement a de multiples fonctions, dont beaucoup ne sont pas encore comprises, je ne parlerai donc que de ce que je connais jusqu'à présent

<.>Création de processus

:Process(target=mainly run function, name= Le nom du processus personnalisé n'a pas besoin d'être écrit, args=(parameter))Méthode :

    is_alive() : Déterminez si le processus est vivant
  1. join([timeout]) : Le processus enfant se termine avant d'exécuter l'étape suivante. Parfois, le délai d'attente est le délai d'attente. le processus est bloqué. Le délai d'attente est défini pour que le programme puisse continuer à s'exécuter
  2. run() : Si vous ne spécifiez pas de cible lors de la création d'un objet Process, la méthode run est utilisée. du processus sera exécuté par défaut
  3. start() : démarrer le processus, distinguer run()
  4. terminate() : terminer le processus . Mettre fin au processus n'est pas si simple. Il semble qu'il serait préférable d'utiliser le package psutil, j'en écrirai davantage lorsque j'en aurai l'occasion.

  5. Parmi eux, Process démarre un processus avec start().

Attributs :


    authkey : Recherchez cette phrase dans la fonction authkey() du document : Définir la clé d'autorisation du processus. Clé d'autorisation, aucun exemple d'application pertinent n'a été trouvé jusqu'à présent. Comment cette clé est-elle utilisée ? L'article ne mentionne pas le
  1. démon : il se terminera automatiquement une fois le processus parent terminé, et il ne peut pas en générer de nouveaux. Le processus doit être défini avant start()
  2. exitcode : le processus est Aucun lors de l'exécution. S'il est -N, cela signifie qu'il a été terminé par le signal N
  3. name : Le nom du processus, personnalisé
  4. pid : Chaque processus a un numéro PID unique.
  5. 1.Process(),start(),join()

Il y a deux processus ouverts ici, p1 et The 4 dans p2, arg=(4,) est le paramètre de la fonction fun1. Le type tulpe est utilisé ici s'il y a deux paramètres ou plus, c'est arg=(paramètre 1, paramètre 2...), puis. Commencez par le processus start(), nous avons configuré pour attendre la fin des processus p1 et p2 avant d'exécuter l'étape suivante. En regardant les résultats d'exécution suivants, fun2 et fun1 commencent essentiellement à s'exécuter en même temps lorsque l'exécution est terminée (. fun1 dort pendant 4 secondes et fun2 dort pendant 6 secondes), il est exécuté print 'finish', b-a instruction

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p2.start()
 p1.join()
 p2.join()
 b=time.time()
 print &#39;finish&#39;,b-a

Jetons un coup d'œil à ce qui se passe lorsque start() et join() sont. dans différentes positions

this is fun2 Mon Jun 05 13:48:04 2017
this is fun1 Mon Jun 05 13:48:04 2017
fun1 finish Mon Jun 05 13:48:08 2017
fun2 finish Mon Jun 05 13:48:10 2017
finish 6.20300006866

Process finished with exit code 0

Résultat :

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p1.join()
 p2.start()
 p2.join()
 b=time.time()
 print &#39;finish&#39;,b-a

Regardez, maintenant nous exécutons d'abord la fonction fun1, puis exécutons fun2 une fois qu'elle est terminée, puis imprimons 'terminer', c'est-à-dire , exécutez d'abord le processus p1, puis exécutez le processus p2. Vous pouvez maintenant ressentir le charme de join(). Essayez maintenant les commentaires Supprimer join() et voyez à nouveau ce qui se passe

this is fun1 Mon Jun 05 14:19:28 2017
fun1 finish Mon Jun 05 14:19:32 2017
this is fun2 Mon Jun 05 14:19:32 2017
fun2 finish Mon Jun 05 14:19:38 2017
finish 10.1229999065

Process finished with exit code 0

Résultat :

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p2.start()
 p1.join()
 #p2.join()
 b=time.time()
 print &#39;finish&#39;,b-a
Cette fois, fun1 a fini de s'exécuter (car le processus p1 utilise join(), donc le programme principal attend p1 Après l'exécution, exécutez l'étape suivante), puis continuez à exécuter l'impression 'finish' du principal processus, et enfin se terminer une fois que fun2 a fini de s'exécuter

this is fun1 Mon Jun 05 14:23:57 2017
this is fun2 Mon Jun 05 14:23:58 2017
fun1 finish Mon Jun 05 14:24:01 2017
finish 4.05900001526
fun2 finish Mon Jun 05 14:24:04 2017

Process finished with exit code 0

2.name,daemon,is_alive():

Résultat :

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 p1=Process(name=&#39;fun1进程&#39;,target=fun1,args=(4,))
 p2 = Process(name=&#39;fun2进程&#39;,target=fun2, args=(6,))
 p1.daemon=True
 p2.daemon = True
 p1.start()
 p2.start()
 p1.join()
 print p1,p2
 print &#39;进程1:&#39;,p1.is_alive(),&#39;进程2:&#39;,p2.is_alive()
 #p2.join()
 b=time.time()
 print &#39;finish&#39;,b-a
Comme vous pouvez le voir, name consiste à donner un nom au processus et à l'exécuter pour imprimer 'Process 1:',p1.is_alive(),'Process 2: ',p2.is_alive() À ce moment-là de cette phrase, le processus p1 est terminé (renvoyant False) et le processus p2 est toujours en cours d'exécution (renvoyant True), mais p2 n'a pas utilisé join(), donc le processus principal a été exécuté directement en raison de l'utilisation de daemon =. Bien sûr, il se terminera automatiquement après la fin du processus parent. Si le processus p2 ne se termine pas, l'ensemble du programme sera terminé de force

this is fun2 Mon Jun 05 14:43:49 2017
this is fun1 Mon Jun 05 14:43:49 2017
fun1 finish Mon Jun 05 14:43:53 2017
<Process(fun1进程, stopped daemon)> <Process(fun2进程, started daemon)>
进程1: False 进程2: True
finish 4.06500005722

Process finished with exit code 0
3.run()

run() n'existe pas dans Process Lors de la spécification de la fonction cible, la fonction run() est utilisée par défaut pour exécuter le programme

Résultat :

De. le résultat, on voit que le processus p n'a rien fait. Afin de rendre le processus normal Pour s'exécuter, on écrit :
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a = time.time()
 p=Process()
 p.start()
 p.join()
 b = time.time()
 print &#39;finish&#39;, b - a

La fonction objectif n'a pas de paramètres :
finish 0.0840001106262


Résultat :

La fonction objectif a des paramètres :
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1():
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(2)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a = time.time()
 p=Process()
 p.run=fun1
 p.start()
 p.join()
 b = time.time()
 print &#39;finish&#39;, b - a

Résultat :
this is fun1 Mon Jun 05 16:34:41 2017
fun1 finish Mon Jun 05 16:34:43 2017
finish 2.11500000954

Process finished with exit code 0

La fonction cible a des paramètres et une exception s'est produite. Pourquoi ? Je ne trouve pas encore la raison, mais en pratique, j'ai constaté que lorsque le dernier paramètre est donné au processus et s'exécute, il n'y a pas d'autres paramètres, cette exception se produira. Si quelqu'un le sait, faites-le-moi savoir.
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a = time.time()
 p=Process()
 p.run=fun1(2)
 p.start()
 p.join()
 b = time.time()
 print &#39;finish&#39;, b - a

2. Pool de processus
this is fun1 Mon Jun 05 16:36:27 2017
fun1 finish Mon Jun 05 16:36:29 2017
Process Process-1:
Traceback (most recent call last):
 File "E:\Anaconda2\lib\multiprocessing\process.py", line 258, in _bootstrap
 self.run()
TypeError: &#39;NoneType&#39; object is not callable
finish 2.0529999733

Process finished with exit code 0

对于需要使用几个甚至十几个进程时,我们使用Process还是比较方便的,但是如果要成百上千个进程,用Process显然太笨了,multiprocessing提供了Pool类,即现在要讲的进程池,能够将众多进程放在一起,设置一个运行进程上限,每次只运行设置的进程数,等有进程结束,再添加新的进程

Pool(processes =num):设置运行进程数,当一个进程运行完,会添加新的进程进去

apply_async(函数,(参数)):非阻塞,其中参数是tulpe类型,

apply(函数,(参数)):阻塞

close():关闭pool,不能再添加新的任务

terminate():结束运行的进程,不再处理未完成的任务

join():和Process介绍的作用一样, 但要在close或terminate之后使用。

1.单个进程池

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for i in range(3,8):
  pool.apply_async(fun1,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print &#39;finish&#39;,b-a

结果:

this is fun1 Mon Jun 05 15:15:38 2017
this is fun1 Mon Jun 05 15:15:38 2017
this is fun1 Mon Jun 05 15:15:38 2017
fun1 finish Mon Jun 05 15:15:41 2017
this is fun1 Mon Jun 05 15:15:41 2017
fun1 finish Mon Jun 05 15:15:42 2017
this is fun1 Mon Jun 05 15:15:42 2017
fun1 finish Mon Jun 05 15:15:43 2017
fun1 finish Mon Jun 05 15:15:47 2017
fun1 finish Mon Jun 05 15:15:49 2017
finish 11.1370000839

Process finished with exit code 0

从上面的结果可以看到,设置了3个运行进程上限,15:15:38这个时间同时开始三个进程,当第一个进程结束时(参数为3秒那个进程),会添加新的进程,如此循环,直至进程池运行完再执行主进程语句b=time.time() print 'finish',b-a .这里用到非阻塞apply_async(),再来对比下阻塞apply()

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for i in range(3,8):
  pool.apply(fun1,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print &#39;finish&#39;,b-a

结果:

this is fun1 Mon Jun 05 15:59:26 2017
fun1 finish Mon Jun 05 15:59:29 2017
this is fun1 Mon Jun 05 15:59:29 2017
fun1 finish Mon Jun 05 15:59:33 2017
this is fun1 Mon Jun 05 15:59:33 2017
fun1 finish Mon Jun 05 15:59:38 2017
this is fun1 Mon Jun 05 15:59:38 2017
fun1 finish Mon Jun 05 15:59:44 2017
this is fun1 Mon Jun 05 15:59:44 2017
fun1 finish Mon Jun 05 15:59:51 2017
finish 25.1610000134

Process finished with exit code 0

可以看到,阻塞是当一个进程结束后,再进行下一个进程,一般我们都用非阻塞apply_async()

2.多个进程池

上面是使用单个进程池的,对于多个进程池,我们可以用for循环,直接看代码

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for fun in [fun1,fun2]:
  for i in range(3,8):
   pool.apply_async(fun,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print &#39;finish&#39;,b-a

结果:

this is fun1 Mon Jun 05 16:04:38 2017
this is fun1 Mon Jun 05 16:04:38 2017
this is fun1 Mon Jun 05 16:04:38 2017
fun1 finish Mon Jun 05 16:04:41 2017
this is fun1 Mon Jun 05 16:04:41 2017
fun1 finish Mon Jun 05 16:04:42 2017
this is fun1 Mon Jun 05 16:04:42 2017
fun1 finish Mon Jun 05 16:04:43 2017
this is fun2 Mon Jun 05 16:04:43 2017
fun2 finish Mon Jun 05 16:04:46 2017
this is fun2 Mon Jun 05 16:04:46 2017
fun1 finish Mon Jun 05 16:04:47 2017
this is fun2 Mon Jun 05 16:04:47 2017
fun1 finish Mon Jun 05 16:04:49 2017
this is fun2 Mon Jun 05 16:04:49 2017
fun2 finish Mon Jun 05 16:04:50 2017
this is fun2 Mon Jun 05 16:04:50 2017
fun2 finish Mon Jun 05 16:04:52 2017
fun2 finish Mon Jun 05 16:04:55 2017
fun2 finish Mon Jun 05 16:04:57 2017
finish 19.1670000553

Process finished with exit code 0

看到了,在fun1运行完接着运行fun2.

另外对于没有参数的情况,就直接 pool.apply_async(funtion),无需写上参数.

在学习编写程序过程,曾遇到不用if _name_ == '_main_':而直接运行程序,这样结果会出错,经查询,在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if _name_ == ‘_main_' :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。原因有人这么说:在执行的時候,由于你写的 py 会被当成module 读进执行。所以,一定要判断自身是否为 _main_。也就是要:

if name == ‘main&#39; :
# do something.

这里我自己还搞不清楚,期待以后能够理解

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn