Heim > Artikel > Backend-Entwicklung > Python-Thread-Pool/Prozess-Pool für gleichzeitige Programmierung
Die Python-Standardbibliothek stellt uns Threading- und Multiprocessing-Module zur Verfügung, um entsprechenden Multithreading-/Multiprozesscode zu schreiben. Wenn das Projekt jedoch eine bestimmte Größe erreicht, kommt es häufig zur Erstellung/Zerstörung von Prozessen oder Threads sind sehr ressourcenintensiv. Ja, zu diesem Zeitpunkt müssen wir unseren eigenen Thread-Pool/Prozesspool schreiben, um Platz gegen Zeit zu tauschen. Aber ab Python 3.2 stellt uns die Standardbibliothek das Modul concurrent.futures zur Verfügung, das zwei Klassen bereitstellt: ThreadPoolExecutor und ProcessPoolExecutor, die eine weitere Abstraktion von Threading und Multiprocessing ermöglichen direkte Unterstützung für das Schreiben von Thread-Pools/Prozess-Pools.
concurrent.futures-Modul basiert auf Executor ist eine abstrakte Klasse und kann nicht direkt verwendet werden. Allerdings sind die beiden bereitgestellten Unterklassen ThreadPoolExecutor und ProcessPoolExecutor sehr nützlich. Wie der Name schon sagt, werden sie zum Erstellen von Thread-Pool- bzw. Prozess-Pool-Codes verwendet. Wir können die entsprechenden Aufgaben direkt in den Thread-Pool/Prozesspool stellen und müssen uns nicht um Deadlocks kümmern. Der Thread-Pool/Prozesspool plant dies automatisch für uns.
ZukunftIch glaube, dass Freunde mit Programmiererfahrung in Java und NodeJS mit diesem Konzept vertraut sein werden Sie können es als eine in der Zukunft abgeschlossene Operation verstehen. Dies ist die Grundlage der asynchronen Programmierung. Wenn wir beispielsweise queue.get ausführen, kommt es zu einer Blockierung, bevor auf die Rückgabe des Ergebnisses gewartet wird, und die CPU kann nicht für andere Aufgaben freigegeben werden Future hilft uns, die Aufgabe während der Wartezeit abzuschließen. Bezüglich asynchroner E/A in Python können Sie nach dem Lesen dieses Artikels auf meine Coroutine/asynchrone E/A für die gleichzeitige Programmierung in Python verweisen.
ps.: Wenn Sie immer noch bei Python2.x bleiben, installieren Sie bitte zuerst das Futures-Modul.
pip install futures
Lassen Sie uns zunächst das Konzept des Thread-Pools anhand des folgenden Codes verstehen.
# example1.py from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束 time.sleep(3) print(future2.done()) # 判断task2是否结束 print(future1.result()) # 查看task1返回的结果 print(future2.result()) # 查看task2返回的结果
Lassen Sie uns analysieren es basiert auf den Laufergebnissen. Wir verwenden die Methode submit, um eine Aufgabe zum Thread-Pool hinzuzufügen, und „submit“ gibt ein Future-Objekt zurück. Das Future-Objekt kann einfach als eine in der Zukunft abgeschlossene Operation verstanden werden. In der ersten Druckanweisung ist es offensichtlich, dass unsere Zukunft1 aufgrund von time.sleep(2) nicht abgeschlossen wurde, da wir time.sleep(3) verwendet haben, um den Hauptthread anzuhalten. Wenn wir also zur zweiten Druckanweisung kommen, Unser Thread-Pool Alle Aufgaben hier wurden abgeschlossen.
ziwenxie :: ~ » python example1.py False True hello world # 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行 ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
Wir können den obigen Code auch in eine Prozesspoolform umschreiben. Die API und der Threadpool sind genau gleich, daher werde ich nicht wortreich sein.
# example2.py from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(3) print(future2.done()) print(future1.result()) print(future2.result())
Das Folgende sind die laufenden Ergebnisse
ziwenxie :: ~ » python example2.py False True hello world ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
Zusätzlich zum Senden bietet Executor auch Folgendes an Wir verwenden die Kartenmethode, ähnlich der integrierten Kartenverwendung. Vergleichen wir den Unterschied zwischen den beiden anhand von zwei Beispielen.
# example3.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
Wie aus den laufenden Ergebnissen ersichtlich ist, wird as_completed nicht in der Reihenfolge der URLS-Listenelemente zurückgegeben.
ziwenxie :: ~ » python example3.py 'http://example.com/' page is 1270 byte 'https://api.github.com/' page is 2039 bytes 'http://httpbin.org' page is 12150 bytes
# example4.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data)))
Wie aus den laufenden Ergebnissen ersichtlich ist, gibt Karte die Elemente in der Reihenfolge der URL-Liste und den Code zurück Das Schreiben ist einfacher und intuitiver und wir können eines entsprechend den spezifischen Anforderungen auswählen.
ziwenxie :: ~ » python example4.py 'http://httpbin.org' page is 12150 bytes 'http://example.com/' page is 1270 bytes 'https://api.github.com/' page is 2039 bytes
Die Wartemethode gibt ein Tupel zurück. Das Tupel enthält zwei Sätze, einer ist abgeschlossen, der andere ist unvollständig. Ein Vorteil der Verwendung der Wartemethode besteht darin, dass sie mehr Freiheit erhält. Sie erhält drei Parameter: FIRST_COMPLETED, FIRST_EXCEPTION und ALL_COMPLETED.
Sehen wir uns den Unterschied zwischen den drei Parametern anhand des folgenden Beispiels an
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) print(wait(futures)) # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
Wenn der Standardwert ALL_COMPLETED verwendet wird, blockiert das Programm, bis alle Aufgaben im Thread-Pool abgeschlossen sind .
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f0b06c9bc88 state=finished returned str>, <Future at 0x7f0b06cbaa90 state=finished returned str>, <Future at 0x7f0b06373898 state=finished returned str>, <Future at 0x7f0b06352ba8 state=finished returned str>, <Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
Wenn der Parameter FIRST_COMPLETED verwendet wird, wartet das Programm nicht, bis alle Aufgaben im Thread-Pool abgeschlossen sind.
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f84109edb00 state=finished returned str>, <Future at 0x7f840e2e9320 state=finished returned str>, <Future at 0x7f840f25ccc0 state=finished returned str>}, not_done={<Future at 0x7f840e2e9ba8 state=running>, <Future at 0x7f840e2e9940 state=running>})
Schreiben Sie ein kleines Programm, um die Ausführungseffizienzlücke zwischen multiprocessing.pool (ThreadPool) und ProcessPollExecutor (ThreadPoolExecutor) zu vergleichen, und überlegen Sie, warum sie basierend auf der Zukunft auftritt oben erwähnt Ein solches Ergebnis.
Das obige ist der detaillierte Inhalt vonPython-Thread-Pool/Prozess-Pool für gleichzeitige Programmierung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!