>백엔드 개발 >파이썬 튜토리얼 >Python 멀티스레딩 및 멀티프로세스 세부 정리

Python 멀티스레딩 및 멀티프로세스 세부 정리

WBOY
WBOY앞으로
2023-04-15 20:07:011420검색

Python 멀티스레딩 및 멀티프로세스 세부 정리


스레드와 프로세스의 차이점

프로세스와 스레드는 운영체제의 기본 개념이지만 상대적으로 추상적이고 익히기가 어렵습니다. 멀티프로세스와 멀티스레딩에 관해 교과서에서 가장 고전적인 문장은 "프로세스는 자원할당의 최소 단위이고, 스레드는 CPU 스케줄링의 최소 단위이다"이다. 스레드는 프로그램의 단일 순차적 제어 흐름입니다. 프로세스 내에서 상대적으로 독립적이고 스케줄링 가능한 실행 단위입니다. 시스템이 독립적으로 CPU를 스케줄링하고 할당하는 기본 단위입니다. 단일 프로그램에서 여러 작업을 완료하기 위해 동시에 여러 스레드를 실행하는 것을 멀티스레딩이라고 합니다.

Python 멀티스레딩 및 멀티프로세스 세부 정리

프로세스와 스레드의 차이점

프로세스는 자원 할당의 기본 단위입니다. 공정과 관련된 모든 자원은 공정 제어 블록 PCB에 기록됩니다. 프로세스가 이러한 리소스를 소유하거나 사용하고 있음을 나타냅니다. 또한 프로세스는 프로세서를 선점하는 스케줄링 단위이기도 하며 완전한 가상 주소 공간을 갖습니다. 프로세스가 예약되면 서로 다른 프로세스는 서로 다른 가상 주소 공간을 가지며 동일한 프로세스 내의 서로 다른 스레드는 동일한 주소 공간을 공유합니다.

프로세스에 해당하는 스레드는 리소스 할당과 관련이 없으며 특정 프로세스에 속하며 프로세스의 다른 스레드와 프로세스의 리소스를 공유합니다. 스레드는 관련 스택(시스템 스택 또는 사용자 스택) 레지스터와 스레드 제어 테이블 TCB로만 구성됩니다. 레지스터는 스레드 내에 지역 변수를 저장하는 데 사용할 수 있지만 다른 스레드와 관련된 변수는 저장할 수 없습니다.

일반적으로 프로세스에는 프로세스가 소유한 리소스를 활용할 수 있는 여러 스레드가 포함될 수 있습니다. 스레드를 도입한 운영 체제에서 프로세스는 일반적으로 자원 할당의 기본 단위로 간주되며, 스레드는 독립적인 작업 및 독립적인 스케줄링의 기본 단위로 간주됩니다.

스레드는 프로세스보다 작고 기본적으로 시스템 리소스를 소유하지 않기 때문에 스레드를 예약하는 데 드는 오버헤드가 훨씬 작아지고 시스템의 여러 프로그램 간의 동시 실행 정도를 보다 효율적으로 높일 수 있으므로 시스템 리소스 활용도와 시스템 리소스 활용도가 크게 향상됩니다. 처리량.

따라서 최근 출시된 범용 운영 체제에서는 시스템의 동시성을 더욱 향상시키기 위해 스레드를 도입했으며 이를 현대 운영 체제의 중요한 지표로 간주합니다.

Python 멀티스레딩 및 멀티프로세스 세부 정리

스레드와 프로세스의 차이점은 다음 네 가지로 요약할 수 있습니다.

  • 주소 공간 및 기타 리소스(예: 열린 파일): 프로세스는 서로 독립적이며 동일한 프로세스의 스레드 간에 공유됩니다. 프로세스의 스레드는 다른 프로세스에서 표시되지 않습니다.
  • 통신: 프로세스 간 통신 IPC, 스레드는 프로세스 데이터 세그먼트(예: 전역 변수)를 직접 읽고 쓸 수 있어 통신할 수 있습니다. 데이터 일관성을 보장하려면 프로세스 동기화 및 상호 배제 수단의 지원이 필요합니다.
  • 예약 및 전환: 스레드 컨텍스트 전환은 프로세스 컨텍스트 전환보다 훨씬 빠릅니다.
  • 멀티 스레드 OS에서 프로세스는 실행 가능한 엔터티가 아닙니다.

멀티 프로세스와 멀티 스레드 비교

크기 비교

멀티 프로세스

멀티스레딩

요약

데이터 공유 및 동기화

데이터 공유는 복잡하고 동기화는 간단합니다

데이터 공유는 간단하고 동기화는 복잡합니다

각각의 장점과 단점이 있습니다

메모리, CPU

를 차지합니다. 많은 메모리, 복잡한 전환, CPU 사용률 낮음

더 적은 메모리 차지, 간단한 전환, 높은 CPU 사용률

스레드가 지배적

생성, 파괴, 전환

복잡함, 느림

간단하고 빠르다

스레드가 지배적

프로그래밍 및 디버깅

간단한 프로그래밍 및 간단한 디버깅

복잡한 프로그래밍 및 복잡한 디버깅

프로세스가 지배합니다

신뢰성

프로세스가 서로 영향을 미치지 않습니다

스레드 하나가 멈추면 전체 프로세스가 중단됩니다

프로세스 지배력

분산

멀티 코어 및 멀티 머신에 적합하며 여러 머신으로 확장이 용이함

적합한 지배적 멀티 코어

프로세스

요약하자면, 프로세스와 스레드는 기차와 마차에 비유될 수도 있습니다.

  • 스레드는 프로세스 아래로 이동합니다(간단한 캐리지는 실행할 수 없음)
  • 프로세스는 여러 스레드를 포함할 수 있습니다(열차는 여러 캐리지를 가질 수 있음)
  • 서로 다른 프로세스 간 데이터 공유가 어려움(역 환승 등 한 열차의 승객이 다른 열차로 환승하기 어려움)
  • 동일한 프로세스 내 서로 다른 스레드 간 데이터 공유가 용이함(객차에서 변경이 용이함) A에서 캐리지 B)
  • 프로세스는 스레드보다 더 많은 컴퓨터 리소스를 소비합니다(여러 트레인을 사용하면 여러 캐리지보다 더 많은 리소스를 소비합니다)
  • 프로세스는 서로 영향을 미치지 않습니다. 하나의 스레드가 중단되면 전체 프로세스가 중단됩니다. 다른 열차에는 영향을 미치지 않지만 열차의 중간 객차에 불이 붙으면 열차의 모든 객차에 영향을 미칩니다)
  • 프로세스는 여러 기계로 확장될 수 있으며 프로세스는 최대 다중 코어에 적합합니다(다른 열차는 가능) 선로에서 동일한 열차의 객차가 서로 다른 선로에 있을 수 없습니다)
  • 프로세스에서 사용하는 메모리 주소는 잠길 수 있습니다. 즉, 스레드가 일부 공유 메모리를 사용하면 다른 스레드가 이를 기다려야 합니다. 이 메모리 조각을 사용하기 전에 종료해야 합니다. (예를 들어 기차의 화장실) - "뮤텍스"
  • 프로세스에서 사용되는 메모리 주소는 사용을 제한할 수 있습니다. (예를 들어 기차의 레스토랑에서는 최대 인원만 입장이 허용됩니다. 꽉 차서 문 앞에서 기다려야 하는 등 누군가 나와야 들어가야 한다) - "Semaphore"

Python Global Interpreter Lock GIL

Global Interpreter Lock(영어: Global Interpreter Lock, 약어 GIL)은 아님 Python의 기능으로 구현됩니다. Python 파서(CPython)에서 도입한 개념입니다. CPython은 대부분의 환경에서 기본 Python 실행 환경이기 때문입니다. 따라서 많은 사람들의 개념에서는 CPython이 Python이고 GIL이 Python 언어의 결함이라고 당연하게 여깁니다. 그렇다면 CPython 구현에서 GIL은 무엇입니까? 공식 설명을 살펴보겠습니다.

한 번에 하나의 스레드만 Python 바이트코드를 실행하도록 보장하기 위해 CPython 인터프리터에서 사용하는 메커니즘입니다. 이는 객체 모델(예: 중요한 내장 유형 포함)을 만들어 CPython 구현을 단순화합니다. dict) 전체 인터프리터를 잠그면 멀티 프로세서 시스템에서 제공하는 병렬 처리가 많이 희생되면서 인터프리터가 멀티 스레드를 더 쉽게 수행할 수 있습니다.

