Heim >Backend-Entwicklung >Python-Tutorial >Detaillierte Einführung in den Thread-Pool/Prozess-Pool in der gleichzeitigen Python-Programmierung

Detaillierte Einführung in den Thread-Pool/Prozess-Pool in der gleichzeitigen Python-Programmierung

高洛峰
高洛峰Original
2017-03-17 17:38:522162Durchsuche

Einführung

PythonDie Standardbibliothek stellt uns Threading- und Multiprocessing-Module zur Verfügung, um entsprechenden Multithreading-/Multiprozesscode zu schreiben, aber wenn das Projekt erreicht ist In einem bestimmten Umfang verbraucht das häufige Erstellen/Zerstören von Prozessen oder Threads viele Ressourcen. Zu diesem Zeitpunkt müssen wir unseren eigenen Thread-Pool/Prozesspool schreiben, um Platz gegen Zeit zu tauschen. Aber ab Python3.2 stellt uns die Standardbibliothek das Modul concurrent.futures zur Verfügung, das zwei Klassen bereitstellt: ThreadPoolExecutor und ProcessPoolExecutor, die implementieren Eine weitere Abstraktion von Threading und Multiprocessing bietet direkte Unterstützung für das Schreiben von Thread-Pools/Prozess-Pools.

Executor und Future

concurrent.futures-Modul basiert auf Executor ist eine abstrakte Klasse, die nicht direkt verwendet werden kann. Allerdings sind die beiden bereitgestellten Unterklassen ThreadPoolExecutor und ProcessPoolExecutor sehr nützlich. Wie der Name schon sagt, werden sie zum Erstellen von Thread-Pool- bzw. Prozess-Pool-Codes verwendet. Wir können die entsprechenden Aufgaben direkt in den Thread-Pool/Prozesspool stellen und müssen uns nicht um Deadlocks kümmern. Der Thread-Pool/Prozesspool plant dies automatisch für uns.

Zukunft Ich glaube, dass Freunde, die Java- und Nodejs Erfahrung in der Programmierung haben, mit diesem Konzept auf jeden Fall vertraut sind Das können Sie Verwenden Sie es als eine in der Zukunft abgeschlossene Operation , dies ist die Grundlage der asynchronen Programmierung. Wenn wir beispielsweise queue.get ausführen, erfolgt eine Blockierung, bevor auf die Rückgabe des Ergebnisses gewartet wird Die CPU kann nicht für andere Aufgaben freigegeben werden. Die Einführung von Future hilft uns, während des Wartens andere Vorgänge abzuschließen. Bezüglich asynchroner E/A in Python können Sie nach dem Lesen dieses Artikels auf meine Coroutine/asynchrone E/A für die gleichzeitige Programmierung in Python verweisen.

ps.: Wenn Sie immer noch bei Python2.x bleiben, installieren Sie bitte zuerst das Futures-Modul.

pip install futures

Verwenden Sie „Submit“, um den Thread-Pool/Prozess-Pool zu betreiben.

Lassen Sie uns zunächst das Konzept des Thread-Pools anhand des folgenden Codes verstehen.

# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello"))  # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world"))  # 往线程池里面加入一个task
print(future1.done())  # 判断task1是否结束
time.sleep(3)
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果

Lassen Sie uns analysieren es basiert auf den Laufergebnissen. Wir verwenden die Methode submit, um eine Aufgabe zum Thread-Pool hinzuzufügen. Submit gibt ein Future-Objekt zurück. Das Future-Objekt kann einfach als abgeschlossene Operation verstanden werden die Zukunft. In der ersten Druckanweisung ist es offensichtlich, dass unsere Zukunft1 aufgrund von time.sleep(2) nicht abgeschlossen wurde, da wir time.sleep(3) verwendet haben, um den Hauptthread anzuhalten. Wenn wir also zur zweiten Druckanweisung kommen, Unser Thread-Pool Alle Aufgaben hier wurden abgeschlossen.

ziwenxie :: ~ » python example1.py
False
True
hello
world
# 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8361  7557  8361  3    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8362  0    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8363  0    3 19:45 pts/0    00:00:00 python example1.py

Wir können den obigen Code auch in Form eines Prozesspools umschreiben. Die API ist genau die gleiche wie der Thread-Pool, daher werde ich nicht ausführlich sein.

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())

Das Folgende sind die laufenden Ergebnisse

ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8560  7557  8560  3    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8563  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8564  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8561  8560  8561  0    1 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8562  8560  8562  0    1 19:53 pts/0    00:00:00 python example2.py

Verwenden Sie map/wait, um den Thread-Pool/Prozesspool zu betreiben

Zusätzlich Einreichen, Execuor auch Es stellt uns die Kartenmethode zur Verfügung, die der integrierten Kartenverwendung ähnelt. Vergleichen wir den Unterschied zwischen den beiden anhand von zwei Beispielen.

Überprüfung der Verwendung des Absendevorgangs

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Wie aus den laufenden Ergebnissen ersichtlich ist, wird as_completed nicht in der Reihenfolge der URLS-Listenelemente zurückgegeben.

ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 byte
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes

Karte verwenden

# example4.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))

Wie aus den laufenden Ergebnissen ersichtlich ist, gibt Karte die Elemente in der Reihenfolge der URL-Liste und den Code zurück Das Schreiben ist einfacher und intuitiver und wir können eines entsprechend den spezifischen Anforderungen auswählen.

ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes

Die dritte Option warte

Wartemethode gibt ein Tupel (Tupel) zurück, das zwei Set (Set) enthält, eines ist abgeschlossen (abgeschlossen) und der andere ist unvollendet (unvollendet). Ein Vorteil der Verwendung der Wartemethode besteht darin, dass sie mehr Freiheit erhält. Sie erhält drei Parameter: FIRST_COMPLETED, FIRST_EXCEPTION und ALL_COMPLETED.

Sehen wir uns den Unterschied zwischen den drei Parametern anhand des folgenden Beispiels an

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))

Wenn der Standardwert ALL_COMPLETED verwendet wird, blockiert das Programm, bis alle Aufgaben im Thread-Pool abgeschlossen sind .

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())

Wenn der Parameter FIRST_COMPLETED verwendet wird, wartet das Programm nicht, bis alle Aufgaben im Thread-Pool abgeschlossen sind.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})

Das obige ist der detaillierte Inhalt vonDetaillierte Einführung in den Thread-Pool/Prozess-Pool in der gleichzeitigen Python-Programmierung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn