Python 표준 라이브러리는 해당 멀티스레딩/멀티프로세스 코드를 작성하기 위한 스레딩 및 멀티프로세싱 모듈을 제공합니다. 그러나 프로젝트가 특정 규모에 도달하면 프로세스의 생성/파괴가 자주 발생합니다. 스레드는 매우 리소스 집약적입니다. 예, 지금은 시간을 위해 공간을 교환하기 위해 자체 스레드 풀/프로세스 풀을 작성해야 합니다. 그러나 Python 3.2부터 표준 라이브러리는 ThreadPoolExecutor 및 ProcessPoolExecutor라는 두 가지 클래스를 제공하는 concurrent.futures 모듈을 제공하여 스레딩 및 다중 처리의 추가 추상화를 실현합니다. 스레드 풀/프로세스 풀 작성을 직접 지원합니다.
concurrent.futures 모듈은 Executor를 기반으로 하며 추상 클래스이므로 직접 사용할 수 없습니다. 그러나 이 클래스가 제공하는 두 하위 클래스 ThreadPoolExecutor 및 ProcessPoolExecutor는 이름에서 알 수 있듯이 각각 스레드 풀 및 프로세스 풀 코드를 생성하는 데 사용됩니다. 해당 작업을 스레드 풀/프로세스 풀에 직접 넣을 수 있으며, 교착 상태를 걱정하기 위해 대기열을 유지할 필요가 없습니다. 스레드 풀/프로세스 풀이 자동으로 이를 예약합니다.
FutureJava와 nodejs 프로그래밍 경험이 있는 친구들이라면 이 개념이 익숙할 거라 믿습니다. 미래에 완성되는 작업으로 이해하시면 됩니다. 이는 비동기 프로그래밍의 기본입니다. 예를 들어 queue.get을 작동하면 결과가 반환되기를 기다리기 전에 차단이 발생하고 CPU는 다른 작업을 수행할 수 없습니다. Future는 대기 기간 동안 작업을 완료하는 데 도움이 됩니다. Python의 비동기 IO에 대해서는 이 기사를 읽은 후 내 Python 동시 프로그래밍 코루틴/비동기 IO를 참조할 수 있습니다.
p.s: 여전히 Python2.x를 사용하고 있다면 futures 모듈을 먼저 설치하세요.
pip install futures
먼저 다음 코드를 통해 스레드 풀의 개념을 이해해 봅시다
# example1.py from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束 time.sleep(3) print(future2.done()) # 判断task2是否结束 print(future1.result()) # 查看task1返回的结果 print(future2.result()) # 查看task2返回的结果
다음 코드를 사용하겠습니다. 스레드 풀의 개념을 이해하기 위해 분석해 보겠습니다. submit 메소드를 사용하여 스레드 풀에 작업을 추가하고 submit은 Future 객체를 반환합니다. Future 객체는 간단히 미래에 완료되는 작업으로 이해될 수 있습니다. 첫 번째 print 문에서는 메인 스레드를 일시 중지하기 위해 time.sleep(3)을 사용했기 때문에 time.sleep(2) 때문에 future1이 완료되지 않았음이 분명합니다. 따라서 두 번째 print 문에 관해서는 다음과 같습니다. 스레드 풀 여기의 모든 작업이 완료되었습니다.
ziwenxie :: ~ » python example1.py False True hello world # 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行 ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
위 코드를 프로세스 풀 형식으로 다시 작성할 수도 있습니다. API와 스레드 풀은 완전히 동일하므로 장황하게 설명하지 않겠습니다.
# example2.py from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(3) print(future2.done()) print(future1.result()) print(future2.result())
실행 결과는 다음과 같습니다
ziwenxie :: ~ » python example2.py False True hello world ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
Executor에서는 submit 외에도 map 메소드를 제공합니다. 및 내장 map의 사용법은 비슷합니다. 두 가지 예를 통해 두 가지의 차이점을 비교해 보겠습니다.
# example3.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
실행 결과에서 알 수 있듯이 as_completed가 URLS 목록 요소 순서대로 반환되지 않습니다.
ziwenxie :: ~ » python example3.py 'http://example.com/' page is 1270 byte 'https://api.github.com/' page is 2039 bytes 'http://httpbin.org' page is 12150 bytes
# example4.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data)))
실행 결과에서 알 수 있듯이 map은 URLS 목록의 순서대로 요소를 반환하며 작성된 코드가 더 간결해졌습니다. 그리고 직관적입니다. 귀하의 특정한 필요에 따라 어느 하나를 선택할 수 있습니다.
ziwenxie :: ~ » python example4.py 'http://httpbin.org' page is 12150 bytes 'http://example.com/' page is 1270 bytes 'https://api.github.com/' page is 2039 bytes
wait 메소드는 튜플을 두 개 포함하며, 하나는 완료되고 다른 하나는 완료되지 않습니다. 대기 방법을 사용하면 더 많은 자유를 얻을 수 있다는 것입니다. FIRST_COMPLETED, FIRST_EXCEPTION 및 ALL_COMPLETE의 세 가지 매개변수를 받습니다.
다음 예시를 통해 세 매개변수의 차이점을 살펴보겠습니다.
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) print(wait(futures)) # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
기본값인 ALL_COMPLETED를 사용하면 스레드 풀의 모든 작업이 완료될 때까지 프로그램이 차단됩니다.
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f0b06c9bc88 state=finished returned str>, <Future at 0x7f0b06cbaa90 state=finished returned str>, <Future at 0x7f0b06373898 state=finished returned str>, <Future at 0x7f0b06352ba8 state=finished returned str>, <Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
FIRST_COMPLETED 매개변수를 사용하면 프로그램은 스레드 풀의 모든 작업이 완료될 때까지 기다리지 않습니다.
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f84109edb00 state=finished returned str>, <Future at 0x7f840e2e9320 state=finished returned str>, <Future at 0x7f840f25ccc0 state=finished returned str>}, not_done={<Future at 0x7f840e2e9ba8 state=running>, <Future at 0x7f840e2e9940 state=running>})
multiprocessing.pool(ThreadPool)과 ProcessPollExecutor(ThreadPoolExecutor) 사이의 실행 효율성 격차를 비교하는 작은 프로그램을 작성하고 위에서 언급한 Future를 기반으로 왜 이런 일이 발생하는지 생각해 보세요. 결과.
위 내용은 Python 동시 프로그래밍 스레드 풀/프로세스 풀의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!