Python표준 라이브러리는 해당 멀티스레딩/멀티프로세스 코드를 작성할 수 있는 스레딩 및 멀티프로세싱 모듈을 제공하지만 프로젝트가 일정한 규모, 빈번한 프로세스나 스레드 생성/파괴는 많은 리소스를 소비합니다. 이때 공간을 시간과 교환하기 위해 자체 스레드 풀/프로세스 풀을 작성해야 합니다. 그러나 Python3.2부터 표준 라이브러리는 concurrent.futures 모듈을 제공합니다. 이 모듈은 ThreadPoolExecutor와 ProcessPoolExecutor라는 두 클래스를 제공합니다. 스레딩 및 다중 처리의 추가 추상화는 스레드 풀/프로세스 풀 작성을 직접 지원합니다.
concurrent.futures 모듈은 Executor를 기반으로 합니다. Executor는 직접 사용할 수 없는 추상 클래스입니다. 그러나 이 클래스가 제공하는 두 하위 클래스 ThreadPoolExecutor 및 ProcessPoolExecutor는 이름에서 알 수 있듯이 각각 스레드 풀 및 프로세스 풀 코드를 생성하는 데 사용됩니다. 해당 작업을 스레드 풀/프로세스 풀에 직접 넣을 수 있으며, 교착 상태를 걱정하기 위해 대기열을 유지할 필요가 없습니다. 스레드 풀/프로세스 풀이 자동으로 이를 예약합니다.
Future java, nodejs 프로그래밍 경험이 있는 친구들은 이 개념을 확실히 알 수 있을 거라 믿습니다. 사용 미래에 완료되는 작업으로 이해 이는 비동기 프로그래밍의 기본입니다. 예를 들어 queue.get을 작업하면 결과가 반환되기를 기다리기 전에 차단이 발생하며, CPU는 다른 작업을 수행하기 위해 해제될 수 없습니다. Future를 도입하면 기다리는 동안 다른 작업을 완료하는 데 도움이 됩니다. Python의 비동기 IO에 대해서는 이 기사를 읽은 후 내 Python 동시 프로그래밍 코루틴/비동기 IO를 참조할 수 있습니다.
p.s: 여전히 Python2.x를 사용하고 있다면 먼저 futures 모듈을 설치하세요.
pip install futuressubmit을 사용하여 스레드 풀/프로세스 풀 운영먼저 다음 코드를 통해 스레드 풀의 개념을 이해해 봅시다
# example1.py from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束 time.sleep(3) print(future2.done()) # 判断task2是否结束 print(future1.result()) # 查看task1返回的结果 print(future2.result()) # 查看task2返回的结果다음 코드를 사용하겠습니다. 스레드 풀의 개념을 이해하기 위해 분석해 보겠습니다. 스레드 풀에 작업을 추가하기 위해
submit 메소드를 사용합니다. submit은 Future 객체 를 반환합니다. 미래. 첫 번째 print 문에서는 메인 스레드를 일시 중지하기 위해 time.sleep(3)을 사용했기 때문에 time.sleep(2) 때문에 future1이 완료되지 않았음이 분명합니다. 따라서 두 번째 print 문에 관해서는 다음과 같습니다. 스레드 풀 여기의 모든 작업이 완료되었습니다.
ziwenxie :: ~ » python example1.py False True hello world # 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行 ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py위 코드를 프로세스 풀 형식으로 다시 작성할 수도 있습니다.
api는 스레드 풀과 완전히 동일하므로 장황하게 설명하지 않겠습니다.
# example2.py from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(3) print(future2.done()) print(future1.result()) print(future2.result())실행 결과는 다음과 같습니다
ziwenxie :: ~ » python example2.py False True hello world ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
# example3.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
as_completed가 URLS 목록 요소 순서대로 반환되지 않습니다.
ziwenxie :: ~ » python example3.py 'http://example.com/' page is 1270 byte 'https://api.github.com/' page is 2039 bytes 'http://httpbin.org' page is 12150 bytesmap 사용
# example4.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data)))
map은 URLS 목록의 순서대로 요소를 반환하며 작성된 코드가 더 간결해졌습니다. 그리고 직관적입니다. 귀하의 특정 필요에 따라 어느 하나를 선택할 수 있습니다.
ziwenxie :: ~ » python example4.py 'http://httpbin.org' page is 12150 bytes 'http://example.com/' page is 1270 bytes 'https://api.github.com/' page is 2039 bytes세 번째 옵션 waitwait 메서드는 튜플(tuple)을 반환합니다. 튜플에는 두 개의 다음 예시를 통해 세 매개변수의 차이점을 살펴보겠습니다.
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) print(wait(futures)) # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))기본값인 ALL_COMPLETED를 사용하면 스레드 풀의 모든 작업이 완료될 때까지 프로그램이 차단됩니다.
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f0b06c9bc88 state=finished returned str>, <Future at 0x7f0b06cbaa90 state=finished returned str>, <Future at 0x7f0b06373898 state=finished returned str>, <Future at 0x7f0b06352ba8 state=finished returned str>, <Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())FIRST_COMPLETED 매개변수를 사용하면 프로그램은 스레드 풀의 모든 작업이 완료될 때까지 기다리지 않습니다.
아아아아
위 내용은 Python 동시 프로그래밍의 스레드 풀/프로세스 풀에 대한 자세한 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!