Maison >développement back-end >Tutoriel Python >Introduction détaillée au pool de threads/pool de processus dans la programmation simultanée Python

Introduction détaillée au pool de threads/pool de processus dans la programmation simultanée Python

高洛峰
高洛峰original
2017-03-17 17:38:522140parcourir

Introduction

PythonLa bibliothèque standard nous fournit des modules de threading et multitraitement pour écrire le code multi-threading/multi-processus correspondant, mais lorsque le projet atteint à une certaine échelle, La création/la destruction fréquente de processus ou de threads consomme beaucoup de ressources. À l'heure actuelle, nous devons écrire notre propre pool de threads/pool de processus pour échanger de l'espace contre du temps. Mais à partir de Python3.2, la bibliothèque standard nous fournit le module concurrent.futures, qui fournit deux classes : ThreadPoolExecutor et ProcessPoolExecutor, qui implémentent Une abstraction plus poussée du threading et du multitraitement fournit une prise en charge directe de l'écriture de pools de threads/pools de processus. Le module

Executor and Future

concurrent.futures est basé sur Executor est une classe abstraite, qui ne peut pas être utilisée directement. Cependant, les deux sous-classes ThreadPoolExecutor et ProcessPoolExecutor qu'elle fournit sont très utiles, comme leurs noms l'indiquent, elles sont utilisées respectivement pour créer des codes de pool de threads et de pool de processus. Nous pouvons placer les tâches correspondantes directement dans le pool de threads/pool de processus, et il n'est pas nécessaire de maintenir la file d'attente pour nous soucier des blocages. Le pool de threads/pool de processus le planifiera automatiquement pour nous.

Futur Je crois que les amis qui ont de l'expérience Java et Nodejs en programmation connaissent certainement ce concept Vous pouvez. utilisez-le Compris comme une opération réalisée dans le futur , c'est la base de la programmation asynchrone, par exemple, lorsque nous opérons queue.get, un blocage se produira avant d'attendre le retour du résultat, et le CPU ne peut pas être libéré pour faire autre chose. L'introduction de Future nous aide à effectuer d'autres opérations en attendant. Concernant les IO asynchrones en Python, vous pouvez vous référer à ma coroutine de programmation simultanée Python/IO asynchrones après avoir lu cet article.

p.s : Si vous vous en tenez toujours à Python2.x, veuillez installer d'abord le module futures.

pip install futures

Utilisez submit pour faire fonctionner le pool de threads/pool de processus

Comprenons d'abord le concept de pool de threads à travers le code suivant

# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello"))  # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world"))  # 往线程池里面加入一个task
print(future1.done())  # 判断task1是否结束
time.sleep(3)
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果

Analysons il est basé sur les résultats d'exécution. Nous utilisons la méthode submit pour ajouter une tâche au pool de threads. submit renvoie un objet Future . le futur. Dans la première instruction print, il est évident que notre future1 n'est pas terminé à cause de time.sleep(2), car nous utilisons time.sleep(3) pour mettre le thread principal en pause, donc lorsque nous atteignons la deuxième instruction print, notre thread pool Toutes les tâches ici ont été terminées.

ziwenxie :: ~ » python example1.py
False
True
hello
world
# 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8361  7557  8361  3    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8362  0    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8363  0    3 19:45 pts/0    00:00:00 python example1.py

Nous pouvons également réécrire le code ci-dessus sous la forme d'un pool de processus. L'api est exactement la même que le pool de threads, donc je ne serai pas verbeux.

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())

Voici les résultats en cours d'exécution

ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8560  7557  8560  3    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8563  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8564  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8561  8560  8561  0    1 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8562  8560  8562  0    1 19:53 pts/0    00:00:00 python example2.py

Utilisez map/attendez pour faire fonctionner le pool de threads/pool de processus

En plus Pour soumettre, Exectoruor nous fournit également la méthode map, qui est similaire à l'utilisation de la carte intégrée. Comparons la différence entre les deux à travers deux exemples.

Examen de l'utilisation de l'opération de soumission

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Comme le montrent les résultats en cours d'exécution, as_completed n'est pas renvoyé dans l'ordre des éléments de la liste d'URL.

ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 byte
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes

Utiliser map

# example4.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))

Comme le montrent les résultats en cours d'exécution, map renvoie les éléments dans l'ordre de la liste des URL, et le code l'écrit est plus simple et intuitif, on peut en choisir un en fonction de besoins spécifiques.

ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes

La troisième option wait

la méthode wait renverra un tuple (tuple), qui contient deux set (set), un est terminé (terminé) et l'autre est inachevé (inachevé). L'un des avantages de l'utilisation de la méthode wait est de gagner une plus grande liberté. Elle reçoit trois paramètres : FIRST_COMPLETED, FIRST_EXCEPTION et ALL_COMPLETE. Le paramètre par défaut est ALL_COMPLETED.

Jetons un coup d'œil à la différence entre les trois paramètres à travers l'exemple suivant

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))

Si la valeur par défaut ALL_COMPLETED est utilisée, le programme se bloquera jusqu'à ce que toutes les tâches du pool de threads soient terminées .

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())

Si le paramètre FIRST_COMPLETED est utilisé, le programme n'attendra pas que toutes les tâches du pool de threads soient terminées.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn