이 기사는 Python의 다중 처리에 대한 자세한 소개(코드 예제)를 제공합니다. 이는 특정 참고 가치가 있으므로 도움이 될 수 있습니다.
이 섹션에서는 Python의 다중 프로세스 학습에 대해 설명합니다.
멀티프로세싱 멀티프로세싱
은 둘 다 Python의 병렬
작업에 사용됩니다. 그런데 스레딩이 없는데도 Python에 다중 처리가 있는 이유는 매우 간단합니다. 스레딩 튜토리얼에서 언급한 GIL
과 같은 몇 가지 단점을 보완하기 위한 것입니다. 멀티프로세싱을 사용하는 것도 매우 간단합니다. 스레딩에 대해 어느 정도 이해하고 있다면 이제 즐길 시간이 되었습니다. 왜냐하면 Python은 멀티프로세싱과 스레딩을 거의 동일하게 사용하기 때문에 시작하기도 더 쉽습니다. 컴퓨터의 멀티코어 시스템의 위력을 활용하기 위해! Multiprocessing
和多线程 threading 类似, 他们都是在 python 中用来并行
运算的. 不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 原因很简单, 就是用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的GIL
.
使用 multiprocessing 也非常简单, 如果对 threading 有一定了解的朋友, 你们的享受时间就到了. 因为 python 把 multiprocessing 和 threading 的使用方法做的几乎差不多. 这样我们就更容易上手. 也更容易发挥你电脑多核系统的威力了!
import multiprocessing as mp import threading as td def job(a,d): print('aaaaa') t1 = td.Thread(target=job,args=(1,2)) p1 = mp.Process(target=job,args=(1,2)) t1.start() p1.start() t1.join() p1.join()
从上面的使用对比代码可以看出,线程和进程的使用方法相似。
在运用时需要添加上一个定义main函数的语句
if __name__=='__main__':
完整的应用代码:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_test.py @time: 18/8/26 01:12 """ import multiprocessing as mp def job(a, d): print a, d if __name__ == '__main__': p1 = mp.Process(target=job, args=(1, 2)) p1.start() p1.join()
运行环境要在terminal环境下,可能其他的编辑工具会出现运行结束后没有打印结果,在terminal中的运行后打印的结果为:
➜ baseLearn python ./process/process_test.py 1 2 ➜ baseLearn
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果
process_queue.py
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_queue.py @time: 18/8/26 01:12 """ import multiprocessing as mp # 定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果 def job(q): res = 0 for i in range(1000): res += i + i**2 + i**3 q.put(res) #queue if __name__ == '__main__': q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) # 分别启动、连接两个线程 p1.start() p2.start() p1.join() p2.join() # 上面是分两批处理的,所以这里分两批输出,将结果分别保存 res1 = q.get() res2 = q.get() print res1,res2
打印输出结果:
➜ python ./process/process_queue.py 249833583000 249833583000
进程池
就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题
。
首先import multiprocessing
和定义job()
import multiprocessing as mp def job(x): return x*x
然后我们定义一个Pool
pool = mp.Pool()
有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。 Pool
和之前的Process的
不同点是丢向Pool的函数有返回值,而Process
的没有返回值。
接下来用map()
获取结果,在map()
中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果
res = pool.map(job, range(10))
让我们来运行一下
def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
完成代码:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_queue.py @time: 18/8/26 01:12 """ import multiprocessing as mp def job(x): return x*x # 注意这里的函数有return返回值 def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
执行结果:
➜ baseLearn python ./process/process_pool.py [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
我们怎么知道Pool
是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况
打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)
Pool默认大小是CPU的核数,我们也可以通过在Pool
中传入processes
参数即可自定义需要的核数量
def multicore(): pool = mp.Pool(processes=3) # 定义CPU核数量为3 res = pool.map(job, range(10)) print(res)
Pool
除了map()
外,还有可以返回结果的方式,那就是apply_async()
.
apply_async()
中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值
def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get获得结果 print(res.get())
运行结果;
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map() 4 # apply_async()
Pool
默认调用是CPU的核数,传入processes参数可自定义CPU核数
map()
放入迭代参数,返回多个结果
apply_async()
只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代
这节我们学习如何定义共享内存。只有用共享内存才能让CPU之间有交流
。
我们可以通过使用Value
数据存储在一个共享的内存表中。
import multiprocessing as mp value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14)
其中d
和i
参数用来设置数据类型的,d
表示一个双精浮点类型 double,i
表示一个带符号的整型
array = mp.Array('i', [1, 2, 3, 4])🎜 위의 사용 비교 코드를 보면 스레드와 프로세스가 비슷한 방식으로 사용되는 것을 알 수 있습니다. 🎜
array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list """ TypeError: an integer is required """🎜완전한 애플리케이션 코드:🎜
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_no_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num): for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) def multicore(): v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1)) p2 = mp.Process(target=job, args=(v, 4)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()🎜실행 환경은 터미널 환경이어야 하며, 다른 편집 도구에서는 작업이 종료될 수 있습니다. 실행 후 출력되는 결과는 없으며 터미널에서 실행 후 출력되는 결과는 다음과 같습니다. 🎜
➜ baseLearn python ./process/process_no_lock.py 1 5 9 9 13 13 17 17 18 18 ➜ baseLearn🎜 3. 저장 프로세스 출력 Queue🎜🎜 Queue의 기능은 각 코어나 스레드의 연산 결과를 큐에 넣는 것이고, 각 스레드가 실행될 때까지 기다리거나 코어 실행이 완료된 후 대기열에서 결과가 검색되고 로딩 작업이 계속됩니다. 이유는 매우 간단합니다. 여러 스레드에서 호출되는 함수는 반환 값을 가질 수 없으므로 Queue는 여러 스레드 작업의 결과를 저장하는 데 사용됩니다🎜🎜
process_queue.py
🎜l = mp.Lock() # 定义一个进程锁🎜출력 결과 인쇄:🎜
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入 p2 = mp.Process(target=job, args=(v,3,l))🎜 4. 프로세스 풀 🎜🎜
프로세스 풀
은 실행하려는 항목을 풀에 넣는 것을 의미하며, Python은 여러 프로세스의 문제를 자체적으로 해결합니다
. 🎜다중 처리 가져오기
및 job()
🎜def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # v.value获取共享内存 print(v.value) l.release() # 释放
풀
🎜# -*- coding:utf-8 -*- """ @author: Corwien @file: process_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入 p2 = mp.Process(target=job, args=(v, 4, l)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()🎜을 정의합니다. 풀을 만든 후에는 풀이 특정 기능에 해당하도록 만들 수 있습니다. pool은 함수 반환 값을 반환합니다.
Pool
과 이전 Process
의 차이점은 Pool에 던져진 함수는 반환 값을 가지고 있는 반면, Process
는 반환 값이 없습니다. 🎜🎜다음으로 map()
을 사용하여 결과를 얻어야 합니다. map()
에 반복해야 할 함수와 값을 넣은 다음, 반환 결과🎜➜ baseLearn python ./process/process_lock.py 1 2 3 4 5 9 13 17 21 25🎜실행해 보겠습니다🎜rrreee🎜전체 코드:🎜rrreee🎜실행 결과:🎜rrreee
풀
다중 코어가 실제로 호출됩니까? 반복 횟수를 늘린 다음 CPU 로드를 열어 CPU 작업을 확인할 수 있습니다🎜🎜CPU 로드 열기(Mac): Activity Monitor > CPU > CPU 로드(한 번 클릭) 🎜🎜풀 기본 크기는 CPU 코어 수입니다. , Pool
🎜rrreeePool에 <code>processes
매개변수를 전달하여 필요한 코어 수를 사용자 정의할 수도 있습니다. map()
외에도 결과를 반환하는 방법, 즉 apply_async()
도 있습니다.🎜🎜 Apply_async()
code>하나의 값만 전달할 수 있어 하나의 코어에만 넣어 연산을 하게 되는데, 값을 전달할 때 iterable이므로 전달된 값 뒤에 쉼표를 추가해야 한다는 점에 유의하세요. 반환 값을 얻으려면 get() 메서드를 사용해야 합니다.🎜rrreee🎜실행 결과 🎜rrreeemap()
를 전달하여 CPU 코어 수를 맞춤 설정할 수 있습니다. 반복 매개변수를 입력하고 여러 결과를 반환합니다🎜 apply_async()
는 맵의 효과를 얻으려는 경우에만 매개변수 집합을 입력하고 결과를 반환할 수 있습니다. (), 반복해야 합니다🎜공유 메모리를 통해서만 CPU가 서로 통신할 수 있습니다
. 🎜값
을 사용하여 공유 메모리 테이블에 데이터를 저장할 수 있습니다. 🎜rrreee🎜 d
및 i
매개변수는 데이터 유형을 설정하는 데 사용됩니다. d
는 배정밀도 부동 소수점 유형인 i를 나타냅니다.
는 부호 있는 정수
를 나타냅니다. 🎜Type code | C Type | Python Type | Minimum size in bytes |
---|---|---|---|
'b' |
signed char | int | 1 |
'B' |
unsigned char | int | 1 |
'u' |
Py_UNICODE | Unicode character | 2 |
'h' |
signed short | int | 2 |
'H' |
unsigned short | int | 2 |
'i' |
signed int | int | 2 |
'I' |
unsigned int | int | 2 |
'l' |
signed long | int | 4 |
'L' |
unsigned long | int | 4 |
'q' |
signed long long | int | 8 |
'Q' |
unsigned long long | int | 8 |
'f' |
float | float | 4 |
'd' |
double | float | 8 |
在Python的 mutiprocessing
中,有还有一个Array
类,可以和共享内存交互,来实现在进程之间共享数据。
array = mp.Array('i', [1, 2, 3, 4])
这里的Array
和numpy中的不同,它只能是一维
的,不能是多维的。同样和Value
一样,需要定义数据形式,否则会报错。 我们会在后一节举例说明这两种的使用方法.
错误形式
array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list """ TypeError: an integer is required """
让我们看看没有加进程锁时会产生什么样的结果。
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_no_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num): for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) def multicore(): v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1)) p2 = mp.Process(target=job, args=(v, 4)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()
在上面的代码中,我们定义了一个共享变量v
,两个进程都可以对它进行操作。 在job()中我们想让v
每隔0.1秒输出一次累加num
的结果,但是在两个进程p1
和p2
中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。
结果打印:
➜ baseLearn python ./process/process_no_lock.py 1 5 9 9 13 13 17 17 18 18 ➜ baseLearn
我们可以看到,进程1和进程2在相互抢
着使用共享内存v
。
为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。
首先需要定义一个进程锁
l = mp.Lock() # 定义一个进程锁
然后将进程锁的信息传入各个进程中
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入 p2 = mp.Process(target=job, args=(v,3,l))
在job()
中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占
def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # v.value获取共享内存 print(v.value) l.release() # 释放
全部代码:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入 p2 = mp.Process(target=job, args=(v, 4, l)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()
运行一下,让我们看看是否还会出现抢占资源的情况:
结果打印:
➜ baseLearn python ./process/process_lock.py 1 2 3 4 5 9 13 17 21 25
显然,进程锁保证了进程p1
的完整运行,然后才进行了进程p2
的运行
相关推荐:
위 내용은 Python의 다중 프로세스에 대한 자세한 소개(코드 예)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!