Python 코드 실행은 Python 가상 코드에 의해 수행됩니다. 머신(인터프리터 메인 루프, CPython 버전이라고도 함)을 제어하기 위해 Python은 원래 인터프리터의 메인 루프에서 동시에 하나의 스레드만 실행되도록 설계되었습니다. 즉, 언제든지 하나의 스레드만 인터프리터에서 실행됩니다. Python 가상 머신에 대한 액세스는 한 번에 하나의 스레드만 실행되도록 보장하는 GIL(Global Interpreter Lock)에 의해 제어됩니다.

Python 멀티스레딩 및 멀티프로세스 세부 정리

GIL의 장점은 무엇인가요? 간단히 말해서 단일 스레드 상황에서는 더 빠르고 C 라이브러리와 결합하면 더 편리하며 스레드 안전성 문제를 고려할 필요도 없습니다. 이는 초기 Python의 가장 일반적인 응용 시나리오이자 장점이기도 했습니다. 또한 GIL의 설계는 CPython의 구현을 단순화하여 사전과 같은 주요 내장 유형을 포함한 객체 모델을 암시적으로 동시에 액세스할 수 있게 만듭니다. 전역 인터프리터를 잠그면 멀티스레딩 지원을 구현하기가 더 쉬워지지만 멀티프로세서 호스트의 병렬 컴퓨팅 기능도 상실됩니다.

멀티 스레드 환경에서 Python 가상 머신은 다음과 같이 실행됩니다.

  1. GIL 설정
  2. 실행할 스레드로 전환
  3. 지정된 바이트 코드 명령어 수까지 실행하거나 스레드가 적극적으로 제어를 포기합니다. (sleep(0)을 호출할 수 있습니다)
  4. 스레드를 절전 상태로 설정합니다
  5. GIL 잠금 해제
  6. 위의 모든 단계를 다시 반복하세요

Python 멀티스레딩 및 멀티프로세스 세부 정리

Python3.2 이전에는 GIL의 릴리스 로직이 현재 스레드였습니다. IO 작업이 발생했거나 틱 수가 100에 도달하고(틱은 특별히 GIL에 사용되는 Python 자체의 카운터로 간주될 수 있으며 각 릴리스 후에 0으로 재설정됩니다. 이 수는 sys.setcheckinterval을 통해 조정할 수 있음) 릴리스됩니다. 컴퓨팅 집약적인 스레드는 GIL을 해제한 후 즉시 GIL을 신청하고 일반적으로 다른 스레드가 예약을 완료하기 전에 GIL을 다시 획득하기 때문에 컴퓨팅 집약적인 스레드는 매우 짧은 시간에 GIL을 획득하게 됩니다. GIL은 스레드 실행이 끝날 때까지 오랜 시간 동안 점유됩니다.

Python 3.2는 새로운 GIL을 사용하기 시작합니다. 새로운 GIL 구현에서는 고정된 시간 초과를 사용하여 현재 스레드에 전역 잠금을 포기하도록 지시합니다. 현재 스레드가 이 잠금을 보유하고 있고 다른 스레드가 이 잠금을 요청하면 현재 스레드는 5밀리초 후에 강제로 잠금을 해제하게 됩니다. 이번 개선으로 단일 코어의 경우 단일 스레드가 오랫동안 GIL을 점유하는 상황이 개선됐다.

단일 코어 CPU에서는 수백 번의 간격 검사로 인해 스레드 전환이 발생합니다. 멀티 코어 CPU에서는 심각한 스레드 스래싱이 발생합니다. GIL 잠금이 해제될 때마다 스레드는 잠금을 놓고 경쟁하고 스레드를 전환하여 리소스를 소비합니다. 단일 코어 아래에 여러 스레드가 있으면 GIL이 해제될 때마다 깨어난 스레드가 GIL 잠금을 얻을 수 있으므로 원활하게 실행할 수 있습니다. 그러나 멀티 코어에서는 CPU0이 GIL을 해제한 후 다른 CPU의 스레드가 경쟁합니다. 그러나 GIL은 CPU0에 의해 즉시 획득될 수 있으며, 이로 인해 다른 여러 CPU의 깨어난 스레드가 깨어나 예약 상태로 들어가기 전에 전환 시간을 기다리게 됩니다. 이로 인해 스레드 스래싱이 발생하여 효율성이 저하됩니다.

또한, 위의 구현 메커니즘에서 Python의 멀티스레딩이 CPU 집약적인 코드보다 IO 집약적인 코드에 더 친숙하다는 것을 추론할 수 있습니다.

GIL에 대한 대책:

  • 더 높은 버전의 Python 사용(GIL 메커니즘이 최적화됨)
  • 멀티 프로세스를 사용하여 멀티 스레딩 대체(멀티 프로세스 간에 GIL이 없지만 프로세스 자체가 더 많은 리소스를 소비함) )
  • 스레드를 실행하는 CPU 지정(어피니티 모듈 사용)
  • Jython 및 IronPython과 같은 GIL이 없는 인터프리터를 사용하세요.
  • 전체 IO 집약적인 작업에만 멀티스레딩을 사용하세요.
  • 코루틴을 사용하세요(효율적인 단일 스레드 모드, 마이크로 스레드라고도 함; 일반적으로 여러 프로세스와 함께 사용됨)
  • 주요 구성 요소를 C/C++의 Python 확장으로 작성하고 ctypes를 사용하여 Python 프로그램이 C 언어로 컴파일된 동적 링크 라이브러리의 내보낸 함수를 직접 호출하도록 합니다. . (GIL 제한 사항을 알리기 위해 nogil 사용)

Python의 멀티프로세싱 패키지 multiprocessing

Python의 스레딩 패키지는 주로 멀티스레딩 개발을 사용하지만 GIL의 존재로 인해 Python의 멀티스레딩은 실제로 멀티스레딩이 아닙니다. 멀티 코어 CPU의 리소스를 최대한 활용하려면 대부분의 경우 여러 프로세스를 사용해야 합니다. 멀티프로세싱 패키지는 Python 2.6에서 도입되었습니다. 이 패키지는 마이그레이션을 용이하게 하기 위해 스레딩에서 제공하는 인터페이스 세트를 완전히 복제합니다. 유일한 차이점은 다중 스레드 대신 다중 프로세스를 사용한다는 것입니다. 각 프로세스에는 독립적인 GIL이 있으므로 프로세스 간에 GIL 경합이 발생하지 않습니다.

이 다중 처리를 사용하면 단일 프로세스에서 동시 실행으로의 변환을 쉽게 완료할 수 있습니다. 멀티프로세싱은 하위 프로세스, 통신 및 데이터 공유를 지원하고 다양한 형태의 동기화를 수행하며 프로세스, 큐, 파이프 및 잠금과 같은 구성 요소를 제공합니다.

멀티프로세싱의 배경

파이썬의 GIL을 처리하는 것 외에도 멀티프로세싱을 하는 또 다른 이유는 Windows 운영 체제와 Linux/Unix 시스템 간의 불일치 때문입니다.

Unix/Linux 운영 체제는 매우 특별한 fork() 시스템 호출을 제공합니다. 일반적인 함수는 한 번 호출하고 한 번 반환하지만, fork()는 한 번 호출하고 두 번 반환하는데, 운영체제가 현재 프로세스(부모 프로세스)(자식 프로세스)를 자동으로 복사한 후 각각 부모 프로세스와 자식 프로세스에 복사하기 때문이다. . 반품. 자식 프로세스는 항상 0을 반환하고, 부모 프로세스는 자식 프로세스의 ID를 반환합니다. 그 이유는 부모 프로세스가 많은 자식 프로세스를 분기할 수 있으므로 부모 프로세스는 각 자식 프로세스의 ID를 기록해야 하고 자식 프로세스는 부모 프로세스의 ID를 가져오기 위해 getpid()만 호출하면 되기 때문입니다.

Python의 os 모듈은 Python 프로그램에서 하위 프로세스를 쉽게 생성할 수 있는 fork를 포함한 일반적인 시스템 호출을 캡슐화합니다.

import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

Linux, Unix 및 Mac에서 위 코드의 실행 결과는 다음과 같습니다.

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

Fork 호출을 사용하는 경우 프로세스가 새 작업을 받으면 새 작업을 처리하기 위해 하위 프로세스를 복사할 수 있습니다. 일반적인 Apache 서버에는 새 http 요청이 있을 때마다 이를 처리하기 위해 하위 프로세스가 분기됩니다. http 요청.

Windows에는 포크 호출이 없으므로 위 코드는 Windows에서 실행될 수 없습니다. Python은 크로스 플랫폼이므로 자연스럽게 크로스 플랫폼 다중 프로세스 지원을 제공해야 합니다. 다중 처리 모듈은 다중 프로세스 모듈의 크로스 플랫폼 버전입니다. 다중 처리 모듈은 fork() 호출을 캡슐화하므로 우리는 fork()의 세부 사항에 주의를 기울일 필요가 없습니다. Windows에는 포크 호출이 없으므로 다중 처리에서는 포크 효과를 "시뮬레이트"해야 합니다.

