Heim  >  Artikel  >  Backend-Entwicklung  >  Python-Thread-Pool/Prozess-Pool für gleichzeitige Programmierung

Python-Thread-Pool/Prozess-Pool für gleichzeitige Programmierung

巴扎黑
巴扎黑Original
2017-03-18 11:37:101833Durchsuche

Einführung

Die Python-Standardbibliothek stellt uns Threading- und Multiprocessing-Module zur Verfügung, um entsprechenden Multithreading-/Multiprozesscode zu schreiben. Wenn das Projekt jedoch eine bestimmte Größe erreicht, kommt es häufig zur Erstellung/Zerstörung von Prozessen oder Threads sind sehr ressourcenintensiv. Ja, zu diesem Zeitpunkt müssen wir unseren eigenen Thread-Pool/Prozesspool schreiben, um Platz gegen Zeit zu tauschen. Aber ab Python 3.2 stellt uns die Standardbibliothek das Modul concurrent.futures zur Verfügung, das zwei Klassen bereitstellt: ThreadPoolExecutor und ProcessPoolExecutor, die eine weitere Abstraktion von Threading und Multiprocessing ermöglichen direkte Unterstützung für das Schreiben von Thread-Pools/Prozess-Pools.

Executor und Future

concurrent.futures-Modul basiert auf Executor ist eine abstrakte Klasse und kann nicht direkt verwendet werden. Allerdings sind die beiden bereitgestellten Unterklassen ThreadPoolExecutor und ProcessPoolExecutor sehr nützlich. Wie der Name schon sagt, werden sie zum Erstellen von Thread-Pool- bzw. Prozess-Pool-Codes verwendet. Wir können die entsprechenden Aufgaben direkt in den Thread-Pool/Prozesspool stellen und müssen uns nicht um Deadlocks kümmern. Der Thread-Pool/Prozesspool plant dies automatisch für uns.

ZukunftIch glaube, dass Freunde mit Programmiererfahrung in Java und NodeJS mit diesem Konzept vertraut sein werden Sie können es als eine in der Zukunft abgeschlossene Operation verstehen. Dies ist die Grundlage der asynchronen Programmierung. Wenn wir beispielsweise queue.get ausführen, kommt es zu einer Blockierung, bevor auf die Rückgabe des Ergebnisses gewartet wird, und die CPU kann nicht für andere Aufgaben freigegeben werden Future hilft uns, die Aufgabe während der Wartezeit abzuschließen. Bezüglich asynchroner E/A in Python können Sie nach dem Lesen dieses Artikels auf meine Coroutine/asynchrone E/A für die gleichzeitige Programmierung in Python verweisen.

ps.: Wenn Sie immer noch bei Python2.x bleiben, installieren Sie bitte zuerst das Futures-Modul.

pip install futures

Verwenden Sie „Submit“, um den Thread-Pool/Prozess-Pool zu betreiben.

Lassen Sie uns zunächst das Konzept des Thread-Pools anhand des folgenden Codes verstehen.

# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello"))  # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world"))  # 往线程池里面加入一个task
print(future1.done())  # 判断task1是否结束
time.sleep(3)
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果

Lassen Sie uns analysieren es basiert auf den Laufergebnissen. Wir verwenden die Methode submit, um eine Aufgabe zum Thread-Pool hinzuzufügen, und „submit“ gibt ein Future-Objekt zurück. Das Future-Objekt kann einfach als eine in der Zukunft abgeschlossene Operation verstanden werden. In der ersten Druckanweisung ist es offensichtlich, dass unsere Zukunft1 aufgrund von time.sleep(2) nicht abgeschlossen wurde, da wir time.sleep(3) verwendet haben, um den Hauptthread anzuhalten. Wenn wir also zur zweiten Druckanweisung kommen, Unser Thread-Pool Alle Aufgaben hier wurden abgeschlossen.

ziwenxie :: ~ » python example1.py
False
True
hello
world
# 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8361  7557  8361  3    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8362  0    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8363  0    3 19:45 pts/0    00:00:00 python example1.py

Wir können den obigen Code auch in eine Prozesspoolform umschreiben. Die API und der Threadpool sind genau gleich, daher werde ich nicht wortreich sein.

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())

Das Folgende sind die laufenden Ergebnisse

ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8560  7557  8560  3    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8563  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8564  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8561  8560  8561  0    1 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8562  8560  8562  0    1 19:53 pts/0    00:00:00 python example2.py

Verwenden Sie Map/Wait, um den Thread-Pool/Prozesspool zu betreiben

Zusätzlich zum Senden bietet Executor auch Folgendes an Wir verwenden die Kartenmethode, ähnlich der integrierten Kartenverwendung. Vergleichen wir den Unterschied zwischen den beiden anhand von zwei Beispielen.

Überprüfung der Verwendung des Absendevorgangs

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Wie aus den laufenden Ergebnissen ersichtlich ist, wird as_completed nicht in der Reihenfolge der URLS-Listenelemente zurückgegeben.

ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 byte
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes

Karte verwenden

# example4.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))

Wie aus den laufenden Ergebnissen ersichtlich ist, gibt Karte die Elemente in der Reihenfolge der URL-Liste und den Code zurück Das Schreiben ist einfacher und intuitiver und wir können eines entsprechend den spezifischen Anforderungen auswählen.

ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes

Die dritte Option ist Warten

Die Wartemethode gibt ein Tupel zurück. Das Tupel enthält zwei Sätze, einer ist abgeschlossen, der andere ist unvollständig. Ein Vorteil der Verwendung der Wartemethode besteht darin, dass sie mehr Freiheit erhält. Sie erhält drei Parameter: FIRST_COMPLETED, FIRST_EXCEPTION und ALL_COMPLETED.

Sehen wir uns den Unterschied zwischen den drei Parametern anhand des folgenden Beispiels an

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))

Wenn der Standardwert ALL_COMPLETED verwendet wird, blockiert das Programm, bis alle Aufgaben im Thread-Pool abgeschlossen sind .

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())

Wenn der Parameter FIRST_COMPLETED verwendet wird, wartet das Programm nicht, bis alle Aufgaben im Thread-Pool abgeschlossen sind.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})

Denkfragen

Schreiben Sie ein kleines Programm, um die Ausführungseffizienzlücke zwischen multiprocessing.pool (ThreadPool) und ProcessPollExecutor (ThreadPoolExecutor) zu vergleichen, und überlegen Sie, warum sie basierend auf der Zukunft auftritt oben erwähnt Ein solches Ergebnis.

Das obige ist der detaillierte Inhalt vonPython-Thread-Pool/Prozess-Pool für gleichzeitige Programmierung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn