Example code for multi-process and process pool (multiprocessing library) in Python
This article walks through Python's multiprocessing library, covering the Process class and the process pool (Pool), with worked examples.
Environment: Windows 7 + Python 2.7
I have always wanted to learn multi-processing or multi-threading, but until now I had only read some basics and a simple introduction and could not see how to apply them. Some time ago I came across a crawler project on GitHub that used multiple processes and threads, so I read up on the topic on Baidu as I went. Here I write down the relevant knowledge points and some applications as a record.
First, what is a process? A process is one execution of a program on the computer: when a program is run, a process is started. Processes are divided into system processes and user processes. Processes that carry out the functions of the operating system are system processes; they are the operating system itself in its running state. All processes started by you are user processes. The process is the unit by which the operating system allocates resources.
Intuitively, in Task Manager the processes whose user name is SYSTEM are system processes, and those under Administrator are user processes. In addition, NETWORK SERVICE is the network service account and LOCAL SERVICE is the local service account. For more detail on processes you can consult an encyclopedia; I will keep it short here, otherwise it would be hard to get back on topic.
1. Simple use of multi-process
The multiprocessing library has many functions (listed in a figure in the original article). I do not understand many of them yet, so I will only cover what I know so far.
Creating a process: Process(target=function to run, name=custom process name (optional), args=(arguments,))
Methods:
is_alive(): Determine whether the process is alive.
join([timeout]): Wait for the child process to end before executing the next step. timeout is the maximum time to wait; if a process blocks, a timeout lets the program continue to run.
run(): The method that start() runs in the child process. By default it calls target if one was given when the Process object was created, and otherwise does nothing.
start(): Start the process. Do not confuse it with run(): calling run() directly executes in the current process.
terminate(): Terminate the process. Terminating a process cleanly is not that simple; the psutil package seems to be a better fit. I will write more about it when I have the opportunity.
Of these, start() is what actually starts a process.
Attributes:
authkey: The documentation for authkey says only: "Set authorization key of process." I have not found any application examples so far, and the documentation does not explain how this key is used.
daemon: A daemon process is terminated automatically when the parent process exits, and it cannot create child processes of its own. It must be set before start().
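The methods listed above can be sketched together in one short run. This is a minimal illustration written for Python 3 (an assumption; the article's own examples target Python 2.7), and nap and lifecycle_demo are hypothetical names that do not come from the article.

```python
import time
from multiprocessing import Process


def nap(seconds):
    """Hypothetical worker: sleep for the given number of seconds."""
    time.sleep(seconds)
    return seconds


def lifecycle_demo():
    # A deliberately slow child, so that join(timeout) returns first.
    p = Process(target=nap, name="napper", args=(5,))
    p.start()
    p.join(0.2)                    # wait at most 0.2 s, then carry on
    still_running = p.is_alive()   # checked while the child is sleeping
    p.terminate()                  # ask the OS to stop the child
    p.join()                       # wait until it has really gone
    return still_running, p.is_alive()


if __name__ == "__main__":
    print(lifecycle_demo())
```

Note that join(0.2) only stops waiting; it does not stop the child, which is why terminate() is still needed here.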
1.Process(),start(),join()
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    p1 = Process(target=fun1, args=(4,))
    p2 = Process(target=fun2, args=(6,))
    p1.start()   # start both processes first
    p2.start()
    p1.join()    # then wait for both to finish
    p2.join()
    b = time.time()
    print 'finish', b - a

Two processes are opened here, p1 and p2. The 4 in args=(4,) is the argument passed to fun1. A tuple is used here; with two or more arguments it becomes args=(arg1, arg2, ...). start() then starts each process, and the two join() calls make the main program wait for both p1 and p2 to end before executing the next step. In the result below, fun2 and fun1 start at basically the same time. Once both have finished (fun1 sleeps for 4 seconds and fun2 for 6 seconds), the print 'finish', b-a statement runs.

this is fun2 Mon Jun 05 13:48:04 2017
this is fun1 Mon Jun 05 13:48:04 2017
fun1 finish Mon Jun 05 13:48:08 2017
fun2 finish Mon Jun 05 13:48:10 2017
finish 6.20300006866
Process finished with exit code 0

Now let's see what happens when start() and join() are placed differently:
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    p1 = Process(target=fun1, args=(4,))
    p2 = Process(target=fun2, args=(6,))
    p1.start()
    p1.join()    # wait for p1 before p2 is even started
    p2.start()
    p2.join()
    b = time.time()
    print 'finish', b - a

Result:

this is fun1 Mon Jun 05 14:19:28 2017
fun1 finish Mon Jun 05 14:19:32 2017
this is fun2 Mon Jun 05 14:19:32 2017
fun2 finish Mon Jun 05 14:19:38 2017
finish 10.1229999065
Process finished with exit code 0

Now fun1 runs first, then fun2, and only then is 'finish' printed. In other words, process p1 runs to completion before process p2 starts, which shows what join() does. Next, comment out one join() and see what happens:
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    p1 = Process(target=fun1, args=(4,))
    p2 = Process(target=fun2, args=(6,))
    p1.start()
    p2.start()
    p1.join()
    #p2.join()   # the main program no longer waits for p2
    b = time.time()
    print 'finish', b - a

Result:

this is fun1 Mon Jun 05 14:23:57 2017
this is fun2 Mon Jun 05 14:23:58 2017
fun1 finish Mon Jun 05 14:24:01 2017
finish 4.05900001526
fun2 finish Mon Jun 05 14:24:04 2017
Process finished with exit code 0

This time fun1 finishes first (p1 uses join(), so the main program waits for p1 before executing the next step), then the main process goes on to print 'finish', and the program exits only once fun2 has finished running.
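The three runs above suggest a general pattern for more than two processes: start every process first, then join every process. The sketch below is a Python 3 illustration (an assumption; the article uses Python 2.7), and work and run_all are hypothetical names introduced here.

```python
import time
from multiprocessing import Process


def work(seconds):
    """Hypothetical task that just sleeps."""
    time.sleep(seconds)


def run_all(durations):
    """Start one process per duration, then wait for all of them."""
    procs = [Process(target=work, args=(d,)) for d in durations]
    started = time.time()
    for p in procs:
        p.start()      # start everything first ...
    for p in procs:
        p.join()       # ... then wait, so the sleeps overlap
    return time.time() - started


if __name__ == "__main__":
    # Elapsed time should be close to the longest sleep, not the sum.
    print("elapsed: %.2f s" % run_all([0.2, 0.4, 0.6]))
```

Calling join() inside the first loop instead would serialize the processes, as in the second run above.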
2.name,daemon,is_alive():
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    p1 = Process(name='fun1 process', target=fun1, args=(4,))
    p2 = Process(name='fun2 process', target=fun2, args=(6,))
    p1.daemon = True   # daemon must be set before start()
    p2.daemon = True
    p1.start()
    p2.start()
    p1.join()
    print p1, p2
    print 'process 1:', p1.is_alive(), 'process 2:', p2.is_alive()
    #p2.join()
    b = time.time()
    print 'finish', b - a

Result:

this is fun2 Mon Jun 05 14:43:49 2017
this is fun1 Mon Jun 05 14:43:49 2017
fun1 finish Mon Jun 05 14:43:53 2017
<Process(fun1 process, stopped daemon)> <Process(fun2 process, started daemon)>
process 1: False process 2: True
finish 4.06500005722
Process finished with exit code 0

You can see that name simply gives the process a name. When the statement print 'process 1:', p1.is_alive(), 'process 2:', p2.is_alive() is reached, p1 has already ended (False) while p2 is still running (True). Because p2 does not use join(), the main process carries straight on. Because daemon=True is set, the child processes are terminated automatically when the parent process exits, so p2 is forcibly ended before it finishes and the whole program ends.
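The name and daemon attributes can be inspected without starting anything, which makes the "set daemon before start()" rule easy to check. A small Python 3 sketch (an assumption; the article uses Python 2.7) with the hypothetical helpers idle and make_daemon:

```python
from multiprocessing import Process


def idle():
    """Hypothetical target; it is never started in this sketch."""
    pass


def make_daemon(name):
    p = Process(target=idle, name=name)
    p.daemon = True      # must be set before p.start()
    return p


if __name__ == "__main__":
    p = make_daemon("background-worker")
    # A process that has not been started is not alive.
    print(p.name, p.daemon, p.is_alive())
```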
3.run()
When no target function is specified for Process, the default run() method is what the process executes:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    p = Process()   # no target given
    p.start()
    p.join()
    b = time.time()
    print 'finish', b - a

Result:

finish 0.0840001106262

The result shows that process p did nothing. To make the process do some work, we can write it like this:
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1():
    print 'this is fun1', time.ctime()
    time.sleep(2)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    p = Process()
    p.run = fun1   # assign the function itself, without calling it
    p.start()
    p.join()
    b = time.time()
    print 'finish', b - a

Result:

this is fun1 Mon Jun 05 16:34:41 2017
fun1 finish Mon Jun 05 16:34:43 2017
finish 2.11500000954
Process finished with exit code 0

When the target function takes parameters:
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    p = Process()
    p.run = fun1(2)   # calls fun1 right here and stores its return value (None)
    p.start()
    p.join()
    b = time.time()
    print 'finish', b - a

Result:

this is fun1 Mon Jun 05 16:36:27 2017
fun1 finish Mon Jun 05 16:36:29 2017
Process Process-1:
Traceback (most recent call last):
  File "E:\Anaconda2\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
TypeError: 'NoneType' object is not callable
finish 2.0529999733
Process finished with exit code 0

When the function takes a parameter, an exception occurs. The cause is that p.run=fun1(2) does not assign the function: it calls fun1(2) immediately in the main process (which is why its output still appears) and stores the return value, None, in p.run. When the process starts and calls self.run(), None is not callable. To pass arguments, use Process(target=fun1, args=(2,)) as in the earlier examples.
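Because p.run=fun1(2) stores the result of calling fun1 rather than the function itself, another way to give run() an argument is to subclass Process and keep the argument on the instance. This is a Python 3 sketch (an assumption; the article uses Python 2.7), and SleepProcess is a hypothetical class written for illustration.

```python
import time
from multiprocessing import Process


class SleepProcess(Process):
    """Hypothetical subclass: the argument is stored on the instance."""

    def __init__(self, seconds):
        super().__init__()
        self.seconds = seconds

    def run(self):
        # start() arranges for this method to run in the child process.
        print("sleeping for", self.seconds)
        time.sleep(self.seconds)
        print("done")


if __name__ == "__main__":
    p = SleepProcess(0.2)
    p.start()
    p.join()
    print("exit code:", p.exitcode)
```

Passing target and args to Process is usually the simpler choice; overriding run() is mainly useful when the process needs state or helper methods of its own.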
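2. Process pool
When you need a handful of processes, or even a dozen or so, Process is convenient enough. For hundreds or thousands of tasks, however, creating each Process by hand is clearly too clumsy. multiprocessing provides the Pool class, the process pool covered here, which manages many tasks together: you set an upper limit on the number of running processes, only that many run at a time, and when one finishes the next task is started.
Pool(processes=num): Set the number of worker processes. When a worker finishes a task, the next queued task is started.
apply_async(function, (arguments,)): Non-blocking. The arguments are passed as a tuple.
apply(function, (arguments,)): Blocking.
close(): Close the pool; no new tasks can be added.
terminate(): Stop the running processes; unfinished tasks are not processed.
join(): Same purpose as described for Process, but it must be called after close() or terminate().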
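The article's pool examples only print from inside the tasks. As a complementary sketch, the object returned by apply_async can also be used to collect return values via its get() method. This is written for Python 3 (an assumption; the article uses Python 2.7), and square and squares_via_pool are hypothetical names.

```python
from multiprocessing import Pool


def square(n):
    """Hypothetical task with a return value."""
    return n * n


def squares_via_pool(numbers, workers=3):
    pool = Pool(processes=workers)
    # apply_async returns immediately with an AsyncResult handle.
    handles = [pool.apply_async(square, (n,)) for n in numbers]
    pool.close()     # no more tasks will be submitted
    pool.join()      # wait for the submitted tasks to finish
    # get() returns the task's return value (or re-raises its exception).
    return [h.get() for h in handles]


if __name__ == "__main__":
    print(squares_via_pool([1, 2, 3, 4]))   # [1, 4, 9, 16]
```

The order of the results follows the order of the handles, not the order in which the tasks happened to finish.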
1. Single process pool

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    pool = Pool(processes=3)   # up to 3 processes run at the same time
    for i in range(3, 8):
        pool.apply_async(fun1, (i,))
    pool.close()
    pool.join()
    b = time.time()
    print 'finish', b - a

Result:

this is fun1 Mon Jun 05 15:15:38 2017
this is fun1 Mon Jun 05 15:15:38 2017
this is fun1 Mon Jun 05 15:15:38 2017
fun1 finish Mon Jun 05 15:15:41 2017
this is fun1 Mon Jun 05 15:15:41 2017
fun1 finish Mon Jun 05 15:15:42 2017
this is fun1 Mon Jun 05 15:15:42 2017
fun1 finish Mon Jun 05 15:15:43 2017
fun1 finish Mon Jun 05 15:15:47 2017
fun1 finish Mon Jun 05 15:15:49 2017
finish 11.1370000839
Process finished with exit code 0

The result shows the limit of 3 running processes at work: three tasks start together at 15:15:38. When the first one ends (the one with the 3-second argument), a new task is started, and so on. Only when the pool has finished everything does the main process execute b=time.time() and print 'finish',b-a. This example uses the non-blocking apply_async(); now compare it with the blocking apply():
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    pool = Pool(processes=3)   # up to 3 processes run at the same time
    for i in range(3, 8):
        pool.apply(fun1, (i,))   # blocks until this task has finished
    pool.close()
    pool.join()
    b = time.time()
    print 'finish', b - a

Result:

this is fun1 Mon Jun 05 15:59:26 2017
fun1 finish Mon Jun 05 15:59:29 2017
this is fun1 Mon Jun 05 15:59:29 2017
fun1 finish Mon Jun 05 15:59:33 2017
this is fun1 Mon Jun 05 15:59:33 2017
fun1 finish Mon Jun 05 15:59:38 2017
this is fun1 Mon Jun 05 15:59:38 2017
fun1 finish Mon Jun 05 15:59:44 2017
this is fun1 Mon Jun 05 15:59:44 2017
fun1 finish Mon Jun 05 15:59:51 2017
finish 25.1610000134
Process finished with exit code 0

As you can see, with the blocking call each task starts only after the previous one has ended. In general we use the non-blocking apply_async().
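The two measured totals (about 11 seconds and about 25 seconds) can be reproduced with plain arithmetic. The sketch below is a simplified model, not how Pool is implemented: it assumes each task goes, in submission order, to whichever worker becomes free first and ignores process start-up overhead. pool_makespan is a hypothetical helper.

```python
import heapq


def pool_makespan(durations, workers):
    """Finish time when tasks are handed, in order, to the first free worker."""
    free_at = [0] * workers          # time at which each worker is next free
    heapq.heapify(free_at)
    finish = 0
    for d in durations:
        start = heapq.heappop(free_at)   # earliest available worker
        end = start + d
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


tasks = [3, 4, 5, 6, 7]              # range(3, 8) from the examples
print(pool_makespan(tasks, 3))       # apply_async with 3 workers: 11
print(pool_makespan(tasks, 1))       # one task at a time, like apply(): 25
```

With apply(), the pool still has 3 workers, but each call blocks until its task ends, so in effect only one task runs at a time and the total is the sum 3+4+5+6+7 = 25.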
2. Multiple process pools
The example above uses a single process pool. For several groups of tasks (here, two functions sharing one pool) we can use a for loop; see the code:

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
    print 'this is fun1', time.ctime()
    time.sleep(t)
    print 'fun1 finish', time.ctime()

def fun2(t):
    print 'this is fun2', time.ctime()
    time.sleep(t)
    print 'fun2 finish', time.ctime()

if __name__ == '__main__':
    a = time.time()
    pool = Pool(processes=3)   # up to 3 processes run at the same time
    for fun in [fun1, fun2]:
        for i in range(3, 8):
            pool.apply_async(fun, (i,))
    pool.close()
    pool.join()
    b = time.time()
    print 'finish', b - a

Result:

this is fun1 Mon Jun 05 16:04:38 2017
this is fun1 Mon Jun 05 16:04:38 2017
this is fun1 Mon Jun 05 16:04:38 2017
fun1 finish Mon Jun 05 16:04:41 2017
this is fun1 Mon Jun 05 16:04:41 2017
fun1 finish Mon Jun 05 16:04:42 2017
this is fun1 Mon Jun 05 16:04:42 2017
fun1 finish Mon Jun 05 16:04:43 2017
this is fun2 Mon Jun 05 16:04:43 2017
fun2 finish Mon Jun 05 16:04:46 2017
this is fun2 Mon Jun 05 16:04:46 2017
fun1 finish Mon Jun 05 16:04:47 2017
this is fun2 Mon Jun 05 16:04:47 2017
fun1 finish Mon Jun 05 16:04:49 2017
this is fun2 Mon Jun 05 16:04:49 2017
fun2 finish Mon Jun 05 16:04:50 2017
this is fun2 Mon Jun 05 16:04:50 2017
fun2 finish Mon Jun 05 16:04:52 2017
fun2 finish Mon Jun 05 16:04:55 2017
fun2 finish Mon Jun 05 16:04:57 2017
finish 19.1670000553
Process finished with exit code 0

As you can see, once the fun1 tasks have all been started, the fun2 tasks follow on as workers become free.
In addition, when the function takes no arguments, just call pool.apply_async(function); there is no need to write the arguments.
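While learning to write these programs, I once ran the code directly without if __name__ == '__main__': and it failed. After looking it up, I found that on Windows the process-related code must be placed under the if __name__ == '__main__': statement of the current .py file; only then does the process module work properly on Windows. On Unix/Linux this is not required. One explanation I found: at execution time, the .py file you wrote is read in and executed as a module, so it must check whether it is itself __main__. That is:

if __name__ == '__main__':
    # do something.

I am still not clear on this myself and hope to understand it later.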
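A sketch of what the guard protects against, written for Python 3 (an assumption; the article uses Python 2.7). Windows has no fork(), so multiprocessing starts a fresh interpreter for each child and imports the main file into it under a name other than '__main__'. Any process-starting code at the top level of the file would therefore run again in every child. report and main are hypothetical names.

```python
import os
from multiprocessing import Process


def report():
    """Hypothetical worker: show which process is running."""
    print("child pid:", os.getpid(), "module name:", __name__)


def main():
    print("parent pid:", os.getpid())
    p = Process(target=report)
    p.start()
    p.join()


# Without this guard, a child that re-imports the file would reach the
# process-starting code again.
if __name__ == "__main__":
    main()
```

On Unix/Linux the child has traditionally been created with fork(), which copies the running parent instead of re-importing the file; that is why the guard was not required there, although keeping it makes the script portable.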