다중 처리의 공통 구성 요소 및 기능

Python 멀티스레딩 및 멀티프로세스 세부 정리

관리 프로세스 모듈 생성:

  • 프로세스(프로세스 생성에 사용)
  • 풀(관리 프로세스 풀 생성에 사용)
  • 큐(프로세스 통신, 리소스 공유에 사용) )
  • Value, Array(프로세스 통신, 리소스 공유용)
  • Pipe(파이프 통신용)
  • Manager(리소스 공유용)

동기화 하위 프로세스 모듈:

  • Condition(조건변수)
  • 이벤트 (이벤트)
  • Lock(뮤텍스 잠금)
  • RLock(재진입 뮤텍스 잠금(동일한 프로세스가 차단을 일으키지 않고 여러 번 얻을 수 있음)
  • Semaphore(세마포어)

Connect 각 구성요소와 기능을 함께 사용하는 방법을 알아봅시다.

Process(프로세스 생성에 사용)

Multiprocessing 모듈은 프로세스 객체를 나타내는 Process 클래스를 제공합니다.

Multiprocessing에서 각 프로세스는 Process 클래스로 표현됩니다.

구성 방법: Process([group [, target. [, 이름 [, args [, kwargs]]]]))

  • group:分组,实际上不使用,值始终为None
  • target:表示调用对象,即子进程要执行的任务,你可以传入方法名
  • name:为子进程设定名称
  • args:要传给target函数的位置参数,以元组方式进行传入。
  • kwargs:要传给target函数的字典参数,以字典方式进行传入。

实例方法:

  • start():启动进程,并调用该子进程中的p.run()
  • run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
  • is_alive():返回进程是否在运行。如果p仍然运行,返回True
  • join([timeout]):进程同步,主进程等待子进程完成后再执行后面的代码。线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间(超过这个时间,父线程不再等待子线程,继续往下执行),需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性介绍:

  •  daemon:默认值为False,如果设为True,代表p为后台运行的守护进程;当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程;必须在p.start()之前设置
  • name:进程的名称
  • pid:进程的pid
  • exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
  •  authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

使用示例:(注意:在windows中Process()必须放到if name == ‘main’:下)

from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

Pool(用于创建管理进程池)

Python 멀티스레딩 및 멀티프로세스 세부 정리

Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

构造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes :要创建的进程数,如果省略,将默认使用cpu_count()返回的数量。
  • initializer:每个工作进程启动时要执行的可调用对象,默认为None。如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
  • initargs:是要传给initializer的参数组。
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context: 用在制定工作进程启动时的上下文,一般使用Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

实例方法:

  • apply(func[, args[, kwargs]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()。它是阻塞的。apply很少使用
  •  apply_async(func[, arg[, kwds={}[, callback=None]]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。它是非阻塞。
  • map(func, iterable[, chunksize=None]):Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
  • map_async(func, iterable[, chunksize=None]):map_async与map的关系同apply与apply_async
  • imap():imap 与 map的区别是,map是当所有的进程都已经执行完了,并将结果返回了,imap()则是立即返回一个iterable可迭代对象。
  • imap_unordered():不保证返回的结果顺序与进程添加的顺序一致。
  • close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
  •  join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用,让其不再接受新的Process。
  • terminate():结束工作进程,不再处理未处理的任务。

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法:

  • get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
  • ready():如果调用完成,返回True
  • successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
  • wait([timeout]):等待结果变为可用。
  • terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
<span style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># -*- coding:utf-8 -*-</span><br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Queue(用于进程通信,资源共享)</span><br><span style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># Pool+map</span><br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">from</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">multiprocessing</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">import</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span><br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">def</span> <span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>):<br><span style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>)<br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">if</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">__name__</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">==</span> <span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"__main__"</span>:<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">lists</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span> <span style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">range</span>(<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">100</span>)<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span>(<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">8</span>)<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">map</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>, <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">lists</span>)<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">close</span>()<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">join</span>()<br>
<span style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># -*- coding:utf-8 -*-</span><br><span style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># 异步进程池(非阻塞)</span><br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">from</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">multiprocessing</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">import</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span><br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">def</span> <span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>):<br><span style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>)<br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">if</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">__name__</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">==</span> <span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"__main__"</span>:<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span>(<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">8</span>)<br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">for</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">in</span> <span style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">range</span>(<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">100</span>):<br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">'''</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">For循环中执行步骤:</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">(1)循环遍历,将100个子进程添加到进程池(相对父进程会阻塞)</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">(2)每次执行8个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">apply_async为异步进程池写法。异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">'''</span><br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">apply_async</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>, <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">args</span><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>,))<span style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># 维持执行的进程总数为8,当一个进程执行完后启动一个新进程.</span><br><span style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"test"</span>)<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">close</span>()<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">join</span>()<br>
<span style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># -*- coding:utf-8 -*-</span><br><span style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># 异步进程池(非阻塞)</span><br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">from</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">multiprocessing</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">import</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span><br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">def</span> <span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>):<br><span style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>)<br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">if</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">__name__</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">==</span> <span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"__main__"</span>:<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span>(<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">8</span>)<br><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">for</span> <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span> <span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">in</span> <span style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">range</span>(<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">100</span>):<br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">'''</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">实际测试发现,for循环内部执行步骤:</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">(1)遍历100个可迭代对象,往进程池放一个子进程</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">(2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">for循环执行完毕,再执行print函数。</span><br><span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">'''</span><br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">apply</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>, <span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">args</span><span style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span>(<span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>,))<span style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># 维持执行的进程总数为8,当一个进程执行完后启动一个新进程.</span><br><span style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"test"</span>)<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">close</span>()<br><span style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">join</span>()<br>

Queue(用于进程通信,资源共享)

在使用多进程的过程中,最好不要使用共享资源。普通的全局变量是不能被子进程所共享的,只有通过Multiprocessing组件构造的数据结构可以被共享。

Queue是用来创建进程间资源共享的队列的类,使用Queue可以达到多进程间数据传递的功能(缺点:只适用Process类,不能在Pool进程池中使用)。

构造方法:Queue([maxsize])

  • maxsize是队列中允许最大项数,省略则无大小限制。

实例方法:

  • put():用以插入数据到队列。put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
  • get():可以从队列读取并且删除一个元素。get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。若不希望在empty的时候抛出异常,令blocked为True或者参数全部置空即可。
  • get_nowait():同q.get(False)
  • put_nowait():同q.put(False)
  • empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
  • full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
  • qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

使用示例:

from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()# 等待pw结束
pr.terminate()# pr进程里是死循环,无法等待其结束,只能强行终止

JoinableQueue就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

构造方法:JoinableQueue([maxsize])

  • maxsize:队列中允许最大项数,省略则无大小限制。

实例方法

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

  • task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  • join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

使用示例:

# -*- coding:utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
while True:
res = q.get()
print('消费者拿到了 %s' % res)
q.task_done()
def producer(seq, q):
for item in seq:
time.sleep(random.randrange(1,2))
q.put(item)
print('生产者做好了 %s' % item)
q.join()
if __name__ == "__main__":
q = JoinableQueue()
seq = ('产品%s' % i for i in range(5))
p = Process(target=consumer, args=(q,))
p.daemon = True# 设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
p.start()
producer(seq, q)
print('主线程')

Value,Array(用于进程通信,资源共享)

multiprocessing 中Value和Array的实现原理都是在共享内存中创建ctypes()对象来达到共享数据的目的,两者实现方法大同小异,只是选用不同的ctypes数据类型而已。

Value

构造方法:Value((typecode_or_type, args[, lock])

  • typecode_or_type:定义ctypes()对象的类型,可以传Type code或 C Type,具体对照表见下文。
  • args:传递给typecode_or_type构造函数的参数
  • lock:默认为True,创建一个互斥锁来限制对Value对象的访问,如果传入一个锁,如Lock或RLock的实例,将用于同步。如果传入False,Value的实例就不会被锁保护,它将不是进程安全的。

typecode_or_type支持的类型:

| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char| int | 1 |
| `'B'` | unsigned char| int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long| int | 4 |
| `'L'` | unsigned long| int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float| float | 4 |
| `'d'` | double | float | 8 |

参考地址:https://docs.python.org/3/library/array.html

Array

构造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

  • typecode_or_type:同上
  •  size_or_initializer:如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。
  • kwds:传递给typecode_or_type构造函数的参数
  • lock:同上

使用示例:

import multiprocessing
def f(n, a):
n.value = 3.14
a[0] = 5
if __name__ == '__main__':
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])

注意:Value和Array只适用于Process类。

Pipe(用于管道通信)

多进程还有一种数据传递方式叫管道原理和 Queue相同。Pipe可以在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。

构造方法:Pipe([duplex])

  • dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

实例方法:

  • send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
  • recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
  • close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
  • fileno():返回连接使用的整数文件描述符
  • poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
  • recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
  • send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收
  • recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

使用示例:

from multiprocessing import Process, Pipe
import time
# 子进程执行方法
def f(Subconn):
time.sleep(1)
Subconn.send("吃了吗")
print("来自父亲的问候:", Subconn.recv())
Subconn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()# 创建管道两端
p = Process(target=f, args=(child_conn,))# 创建子进程
p.start()
print("来自儿子的问候:", parent_conn.recv())
parent_conn.send("嗯")

Manager(用于资源共享)

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager模块常与Pool模块一起使用。

Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

管理器是独立运行的子进程,其中存在真实的对象,并以服务器的形式运行,其他进程通过使用代理访问共享对象,这些代理作为客户端运行。Manager()是BaseManager的子类,返回一个启动的SyncManager()实例,可用于创建共享对象并返回访问这些共享对象的代理。

BaseManager,创建管理器服务器的基类

构造方法:BaseManager([address[, authkey]])

  • address:(hostname,port),指定服务器的网址地址,默认为简单分配一个空闲的端口
  •  authkey:连接到服务器的客户端的身份验证,默认为current_process().authkey的值

实例方法:

  • start([initializer[, initargs]]):启动一个单独的子进程,并在该子进程中启动管理器服务器
  • get_server():获取服务器对象
  • connect():连接管理器对象
  • shutdown():关闭管理器对象,只能在调用了start()方法之后调用

实例属性:

  • address:只读属性,管理器服务器正在使用的地址

SyncManager,以下类型均不是进程安全的,需要加锁..

实例方法:

  • Array(self,*args,**kwds)
  • BoundedSemaphore(self,*args,**kwds)
  • Condition(self,*args,**kwds)
  • Event(self,*args,**kwds)
  • JoinableQueue(self,*args,**kwds)
  • Lock(self,*args,**kwds)
  • Namespace(self,*args,**kwds)
  • Pool(self,*args,**kwds)
  • Queue(self,*args,**kwds)
  • RLock(self,*args,**kwds)
  •  Semaphore(self,*args,**kwds)
  • Value(self,*args,**kwds)
  • dict(self,*args,**kwds)
  • list(self,*args,**kwds)

使用示例:

import multiprocessing
def f(x, arr, l, d, n):
x.value = 3.14
arr[0] = 5
l.append('Hello')
d[1] = 2
n.a = 10
if __name__ == '__main__':
server = multiprocessing.Manager()
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list()
# 子进程执行方法
def f(Subconn):
time.sleep(1)
Subconn.send("吃了吗")
print("来自父亲的问候:", Subconn.recv())
print(x.value)
print(arr)
print(l)
print(d)
print(n)

同步子进程模块

Lock(互斥锁)

Lock锁的作用是当多个进程需要访问共享资源的时候,避免访问的冲突。加锁保证了多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度但保证了数据安全。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。

构造方法:Lock()

实例方法:

  • acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
  • release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

使用示例:

from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire()
print("Hello Num: %s" % (num))
lock.release()
if __name__ == '__main__':
lock = Lock()# 这个一定要定义为全局
for num in range(20):
Process(target=l, args=(lock, num)).start()

RLock(可重入的互斥锁(同一个进程可以多次获得它,同时不会造成阻塞)

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

构造方法:RLock()

实例方法:

  • acquire([timeout]):同Lock
  • release(): 同Lock

Semaphore(信号量)

信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁。

构造方法:Semaphore([value])

  • value:设定信号量,默认值为1

实例方法:

  • acquire([timeout]):同Lock
  • release(): 同Lock

使用示例:

from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
sem.acquire()
print('%s 占到一个茅坑' % user)
time.sleep(random.randint(0, 3))
sem.release()
print(user, 'OK')
if __name__ == '__main__':
sem = Semaphore(2)
p_l = []
for i in range(5):
p = Process(target=go_wc, args=(sem, 'user%s' % i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()

Condition(条件变量)

可以把Condition理解为一把高级的锁,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。Condition在内部维护一个锁对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与锁的acquire, release方法一致,其实它只是简单的调用内部锁对象的对应的方法而已。Condition还提供了其他的一些方法。

构造方法:Condition([lock/rlock])

  • 可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

实例方法:

  • acquire([timeout]):首先进行acquire,然后判断一些条件。如果条件不满足则wait
  • release():释放 Lock
  • wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。处于wait状态的线程接到通知后会重新判断条件。
  • notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
  • notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

使用示例:

import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
 target=stage_1,
 args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()

Event(事件)

Event内部包含了一个标志位,初始的时候为false。可以使用set()来将其设置为true;或者使用clear()将其从新设置为false;可以使用is_set()来检查标志位的状态;另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。

使用示例:

import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')

其他内容

multiprocessing.dummy 模块与 multiprocessing 模块的区别:dummy 模块是多线程,而 multiprocessing 是多进程, api 都是通用的。所有可以很方便将代码在多线程和多进程之间切换。multiprocessing.dummy通常在IO场景可以尝试使用,比如使用如下方式引入线程池。

from multiprocessing.dummy import Pool as ThreadPool

multiprocessing.dummy与早期的threading,不同的点好像是在多多核CPU下,只绑定了一个核心(具体未考证)。

参考文档:

Python并发之concurrent.futures

Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。concurrent.futures基础模块是executor和future。

Executor

Executor是一个抽象类,它不能被直接使用。它为具体的异步执行定义了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。

ThreadPoolExecutor对象

ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用。

class concurrent.futures.ThreadPoolExecutor(max_workers)

使用max_workers数目的线程池执行异步调用。

ProcessPoolExecutor对象

ThreadPoolExecutor类是Executor子类,使用进程池执行异步调用。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

使用max_workers数目的进程池执行异步调用,如果max_workers为None则使用机器的处理器数目(如4核机器max_worker配置为None时,则使用4个进程进行异步并发)。

submit()方法

Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。

Executor.submit(fn, *args, **kwargs)

  • fn:需要异步执行的函数
  • *args, **kwargs:fn参数

使用示例:

from concurrent import futures
def test(num):
import time
return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 1)
print(future.result())

map()方法

除了submit,Exectuor还为我们提供了map方法,这个方法返回一个map(func, *iterables)迭代器,迭代器中的回调执行返回的结果有序的。

Executor.map(func, *iterables, timeout=None)

  • func:需要异步执行的函数
  • *iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。
  • timeout:设置每次异步操作的超时时间,timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。

使用示例:

from concurrent import futures
def test(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)

shutdown()方法

释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法。

Executor.shutdown(wait=True)

Future

Future可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

Future类封装了可调用的异步执行。Future 实例通过 Executor.submit()方法创建。

  • cancel():试图取消调用。如果调用当前正在执行,并且不能被取消,那么该方法将返回False,否则调用将被取消,方法将返回True。
  • cancelled():如果成功取消调用,返回True。
  •  running():如果调用当前正在执行并且不能被取消,返回True。
  • done():如果调用成功地取消或结束了,返回True。
  • result(timeout=None):返回调用返回的值。如果调用还没有完成,那么这个方法将等待超时秒。如果调用在超时秒内没有完成,那么就会有一个Futures.TimeoutError将报出。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。
  • exception(timeout=None):返回调用抛出的异常,如果调用还未完成,该方法会等待timeout指定的时长,如果该时长后调用还未完成,就会报出超时错误futures.TimeoutError。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。如果调用完成并且无异常报出,返回None.
  • add_done_callback(fn):将可调用fn捆绑到future上,当Future被取消或者结束运行,fn作为future的唯一参数将会被调用。如果future已经运行完成或者取消,fn将会被立即调用。
  • wait(fs, timeout=None, return_when=ALL_COMPLETED)
  •  等待fs提供的 Future 实例(possibly created by different Executor instances) 运行结束。返回一个命名的2元集合,分表代表已完成的和未完成的
  •  return_when 表明什么时候函数应该返回。它的值必须是一下值之
  • FIRST_COMPLETED :函数在任何future结束或者取消的时候返回。
  • FIRST_EXCEPTION :函数在任何future因为异常结束的时候返回,如果没有future报错,效果等于
  •  ALL_COMPLETED :函数在所有future结束后才会返回。
  • as_completed(fs, timeout=None):参数是一个 Future 实例列表,返回值是一个迭代器,在运行结束后产出 Future实例 。

使用示例:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
print(x.result())
print(2)

参考链接:

위 내용은 Python 멀티스레딩 및 멀티프로세스 세부 정리의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 51cto.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제