>백엔드 개발 >파이썬 튜토리얼 >Python의 다중 프로세스에 대한 자세한 소개(코드 예)

Python의 다중 프로세스에 대한 자세한 소개(코드 예)

不言
不言원래의
2018-08-29 10:25:011793검색

이 기사는 Python의 다중 처리에 대한 자세한 소개(코드 예제)를 제공합니다. 이는 특정 참고 가치가 있으므로 도움이 될 수 있습니다.

이 섹션에서는 Python의 다중 프로세스 학습에 대해 설명합니다.

1. 멀티프로세싱과 멀티스레딩 비교

멀티프로세싱 멀티프로세싱은 둘 다 Python의 병렬 작업에 사용됩니다. 그런데 스레딩이 없는데도 Python에 다중 처리가 있는 이유는 매우 간단합니다. 스레딩 튜토리얼에서 언급한 GIL과 같은 몇 가지 단점을 보완하기 위한 것입니다. 멀티프로세싱을 사용하는 것도 매우 간단합니다. 스레딩에 대해 어느 정도 이해하고 있다면 이제 즐길 시간이 되었습니다. 왜냐하면 Python은 멀티프로세싱과 스레딩을 거의 동일하게 사용하기 때문에 시작하기도 더 쉽습니다. 컴퓨터의 멀티코어 시스템의 위력을 활용하기 위해! Multiprocessing 和多线程 threading 类似, 他们都是在 python 中用来并行运算的. 不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 原因很简单, 就是用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的GIL.

使用 multiprocessing 也非常简单, 如果对 threading 有一定了解的朋友, 你们的享受时间就到了. 因为 python 把 multiprocessing 和 threading 的使用方法做的几乎差不多. 这样我们就更容易上手. 也更容易发挥你电脑多核系统的威力了!

二、添加进程Process

import multiprocessing as mp
import threading as td

def job(a,d):
    print('aaaaa')

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()

从上面的使用对比代码可以看出,线程和进程的使用方法相似。

使用

在运用时需要添加上一个定义main函数的语句

if __name__=='__main__':

完整的应用代码:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_test.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

def job(a, d):
    print a, d

if __name__ == '__main__':
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()

运行环境要在terminal环境下,可能其他的编辑工具会出现运行结束后没有打印结果,在terminal中的运行后打印的结果为:

➜  baseLearn python ./process/process_test.py
1 2
➜  baseLearn

三、存储进程输出Queue

Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果

process_queue.py

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_queue.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

# 定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果
def job(q):
    res = 0
    for i in range(1000):
        res += i + i**2 + i**3
    q.put(res)   #queue

if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))

    # 分别启动、连接两个线程
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 上面是分两批处理的,所以这里分两批输出,将结果分别保存
    res1 = q.get()
    res2 = q.get()

    print res1,res2

打印输出结果:

➜ python ./process/process_queue.py
249833583000 249833583000

四、进程池

进程池就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题

1、导入多进程模块

首先import multiprocessing 和定义job()

import multiprocessing as mp

def job(x):
    return x*x

2、进程池Pool()和map()

然后我们定义一个Pool

pool = mp.Pool()

有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process没有返回值

接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果

res = pool.map(job, range(10))

让我们来运行一下

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

完成代码:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_queue.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

def job(x):
    return x*x  # 注意这里的函数有return返回值

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

执行结果:

➜  baseLearn python ./process/process_pool.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

3、自定义核数量

我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况

打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)

Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量

def multicore():
    pool = mp.Pool(processes=3) # 定义CPU核数量为3
    res = pool.map(job, range(10))
    print(res)

4、apply_async()

Pool除了map()外,还有可以返回结果的方式,那就是apply_async().

apply_async()只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get())

运行结果;

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async()

总结

  • Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数

  • map() 放入迭代参数,返回多个结果

  • apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

五、共享内存shared memory

这节我们学习如何定义共享内存。只有用共享内存才能让CPU之间有交流

Shared Value

我们可以通过使用Value数据存储在一个共享的内存表中。

import multiprocessing as mp

value1 = mp.Value('i', 0) 
value2 = mp.Value('d', 3.14)

其中di参数用来设置数据类型的,d表示一个双精浮点类型 double,i表示一个带符号的整型

2. 프로세스 추가 Process🎜
array = mp.Array('i', [1, 2, 3, 4])
🎜 위의 사용 비교 코드를 보면 스레드와 프로세스가 비슷한 방식으로 사용되는 것을 알 수 있습니다. 🎜

사용

🎜사용 시 주요 기능을 정의하는 문을 추가해야 합니다🎜
array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list

"""
TypeError: an integer is required
"""
🎜완전한 애플리케이션 코드:🎜
# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_no_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.5) # 暂停0.5秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value)

def multicore():
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 4)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()
🎜실행 환경은 터미널 환경이어야 하며, 다른 편집 도구에서는 작업이 종료될 수 있습니다. 실행 후 출력되는 결과는 없으며 터미널에서 실행 후 출력되는 결과는 다음과 같습니다. 🎜
➜  baseLearn python ./process/process_no_lock.py
1
5
9
9
13
13
17
17
18
18
➜  baseLearn
🎜 3. 저장 프로세스 출력 Queue🎜🎜 Queue의 기능은 각 코어나 스레드의 연산 결과를 큐에 넣는 것이고, 각 스레드가 실행될 때까지 기다리거나 코어 실행이 완료된 후 대기열에서 결과가 검색되고 로딩 작업이 계속됩니다. 이유는 매우 간단합니다. 여러 스레드에서 호출되는 함수는 반환 값을 가질 수 없으므로 Queue는 여러 스레드 작업의 결과를 저장하는 데 사용됩니다🎜🎜process_queue.py🎜
 l = mp.Lock() # 定义一个进程锁
