A detailed introduction to multiprocessing in Python (with code examples)

不言
2018-08-29 10:25:01

This article gives a detailed introduction to multiprocessing in Python, with code examples.

This section covers Python's multiprocessing module.

1. Multiprocessing compared with multithreading

Multiprocessing is similar to multithreading: both are used for parallel computation in Python. But since threading already exists, why does Python also have multiprocessing? The reason is simple: it makes up for some disadvantages of threading, such as the GIL mentioned in the threading tutorial.

Using multiprocessing is also very simple. If you already have some understanding of threading, this will feel familiar, because Python makes the use of multiprocessing and threading almost the same. That makes it easy to get started and easy to put the power of your computer's multi-core system to work.

2. Adding a Process

import multiprocessing as mp
import threading as td

def job(a,d):
    print('aaaaa')

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()

As can be seen from the above usage comparison code, threads and processes are used in similar ways.
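One difference that the similar APIs hide is where the function runs: a thread runs inside the current process, while a Process runs in a separate one. The following is a minimal sketch that makes this visible by recording os.getpid() in each case. The names report_pid and compare_pids are illustrative and not part of the original article.

```python
import multiprocessing as mp
import os
import threading as td


def report_pid(q):
    """Put the PID of the process running this function on the queue."""
    q.put(os.getpid())


def compare_pids():
    """Return (parent_pid, pid_seen_by_thread, pid_seen_by_process)."""
    thread_pids = []          # a thread shares memory, so a plain list works
    q = mp.Queue()            # a child process needs a Queue to report back

    p1 = mp.Process(target=report_pid, args=(q,))
    t1 = td.Thread(target=lambda: thread_pids.append(os.getpid()))
    p1.start()
    t1.start()

    process_pid = q.get()     # read the queue before joining the process
    t1.join()
    p1.join()
    return os.getpid(), thread_pids[0], process_pid


if __name__ == '__main__':
    parent, from_thread, from_process = compare_pids()
    print(from_thread == parent)     # the thread runs inside the parent process
    print(from_process == parent)    # the child process has its own PID
```

Because the child is a separate process with its own memory, it cannot simply append to a list in the parent, which is why results have to travel through something like a Queue.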

Usage

When using multiprocessing, the code that creates and starts processes must be placed under a main guard:

if __name__=='__main__':

Complete application code:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_test.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

def job(a, d):
    print(a, d)

if __name__ == '__main__':
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()

Run the script in a terminal. Under other editing tools there may be no printed output after running. The output in the terminal is:

➜  baseLearn python ./process/process_test.py
1 2
➜  baseLearn

3. Storing process output in a Queue

A Queue collects the results computed by each process: every process puts its result in the queue, and once they have all finished running, the results are taken out of the queue for further processing. The reason is simple: a function run by a process (or thread) cannot hand back a return value directly, so a Queue is used to store the results.

process_queue.py

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_queue.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

# Define a function to be run by multiple processes; q is a queue that stores the result of each run
def job(q):
    res = 0
    for i in range(1000):
        res += i + i**2 + i**3
    q.put(res)   #queue

if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))

    # Start the two processes, then wait for both to finish
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # Two processes each put one result on the queue, so get() is called twice
    res1 = q.get()
    res2 = q.get()

    print(res1, res2)

Print the output results:

➜ python ./process/process_queue.py
249833583000 249833583000
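Results come out of the queue in the order the processes finish, not the order in which they were started. When it matters which process produced which result, one option is to put a (name, result) pair on the queue. The following is a sketch under that assumption. The names sum_powers, tagged_job and run_tagged are hypothetical. It also reads the queue before calling join(), which avoids waiting on a child that still holds queued data.

```python
import multiprocessing as mp


def sum_powers(n):
    """Same arithmetic as job() above: sum of i + i**2 + i**3 for i < n."""
    return sum(i + i**2 + i**3 for i in range(n))


def tagged_job(name, n, q):
    """Put a (name, result) pair on the queue so the caller can tell results apart."""
    q.put((name, sum_powers(n)))


def run_tagged(sizes):
    """Start one process per (name, n) item and return {name: result}."""
    q = mp.Queue()
    procs = [mp.Process(target=tagged_job, args=(name, n, q))
             for name, n in sizes.items()]
    for p in procs:
        p.start()
    # One get() per process; the arrival order does not matter for a dict.
    results = dict(q.get() for _ in procs)
    for p in procs:
        p.join()
    return results


if __name__ == '__main__':
    print(run_tagged({'p1': 1000, 'p2': 10}))
```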

4. Process Pool

With a process pool, we put the work we want to run into the pool, and Python takes care of distributing it across multiple processes.

1. Import the multiprocessing module

First import multiprocessing and define job()

import multiprocessing as mp

def job(x):
    return x*x

2. Process pool Pool() and map()

Then we define a Pool

pool = mp.Pool()

Once we have the pool, we can bind it to a function: we feed data into the pool, and the pool returns whatever the function returns. The difference between Pool and the earlier Process is that a function given to Pool has a return value, while the target of a Process does not return one.

Next, use map() to get the results. Pass map() the function and an iterable of input values; it distributes the work across the CPU cores automatically and returns the results.

res = pool.map(job, range(10))

Let us run it:

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

Complete code:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_pool.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

def job(x):
    return x*x  # note that this function has a return value

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

Execution result:

➜  baseLearn python ./process/process_pool.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
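A possible variation on the complete code, assuming Python 3.3 or later, where Pool can be used as a context manager: leaving the with block shuts the worker processes down, so the pool is not left open. The name multicore_with is illustrative.

```python
import multiprocessing as mp


def job(x):
    return x * x


def multicore_with():
    # Exiting the with block terminates the pool, so collect results inside it.
    with mp.Pool() as pool:
        return pool.map(job, range(10))


if __name__ == '__main__':
    print(multicore_with())
```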

3. Customized number of cores

How do we know whether Pool actually uses multiple cores? We can increase the number of iterations and then watch the CPU load while the program runs.

Open the CPU load (Mac): Activity Monitor>CPU>CPU load (click once)
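Besides watching the CPU load, a rough programmatic check is to record which worker process handled each item. The sketch below is an assumption-laden illustration, not the article's method: the names which_worker and count_tasks_per_worker are hypothetical, and seeing several PIDs shows that several worker processes were used, not that each ran on a different core.

```python
import multiprocessing as mp
import os
from collections import Counter


def which_worker(x):
    """Return the PID of the worker process that handled this item."""
    return os.getpid()


def count_tasks_per_worker(n_tasks, processes=None):
    """Map n_tasks items over a Pool and count how many each worker handled.

    processes=None lets Pool pick its default size.
    """
    pool = mp.Pool(processes=processes)
    try:
        pids = pool.map(which_worker, range(n_tasks))
    finally:
        pool.close()
        pool.join()
    return Counter(pids)


if __name__ == '__main__':
    counts = count_tasks_per_worker(1000)
    print(len(counts), 'worker process(es) handled the tasks')
```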

The default size of the Pool is the number of CPU cores. We can also choose the number of worker processes by passing the processes parameter to Pool:

def multicore():
    pool = mp.Pool(processes=3)  # set the number of cores (worker processes) to 3
    res = pool.map(job, range(10))
    print(res)

4. apply_async()

In addition to map(), Pool has another way to return results: apply_async().

apply_async() takes only one set of arguments and runs the function in a single worker process. The arguments must be passed as an iterable, so a single value needs a trailing comma to form a tuple, and the return value is retrieved with the get() method.

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # use get() to obtain the result
    print(res.get())

Running results:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async()

Summary

  • Pool uses as many processes as there are CPU cores by default. You can change this by passing the processes parameter.

  • map() takes an iterable of arguments and returns multiple results.

  • apply_async() takes only one set of arguments and returns one result. To get the effect of map(), you need to iterate.
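The last point can be sketched as follows: submit one apply_async() call per input, then collect the results with get(). The function name multicore_async is illustrative.

```python
import multiprocessing as mp


def job(x):
    return x * x


def multicore_async():
    pool = mp.Pool()
    # Submit one task per input; each call returns an AsyncResult immediately.
    pending = [pool.apply_async(job, (i,)) for i in range(10)]
    # get() blocks until that particular task has finished.
    results = [r.get() for r in pending]
    pool.close()
    pool.join()
    return results


if __name__ == '__main__':
    print(multicore_async())
```

Collecting the AsyncResult objects first and calling get() afterwards lets all tasks be queued before the first get() blocks.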

5. Shared memory

In this section we learn how to define shared memory.

Ordinary variables are not shared between processes; shared memory lets processes running on different CPU cores read and write the same data.

Shared Value

We can store data in shared memory by using Value.

import multiprocessing as mp

value1 = mp.Value('i', 0) 
value2 = mp.Value('d', 3.14)

The d and i parameters set the data type: d is a double-precision floating point type and i is a signed integer type.

Type code   C Type               Python Type         Minimum size in bytes
'b'         signed char          int                 1
'B'         unsigned char        int                 1
'u'         Py_UNICODE           Unicode character   2
'h'         signed short         int                 2
'H'         unsigned short       int                 2
'i'         signed int           int                 2
'I'         unsigned int         int                 2
'l'         signed long          int                 4
'L'         unsigned long        int                 4
'q'         signed long long     int                 8
'Q'         unsigned long long   int                 8
'f'         float                float               4
'd'         double               float               8
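The sizes in the table are minimums; the actual size on a given platform can be checked with the standard array module, which uses the same type codes. The sketch below also lets a child process overwrite two shared values and reads them back in the parent. The names actual_sizes, update and run_update are illustrative; .value is the attribute multiprocessing provides for reading and writing the stored data.

```python
import array
import multiprocessing as mp


def actual_sizes(codes):
    """Return {type_code: size in bytes on this platform}."""
    return {code: array.array(code).itemsize for code in codes}


def update(count, ratio):
    """Runs in the child process and writes to the shared values."""
    count.value = 42
    ratio.value = 2.5


def run_update():
    count = mp.Value('i', 0)
    ratio = mp.Value('d', 3.14)
    p = mp.Process(target=update, args=(count, ratio))
    p.start()
    p.join()
    # The parent sees what the child wrote.
    return count.value, ratio.value


if __name__ == '__main__':
    print(actual_sizes('id'))
    print(run_update())
```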

Shared Array

Python's multiprocessing also has an Array class, which works with shared memory to share data between processes.

array = mp.Array('i', [1, 2, 3, 4])

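Unlike an array in numpy, this Array can only be one-dimensional, not multi-dimensional. As with Value, the data type must be specified, otherwise an error is raised. The next section shows an example of how these are used.

Incorrect usage

array = mp.Array('i', [[1, 2], [3, 4]]) # 2-D list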

"""
TypeError: an integer is required
"""
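Since Array is one-dimensional, one common workaround for 2-D data is to flatten it in row-major order and compute the index by hand. This is a sketch of that idea, not something the original article prescribes; flat_index, double_all and run_double are hypothetical names.

```python
import multiprocessing as mp


def flat_index(row, col, n_cols):
    """Position of element (row, col) in a row-major flat array."""
    return row * n_cols + col


def double_all(shared, n_rows, n_cols):
    """Runs in the child process: double every element in place."""
    for r in range(n_rows):
        for c in range(n_cols):
            shared[flat_index(r, c, n_cols)] *= 2


def run_double(rows):
    n_rows, n_cols = len(rows), len(rows[0])
    flat = [x for row in rows for x in row]    # [[1, 2], [3, 4]] -> [1, 2, 3, 4]
    shared = mp.Array('i', flat)
    p = mp.Process(target=double_all, args=(shared, n_rows, n_cols))
    p.start()
    p.join()
    # Rebuild the nested list from the shared 1-D array.
    return [[shared[flat_index(r, c, n_cols)] for c in range(n_cols)]
            for r in range(n_rows)]


if __name__ == '__main__':
    print(run_double([[1, 2], [3, 4]]))
```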

6. Process lock Lock

Without a lock

Let us see what happens when no process lock is used.

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_no_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.5) # pause 0.5 seconds to make the output easier to follow
        v.value += num  # v.value accesses the shared variable
        print(v.value)

def multicore():
    v = mp.Value('i', 0)  # define the shared variable
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 4)) # a different increment, to see how the processes compete for the memory
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

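In the code above we define a shared variable v that both processes can modify. In job() we want v to print the accumulated result of adding num every 0.5 seconds, but the two processes p1 and p2 use different increments. So let us see whether the two processes conflict.

Printed result: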

➜  baseLearn python ./process/process_no_lock.py
1
5
9
9
13
13
17
17
18
18
➜  baseLearn

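We can see that process 1 and process 2 are competing for the shared memory v: some values are printed twice, and the final value is 18 rather than the expected 25 (5 × 1 + 5 × 4), so some updates were lost.

With a lock

To solve the problem of different processes competing for a shared resource, we can use a process lock.

First, define a process lock:

l = mp.Lock() # define a process lock

Then pass the lock to each process: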

p1 = mp.Process(target=job, args=(v,1,l)) # the Lock must be passed in
p2 = mp.Process(target=job, args=(v,4,l))

In job(), use the lock so that only one process at a time runs the code inside the locked region:

def job(v, num, l):
    l.acquire() # lock
    for _ in range(5):
        time.sleep(0.1)
        v.value += num # v.value accesses the shared memory
        print(v.value)
    l.release() # release
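Lock objects also work as context managers, so the acquire()/release() pair can be written as a with block, which releases the lock even if the body raises an exception. The following is a sketch of the same job() in that form, with the sleep shortened and printing removed; run_locked is a hypothetical helper added so the result can be checked.

```python
import multiprocessing as mp
import time


def job(v, num, l):
    with l:                      # acquired here, released when the block exits
        for _ in range(5):
            time.sleep(0.01)
            v.value += num


def run_locked():
    l = mp.Lock()
    v = mp.Value('i', 0)
    p1 = mp.Process(target=job, args=(v, 1, l))
    p2 = mp.Process(target=job, args=(v, 4, l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    return v.value               # 5 * 1 + 5 * 4


if __name__ == '__main__':
    print(run_locked())
```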

Complete code:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # lock
    for _ in range(5):
        time.sleep(0.5) # pause 0.5 seconds to make the output easier to follow
        v.value += num  # v.value accesses the shared variable
        print(v.value)
    l.release() # release

def multicore():
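    l = mp.Lock() # define a process lock
    v = mp.Value('i', 0)  # define the shared variable
    p1 = mp.Process(target=job, args=(v, 1, l)) # the lock must be passed in
    p2 = mp.Process(target=job, args=(v, 4, l)) # a different increment, to see how the processes compete for the memory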
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

Let us run it and see whether the processes still compete for the resource:

Printed result:

➜  baseLearn python ./process/process_lock.py
1
2
3
4
5
9
13
17
21
25

Clearly, the lock ensures that process p1 runs to completion before process p2 runs.
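Holding the lock for the whole loop serializes the two processes completely. If only the update itself needs protecting, the lock can be taken around just that statement. The sketch below assumes the lock that mp.Value creates by default, which is available through get_lock(). The processes may then interleave, but no update is lost. The name run_fine_grained is illustrative.

```python
import multiprocessing as mp
import time


def job(v, num):
    for _ in range(5):
        time.sleep(0.01)
        with v.get_lock():       # protect only the read-modify-write step
            v.value += num


def run_fine_grained():
    v = mp.Value('i', 0)         # lock=True by default, so get_lock() is available
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 4))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    return v.value


if __name__ == '__main__':
    print(run_fine_grained())
```

The trade-off is that intermediate values no longer appear in two clean blocks as in the output above; only the final total is guaranteed.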