Maison >développement back-end >Tutoriel Python >Pool de threads/pool de processus de programmation simultanée Python

Pool de threads/pool de processus de programmation simultanée Python

巴扎黑
巴扎黑original
2017-03-18 11:37:101899parcourir

Introduction

La bibliothèque standard Python nous fournit des modules de threading et multitraitement pour écrire le code multi-threading/multi-processus correspondant. Cependant, lorsque le projet atteint une certaine échelle, création/destruction fréquente de processus ou. les threads nécessitent beaucoup de ressources. Oui, à l’heure actuelle, nous devons écrire notre propre pool de threads/pool de processus pour échanger de l’espace contre du temps. Mais à partir de Python 3.2, la bibliothèque standard nous fournit le module concurrent.futures, qui fournit deux classes : ThreadPoolExecutor et ProcessPoolExecutor, réalisant une abstraction plus poussée du threading et du multitraitement, fournit. prise en charge directe de l’écriture de pools de threads/pools de processus. Le module

Executor and Future

concurrent.futures est basé sur Executor est une classe abstraite et ne peut pas être utilisé directement. Cependant, les deux sous-classes ThreadPoolExecutor et ProcessPoolExecutor qu'elle fournit sont très utiles, comme leurs noms l'indiquent, elles sont utilisées respectivement pour créer des codes de pool de threads et de pool de processus. Nous pouvons placer les tâches correspondantes directement dans le pool de threads/pool de processus, et il n'est pas nécessaire de maintenir la file d'attente pour nous soucier des blocages. Le pool de threads/pool de processus le planifiera automatiquement pour nous.

FuturJe crois que les amis qui ont une expérience en programmation sous Java et nodejs seront familiers avec ce concept Vous pouvez le comprendre comme une opération réalisée dans le futur. C'est la base de la programmation asynchrone. Dans le mode de programmation traditionnel, par exemple, lorsque nous utilisons queue.get, un blocage se produira avant d'attendre le retour du résultat, et le CPU ne pourra pas être libéré pour faire autre chose. Future nous aide à terminer la tâche pendant la période d'attente. Autres opérations. Concernant les IO asynchrones en Python, vous pouvez vous référer à ma coroutine de programmation simultanée Python/IO asynchrones après avoir lu cet article.

p.s : Si vous êtes toujours fidèle à Python2.x, veuillez d'abord installer le module futures.

pip install futures

Utilisez submit pour faire fonctionner le pool de threads/pool de processus

Comprenons d'abord le concept de pool de threads à travers le code suivant

# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello"))  # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world"))  # 往线程池里面加入一个task
print(future1.done())  # 判断task1是否结束
time.sleep(3)
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果

Analysons il est basé sur les résultats d'exécution. Nous utilisons la méthode submit pour ajouter une tâche au pool de threads, et submit renvoie un Objet Future L'objet Future peut être simplement compris comme une opération terminée dans le futur. Dans la première instruction print, il est évident que notre future1 n'est pas terminé à cause de time.sleep(2), car nous utilisons time.sleep(3) pour mettre le thread principal en pause, donc lorsque nous atteignons la deuxième instruction print, notre thread pool Toutes les tâches ici ont été terminées.

ziwenxie :: ~ » python example1.py
False
True
hello
world
# 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8361  7557  8361  3    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8362  0    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8363  0    3 19:45 pts/0    00:00:00 python example1.py

Nous pouvons également réécrire le code ci-dessus dans un formulaire de pool de processus. L'API et le pool de threads sont exactement les mêmes, donc je ne serai pas verbeux.

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())

Voici les résultats en cours d'exécution

ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8560  7557  8560  3    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8563  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8564  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8561  8560  8561  0    1 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8562  8560  8562  0    1 19:53 pts/0    00:00:00 python example2.py

Utilisez map/wait pour faire fonctionner le pool de threads/pool de processus

En plus de soumettre, Executor fournit également Nous utilisons la méthode map , similaire à l'utilisation de la carte intégrée. Comparons la différence entre les deux à travers deux exemples.

Examen de l'utilisation de l'opération de soumission

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Comme le montrent les résultats en cours d'exécution, as_completed n'est pas renvoyé dans l'ordre des éléments de la liste d'URL.

ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 byte
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes

Utiliser map

# example4.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))

Comme le montrent les résultats en cours d'exécution, map renvoie les éléments dans l'ordre de la liste des URL, et le code l'écrit est plus simple et intuitif, on peut en choisir un en fonction de besoins spécifiques.

ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes

La troisième option est wait

La méthode wait renverra un tuple. Le tuple contient deux ensembles, l'un est terminé. L'un des avantages de l'utilisation de la méthode wait est de gagner une plus grande liberté. Elle reçoit trois paramètres : FIRST_COMPLETED, FIRST_EXCEPTION et ALL_COMPLETE. Le paramètre par défaut est ALL_COMPLETED.

Jetons un coup d'œil à la différence entre les trois paramètres à travers l'exemple suivant

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))

Si la valeur par défaut ALL_COMPLETED est utilisée, le programme se bloquera jusqu'à ce que toutes les tâches du pool de threads soient terminées .

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())

Si le paramètre FIRST_COMPLETED est utilisé, le programme n'attendra pas que toutes les tâches du pool de threads soient terminées.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})

Questions de réflexion

Écrivez un petit programme pour comparer l'écart d'efficacité d'exécution entre multiprocessing.pool (ThreadPool) et ProcessPollExecutor (ThreadPoolExecutor), et réfléchissez à la raison pour laquelle cela se produit en fonction de l'avenir mentionné ci-dessus Un tel résultat.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn