>  기사  >  백엔드 개발  >  Python 동시 프로그래밍 스레드 풀/프로세스 풀

Python 동시 프로그래밍 스레드 풀/프로세스 풀

巴扎黑
巴扎黑원래의
2017-03-18 11:37:101843검색

소개

Python 표준 라이브러리는 해당 멀티스레딩/멀티프로세스 코드를 작성하기 위한 스레딩 및 멀티프로세싱 모듈을 제공합니다. 그러나 프로젝트가 특정 규모에 도달하면 프로세스의 생성/파괴가 자주 발생합니다. 스레드는 매우 리소스 집약적입니다. 예, 지금은 시간을 위해 공간을 교환하기 위해 자체 스레드 풀/프로세스 풀을 작성해야 합니다. 그러나 Python 3.2부터 표준 라이브러리는 ThreadPoolExecutor 및 ProcessPoolExecutor라는 두 가지 클래스를 제공하는 concurrent.futures 모듈을 제공하여 스레딩 및 다중 처리의 추가 추상화를 실현합니다. 스레드 풀/프로세스 풀 작성을 직접 지원합니다.

Executor와 Future

concurrent.futures 모듈은 Executor를 기반으로 하며 추상 클래스이므로 직접 사용할 수 없습니다. 그러나 이 클래스가 제공하는 두 하위 클래스 ThreadPoolExecutor 및 ProcessPoolExecutor는 이름에서 알 수 있듯이 각각 스레드 풀 및 프로세스 풀 코드를 생성하는 데 사용됩니다. 해당 작업을 스레드 풀/프로세스 풀에 직접 넣을 수 있으며, 교착 상태를 걱정하기 위해 대기열을 유지할 필요가 없습니다. 스레드 풀/프로세스 풀이 자동으로 이를 예약합니다.

FutureJava와 nodejs 프로그래밍 경험이 있는 친구들이라면 이 개념이 익숙할 거라 믿습니다. 미래에 완성되는 작업으로 이해하시면 됩니다. 이는 비동기 프로그래밍의 기본입니다. 예를 들어 queue.get을 작동하면 결과가 반환되기를 기다리기 전에 차단이 발생하고 CPU는 다른 작업을 수행할 수 없습니다. Future는 대기 기간 동안 작업을 완료하는 데 도움이 됩니다. Python의 비동기 IO에 대해서는 이 기사를 읽은 후 내 Python 동시 프로그래밍 코루틴/비동기 IO를 참조할 수 있습니다.

p.s: 여전히 Python2.x를 사용하고 있다면 futures 모듈을 먼저 설치하세요.

pip install futures

submit을 사용하여 스레드 풀/프로세스 풀 운영

먼저 다음 코드를 통해 스레드 풀의 개념을 이해해 봅시다

# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello"))  # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world"))  # 往线程池里面加入一个task
print(future1.done())  # 判断task1是否结束
time.sleep(3)
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果

다음 코드를 사용하겠습니다. 스레드 풀의 개념을 이해하기 위해 분석해 보겠습니다. submit 메소드를 사용하여 스레드 풀에 작업을 추가하고 submit은 Future 객체를 반환합니다. Future 객체는 간단히 미래에 완료되는 작업으로 이해될 수 있습니다. 첫 번째 print 문에서는 메인 스레드를 일시 중지하기 위해 time.sleep(3)을 사용했기 때문에 time.sleep(2) 때문에 future1이 완료되지 않았음이 분명합니다. 따라서 두 번째 print 문에 관해서는 다음과 같습니다. 스레드 풀 여기의 모든 작업이 완료되었습니다.

ziwenxie :: ~ » python example1.py
False
True
hello
world
# 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8361  7557  8361  3    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8362  0    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8363  0    3 19:45 pts/0    00:00:00 python example1.py

위 코드를 프로세스 풀 형식으로 다시 작성할 수도 있습니다. API와 스레드 풀은 완전히 동일하므로 장황하게 설명하지 않겠습니다.

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())

실행 결과는 다음과 같습니다

ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8560  7557  8560  3    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8563  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8564  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8561  8560  8561  0    1 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8562  8560  8562  0    1 19:53 pts/0    00:00:00 python example2.py

map/wait를 사용하여 스레드 풀/프로세스 풀 작업

Executor에서는 submit 외에도 map 메소드를 제공합니다. 및 내장 map의 사용법은 비슷합니다. 두 가지 예를 통해 두 가지의 차이점을 비교해 보겠습니다.

submit 작업 사용 검토

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

실행 결과에서 알 수 있듯이 as_completed가 URLS 목록 요소 순서대로 반환되지 않습니다.

ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 byte
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes

map 사용

# example4.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))

실행 결과에서 알 수 있듯이 map은 URLS 목록의 순서대로 요소를 반환하며 작성된 코드가 더 간결해졌습니다. 그리고 직관적입니다. 귀하의 특정한 필요에 따라 어느 하나를 선택할 수 있습니다.

ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes

세 번째 옵션은 wait입니다

wait 메소드는 튜플을 두 개 포함하며, 하나는 완료되고 다른 하나는 완료되지 않습니다. 대기 방법을 사용하면 더 많은 자유를 얻을 수 있다는 것입니다. FIRST_COMPLETED, FIRST_EXCEPTION 및 ALL_COMPLETE의 세 가지 매개변수를 받습니다.

다음 예시를 통해 세 매개변수의 차이점을 살펴보겠습니다.

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))

기본값인 ALL_COMPLETED를 사용하면 스레드 풀의 모든 작업이 완료될 때까지 프로그램이 차단됩니다.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())

FIRST_COMPLETED 매개변수를 사용하면 프로그램은 스레드 풀의 모든 작업이 완료될 때까지 기다리지 않습니다.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})

생각하는 질문

multiprocessing.pool(ThreadPool)과 ProcessPollExecutor(ThreadPoolExecutor) 사이의 실행 효율성 격차를 비교하는 작은 프로그램을 작성하고 위에서 언급한 Future를 기반으로 왜 이런 일이 발생하는지 생각해 보세요. 결과.

위 내용은 Python 동시 프로그래밍 스레드 풀/프로세스 풀의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.