🎜출력 결과 인쇄:🎜
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
p2 = mp.Process(target=job, args=(v,3,l))
🎜 4. 프로세스 풀 🎜🎜프로세스 풀은 실행하려는 항목을 풀에 넣는 것을 의미하며, Python은 여러 프로세스의 문제를 자체적으로 해결합니다. 🎜

1. 다중 프로세스 모듈 가져오기

🎜먼저 다중 처리 가져오기job()🎜
def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # v.value获取共享内存
        print(v.value)
    l.release() # 释放

정의 2. 그리고 map()

🎜그런 다음 🎜
# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.5) # 暂停0.5秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value)
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入
    p2 = mp.Process(target=job, args=(v, 4, l)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()
🎜을 정의합니다. 풀을 만든 후에는 풀이 특정 기능에 해당하도록 만들 수 있습니다. pool은 함수 반환 값을 반환합니다. Pool과 이전 Process의 차이점은 Pool에 던져진 함수는 반환 값을 가지고 있는 반면, Process반환 값이 없습니다. 🎜🎜다음으로 map()을 사용하여 결과를 얻어야 합니다. map()에 반복해야 할 함수와 값을 넣은 다음, 반환 결과🎜
➜  baseLearn python ./process/process_lock.py
1
2
3
4
5
9
13
17
21
25
🎜실행해 보겠습니다🎜rrreee🎜전체 코드:🎜rrreee🎜실행 결과:🎜rrreee

3.맞춤형 코어 수

🎜알 수 있는 방법 다중 코어가 실제로 호출됩니까? 반복 횟수를 늘린 다음 CPU 로드를 열어 CPU 작업을 확인할 수 있습니다🎜🎜CPU 로드 열기(Mac): Activity Monitor > CPU > CPU 로드(한 번 클릭) 🎜🎜풀 기본 크기는 CPU 코어 수입니다. , Pool🎜rrreee

4, apply_async() h4>🎜Pool에 <code>processes 매개변수를 전달하여 필요한 코어 수를 사용자 정의할 수도 있습니다. map() 외에도 결과를 반환하는 방법, 즉 apply_async()도 있습니다.🎜🎜 Apply_async() code>하나의 값만 전달할 수 있어 하나의 코어에만 넣어 연산을 하게 되는데, 값을 전달할 때 iterable이므로 전달된 값 뒤에 쉼표를 추가해야 한다는 점에 유의하세요. 반환 값을 얻으려면 get() 메서드를 사용해야 합니다.🎜rrreee🎜실행 결과 🎜rrreee

요약

  • 🎜Pool기본 호출은 CPU 코어 수입니다. 프로세스 매개변수🎜
  • 🎜map()를 전달하여 CPU 코어 수를 맞춤 설정할 수 있습니다. 반복 매개변수를 입력하고 여러 결과를 반환합니다🎜
  • 🎜apply_async()는 맵의 효과를 얻으려는 경우에만 매개변수 집합을 입력하고 결과를 반환할 수 있습니다. (), 반복해야 합니다🎜
🎜5. 공유 메모리 공유 메모리🎜🎜이 섹션에서는 공유 메모리를 정의하는 방법을 알아봅니다. 공유 메모리를 통해서만 CPU가 서로 통신할 수 있습니다. 🎜

공유 값

🎜 을 사용하여 공유 메모리 테이블에 데이터를 저장할 수 있습니다. 🎜rrreee🎜 di 매개변수는 데이터 유형을 설정하는 데 사용됩니다. d는 배정밀도 부동 소수점 유형인 i를 나타냅니다. 는 부호 있는 정수를 나타냅니다. 🎜
Type code C Type Python Type Minimum size in bytes
'b' signed char int 1
'B' unsigned char int 1
'u' Py_UNICODE Unicode character 2
'h' signed short int 2
'H' unsigned short int 2
'i' signed int int 2
'I' unsigned int int 2
'l' signed long int 4
'L' unsigned long int 4
'q' signed long long int 8
'Q' unsigned long long int 8
'f' float float 4
'd' double float 8

Shared Array

在Python的 mutiprocessing 中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据

array = mp.Array('i', [1, 2, 3, 4])

这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。 我们会在后一节举例说明这两种的使用方法.

错误形式

array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list

"""
TypeError: an integer is required
"""

六、进程锁Lock

不加进程锁

让我们看看没有加进程锁时会产生什么样的结果。

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_no_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.5) # 暂停0.5秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value)

def multicore():
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 4)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。

结果打印:

➜  baseLearn python ./process/process_no_lock.py
1
5
9
9
13
13
17
17
18
18
➜  baseLearn

我们可以看到,进程1和进程2在相互着使用共享内存v

加进程锁

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

首先需要定义一个进程锁

 l = mp.Lock() # 定义一个进程锁

然后将进程锁的信息传入各个进程中

p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
p2 = mp.Process(target=job, args=(v,3,l))

job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # v.value获取共享内存
        print(v.value)
    l.release() # 释放

全部代码:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.5) # 暂停0.5秒,让输出效果更明显
        v.value += num  # v.value获取共享变量值
        print(v.value)
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入
    p2 = mp.Process(target=job, args=(v, 4, l)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

运行一下,让我们看看是否还会出现抢占资源的情况:

结果打印:

➜  baseLearn python ./process/process_lock.py
1
2
3
4
5
9
13
17
21
25

显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

相关推荐:

python 多进程通信模块

Python守护进程(多线程开发)

위 내용은 Python의 다중 프로세스에 대한 자세한 소개(코드 예)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.