
Queue and multi-process in python

高洛峰
2017-02-25 10:10:02

I recently worked on a project that needed to run tasks on several virtual machines. Following the code of earlier projects, I used multiple processes to handle it, so I looked up how multiprocessing works in Python.

1. Let's start with Queue (the queue object)

Queue is a module in the Python standard library, so it can be imported and used directly. When I was studying, I heard the well-known joke "what is eaten first comes out first"; that is exactly the first-in, first-out behavior of the queue described here. When constructing a queue you can set its capacity. Once the queue is full, put() blocks until a slot is free, and put_nowait() raises the Queue.Full exception. If you omit the capacity, or pass a number less than 1, the queue is unbounded.

import Queue

q = Queue.Queue(10)

Put values into the queue with put()

q.put('yang')

q.put(4)

q.put(['yan','xing'])

Get values from the queue with get()

By default the queue is first in, first out

>>> q.get()
'yang'
>>> q.get()
4
>>> q.get()
['yan', 'xing']

When the queue is empty, get() blocks until an item arrives, so the get_nowait() method is generally used instead when reading from a queue. get_nowait() raises the Empty exception when called on an empty queue.

So a common approach is to check first whether the queue is empty and take a value only if it is not. When several consumers share the queue, another consumer may take the last item between the check and the get, so the Empty exception should still be handled.
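As a minimal sketch of the non-blocking read described above, the following drains a queue with get_nowait() and treats Empty as the signal to stop. It is written for Python 3, where the module is named queue rather than Queue, and the helper name drain is hypothetical.

```python
import queue  # Python 3 name; in Python 2 this module is called Queue


def drain(q):
    """Return every item currently in q without ever blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            # Nothing left right now, so stop instead of waiting.
            break
    return items


q = queue.Queue(10)
q.put('yang')
q.put(4)
q.put(['yan', 'xing'])
print(drain(q))  # items come back in the order they were put
```

Catching the exception avoids depending on a separate empty() check, which can be out of date by the time get is called.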

Commonly used queue methods

Queue.qsize() Returns the approximate size of the queue
Queue.empty() Returns True if the queue is empty, otherwise False
Queue.full() Returns True if the queue is full, otherwise False
Queue.get([block[, timeout]]) Removes and returns an item; timeout is the maximum time to wait
Queue.get_nowait() Equivalent to Queue.get(False); non-blocking
Queue.put(item[, block[, timeout]]) Writes an item to the queue; timeout is the maximum time to wait
Queue.put_nowait(item) Equivalent to Queue.put(item, False); non-blocking
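The methods listed above can be tried on a small bounded queue. This is only an illustrative sketch in Python 3 syntax (module name queue instead of Queue); the variable names overflowed, timed_out, and first are made up for the example.

```python
import queue  # Python 3 name; in Python 2 this module is called Queue

q = queue.Queue(2)         # room for at most two items
q.put('a')
q.put('b')
print(q.full())            # the queue has reached its capacity

try:
    q.put_nowait('c')      # same as q.put('c', False)
    overflowed = False
except queue.Full:
    overflowed = True      # a non-blocking put on a full queue raises Full

try:
    q.put('c', True, 0.1)  # blocking put that gives up after 0.1 seconds
    timed_out = False
except queue.Full:
    timed_out = True

first = q.get()            # removes the oldest item and frees one slot
q.put_nowait('c')          # now there is room again
print(overflowed, timed_out, first, q.qsize())
```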

2. Child processes in multiprocessing

from multiprocessing import Process

You can construct a child process with Process; args must be a tuple, hence the trailing comma

p = Process(target=fun, args=(arg,))

Then start the child process with p.start()

Then call p.join() to make the parent process wait until the child process has finished

from multiprocessing import Process
import os
 
# Code executed by the child process
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'
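The same start() and join() calls extend to several children. The sketch below, in Python 3 syntax, starts every child before joining any of them so that they run concurrently; run_children is a hypothetical helper, and the exit codes are returned only to make the result easy to inspect.

```python
from multiprocessing import Process
import os


def run_proc(name):
    # Runs in the child process.
    print('Run child process %s (%s)...' % (name, os.getpid()))


def run_children(names):
    """Start one child per name, wait for all of them, return exit codes."""
    children = [Process(target=run_proc, args=(name,)) for name in names]
    for p in children:
        p.start()   # start every child first so they run concurrently
    for p in children:
        p.join()    # then wait for each one to finish
    return [p.exitcode for p in children]


if __name__ == '__main__':
    print(run_children(['a', 'b', 'c']))
```

Joining inside the first loop instead would run the children one after another, because each join() waits before the next start().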


3. Using a process pool

When you need many child processes in multiprocessing, consider using a process pool (Pool) to manage them

from multiprocessing import Pool

from multiprocessing import Pool
import os, time
 
def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'

A pool creates child processes differently from Process: tasks are submitted with p.apply_async(func, args=(arg,)). By default, the number of tasks that can run at the same time in a pool equals the number of CPUs in your computer. My computer has 4 CPUs, so task 0, task 1, task 2, and task 3 start at the same time, and task 4 starts only after one of them has finished.


The output of the program above appears in three batches (marked 1, 2, and 3 in the original screenshot): batch 1 is printed first, batch 2 about 3 seconds later, and batch 3 about 3 seconds after that.

In the code, p.close() closes the process pool so that no more tasks can be added to it. Calling join() on the Pool object waits for all child processes to finish. close() must be called before join(), and after close() no new tasks can be submitted.
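The rule that a closed pool accepts no new work can be checked directly. This is a small sketch in Python 3 syntax; submit_after_close is a hypothetical helper, and the ValueError it catches is what current Python 3 releases raise (older versions may signal the problem differently).

```python
from multiprocessing import Pool


def submit_after_close():
    """Return True if a closed pool refuses new work."""
    p = Pool(2)
    p.close()                  # no more tasks may be submitted
    try:
        p.apply_async(abs, args=(-1,))
        refused = False
    except ValueError:         # Python 3 reports "Pool not running"
        refused = True
    p.join()                   # allowed, because close() came first
    return refused


if __name__ == '__main__':
    print(submit_after_close())
```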

You can also set the number of worker processes when creating the pool

With p = Pool(5) in the code above, all five tasks run at the same time
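A sketch of a pool with an explicit size, in Python 3 syntax. Unlike the listing above, the task here returns a value so that the parent can collect it through the object returned by apply_async; run_tasks is a hypothetical helper and the sleep is shortened to 0.2 seconds to keep the demonstration quick.

```python
from multiprocessing import Pool
import os
import time


def long_time_task(name):
    # Runs in a worker process; the return value travels back to the parent.
    time.sleep(0.2)
    return (name, os.getpid())


def run_tasks(workers, count):
    """Run `count` tasks on a pool of `workers` processes; return results."""
    p = Pool(workers)
    pending = [p.apply_async(long_time_task, args=(i,)) for i in range(count)]
    p.close()
    p.join()
    # get() returns what long_time_task returned for each submission.
    return [r.get() for r in pending]


if __name__ == '__main__':
    start = time.time()
    results = run_tasks(5, 5)
    print(results)
    print('elapsed %0.2f seconds' % (time.time() - start))
```

Calling run_tasks(2, 5) instead should take noticeably longer, since only two tasks can sleep at once.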

3. Communication between multiple sub-processes

Communication between child processes uses a Queue like the one described in the first section; multiprocessing provides its own Queue class for this. For example, suppose one child process writes data to the queue and another child process reads data from it:

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# Code executed by the writer process:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())

# Code executed by the reader process:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 # The parent process creates the Queue and passes it to each child process:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # Start the child process pw, which writes:
 pw.start()
 # Wait for pw to finish:
 pw.join()
 # Start the child process pr, which reads:
 pr.start()
 # read() leaves its loop once the queue is empty, so pr can be joined:
 pr.join()
 print
 print 'All data has been written and read'
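The listing above runs the writer to completion before the reader starts. If the two should run at the same time, the reader cannot rely on empty() to know when to stop, because the queue may simply be empty for a moment. One common alternative, sketched here in Python 3 syntax, is for the writer to send an end-of-data marker. The SENTINEL value, the second queue out (used only to hand the reader's lines back to the parent), and the run helper are all assumptions made for this example.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical end-of-data marker


def write(q):
    for value in ['A', 'B', 'C']:
        q.put(value)
    q.put(SENTINEL)      # tell the reader that no more data is coming


def read(q, out):
    while True:
        value = q.get()  # blocks until the writer has produced something
        if value is SENTINEL:
            break
        out.put('Get %s from queue.' % value)
    out.put(SENTINEL)    # tell the parent that the reader is done


def run():
    q, out = Queue(), Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q, out))
    pr.start()           # reader and writer run at the same time
    pw.start()
    # Collect the reader's lines until it signals that it is done.
    lines = list(iter(out.get, SENTINEL))
    pw.join()
    pr.join()
    return lines


if __name__ == '__main__':
    for line in run():
        print(line)
```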


4. Some interesting issues with the code above

if __name__=='__main__':
 # The parent process creates the Queue and passes it to each child process:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print 'All data has been written and read'

If the main block is written as in the sample above, the intent is to create a queue and pass it as an argument to each child process in the process pool, but this produces the error

RuntimeError: Queue objects should only be shared between processes through inheritance

It turns out that a multiprocessing Queue object cannot be passed as a task argument to the workers of a pool; it can only be handed to a child through inheritance, as happens when it is passed to Process. To use a queue with a process pool, use the Manager class of multiprocessing

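import multiprocessing
from multiprocessing import Pool

if __name__=='__main__':
 manager = multiprocessing.Manager()
 # The parent process creates the Queue and passes it to each child process:
 q = manager.Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 # Give the writer a head start so the reader does not find an empty queue:
 time.sleep(0.5)
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print 'All data has been written and read'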

This queue object can be shared between the parent process and the pool's child processes. If you don't use a pool, you don't need a Manager. The Manager class in multiprocessing is worth exploring further later

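About using locks: when different processes may operate on the same queue at the same time, you can avoid errors by taking a lock while a function operates on the queue, so that only one child process can operate on the queue at any given time. The lock must also be one created by the manager object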

#coding:gbk
 
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
 
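# Code executed by the writer process:
def write(q,lock):
 lock.acquire() # take the lock
 try:
  for value in ['A', 'B', 'C']:
   print 'Put %s to queue...' % value
   q.put(value)
 finally:
  lock.release() # release the lock even if an error occurs
 
# Code executed by the reader process:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break
 
if __name__=='__main__':
 manager = multiprocessing.Manager()
 # The parent process creates the Queue and passes it to each child process:
 q = manager.Queue()
 lock = manager.Lock() # create a lock
 p = Pool()
 pw = p.apply_async(write,args=(q,lock))
 time.sleep(0.5) # give the writer a head start so the reader does not find an empty queue
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print 'All data has been written and read'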
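The effect of the lock is easier to see with several writers. In the sketch below (Python 3 syntax), each writer holds the manager's lock while it puts a batch of three items, so the batches cannot interleave in the queue. The tag argument and the run_writers helper are hypothetical, and the with statement is used as a shorthand for acquire() followed by release().

```python
from multiprocessing import Manager, Pool


def write(q, lock, tag):
    # Hold the lock for the whole batch so no other writer can interleave.
    with lock:
        for i in range(3):
            q.put((tag, i))


def run_writers(tags):
    manager = Manager()
    q = manager.Queue()
    lock = manager.Lock()
    p = Pool(len(tags))
    results = [p.apply_async(write, args=(q, lock, tag)) for tag in tags]
    p.close()
    p.join()
    for r in results:
        r.get()            # re-raises any exception from a worker
    items = []
    while not q.empty():   # safe here: all writers have finished
        items.append(q.get())
    manager.shutdown()
    return items


if __name__ == '__main__':
    print(run_writers(['x', 'y', 'z']))
```

The order of the batches depends on which writer gets the lock first, but the three items of each batch should always sit next to each other.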
