Maison  >  Article  >  développement back-end  >  Pool de threads/pool de processus de programmation simultanée Python

Pool de threads/pool de processus de programmation simultanée Python

巴扎黑
巴扎黑original
2017-03-30 14:11:301690parcourir

Introduction

La bibliothèque standard Python nous fournit des modules de threading et multitraitement pour écrire le code multi-threading/multi-processus correspondant. Cependant, lorsque le projet atteint une certaine échelle, création/destruction fréquente de processus ou. les threads nécessitent beaucoup de ressources. Oui, à l’heure actuelle, nous devons écrire notre propre pool de threads/pool de processus pour échanger de l’espace contre du temps. Mais à partir de Python 3.2, la bibliothèque standard nous fournit le module concurrent.futures, qui fournit deux classes : ThreadPoolExecutor et ProcessPoolExecutor, qui réalise une abstraction plus poussée du threading et du multitraitement, et facilite l'écriture de pools/processus de threads. Pool fournit un support direct. Le module

Executor and Future

concurrent.futures est basé sur Executor est une classe abstraite et ne peut pas être utilisé directement. Cependant, les deux sous-classes ThreadPoolExecutor et ProcessPoolExecutor qu'elle fournit sont très utiles, comme leurs noms l'indiquent, elles sont utilisées respectivement pour créer des codes de pool de threads et de pool de processus. Nous pouvons placer les tâches correspondantes directement dans le pool de threads/pool de processus, et il n'est pas nécessaire de maintenir la file d'attente pour nous soucier des blocages. Le pool de threads/pool de processus le planifiera automatiquement pour nous.

FuturJe crois que les amis qui ont une expérience en programmation sous Java et nodejs seront familiers avec ce concept Vous pouvez le comprendre comme une opération réalisée dans le futur. C'est la base de la programmation asynchrone. Dans le mode de programmation traditionnel, par exemple, lorsque nous utilisons queue.get, un blocage se produira avant d'attendre le retour du résultat, et le CPU ne pourra pas être libéré pour faire autre chose. Future nous aide à terminer la tâche pendant la période d'attente. Autres opérations. Concernant les IO asynchrones en Python, vous pouvez vous référer à ma coroutine de programmation simultanée Python/IO asynchrones après avoir lu cet article.

p.s : Si vous êtes toujours fidèle à Python2.x, veuillez d'abord installer le module futures.

pip installer les futurs

Utilisez submit pour faire fonctionner le pool de threads/pool de processus

Commençons par comprendre le concept de pool de threads à travers le code suivant

# example1.py
de concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ThreadPoolExecutor(max_workers=2) # Créer un pool pouvant accueillir jusqu'à 2 tâches Thread pool
future1 = pool.submit(return_future_result, ("hello")) # Ajouter une tâche au pool de threads
future2 = pool.submit(return_future_result, ("world")) # Ajouter au pool de threads A task
print(future1.done()) # Déterminer si la tâche 1 se termine
time.sleep(3)
print(future2.done()) # Déterminer si la tâche 2 se termine
print(future1. result ()) # Vérifiez les résultats renvoyés par la tâche 1
print(future2.result()) # Vérifiez les résultats renvoyés par la tâche 2
Analysons-le en fonction des résultats en cours d'exécution. Nous utilisons la méthode submit pour ajouter une tâche au pool de threads, et submit renvoie un objet Future. L'objet Future peut être simplement compris comme une opération effectuée dans le futur. Dans la première instruction print, il est évident que notre future1 n'a pas été terminé à cause de time.sleep(2), car nous avons utilisé time.sleep(3) pour mettre le thread principal en pause, donc quand il s'agit de la deuxième instruction print, notre pool de threads Toutes les tâches ici ont été terminées.

ziwenxie :: ~ » python example1.py
False
True
hello
world
# Lors de l'exécution du programme ci-dessus, on peut voir à travers la commande ps Trois threads s'exécutant en arrière-plan
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py
ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 exemple python1.py
ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 exemple python1.py
ci-dessus Le code de notre Il peut également être réécrit sous la forme d'un pool de processus. L'API et le pool de threads sont exactement les mêmes, donc je ne serai pas verbeux.

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("bonjour"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done ())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())
Ce qui suit est en cours d'exécution

ziwenxie :: ~ » python exemple2.py
False
Vrai
bonjour
monde
ziwenxie :: ~ » ps -eLf | > ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python exemple2.py
ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python exemple2.py
ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 exemple python2.py
ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 exemple python2.py
zi wenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py

Utiliser map/wait pour faire fonctionner le pool de threads/pool de processus

En plus de soumettre, Exector nous fournit également la méthode map, qui est similaire à l'utilisation de la carte intégrée. Comparons la différence entre les deux à travers deux exemples.



使用submit操作回顾

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http: //example.com/', 'https://api.github.com/']
def load_url(url, timeout):
   avec urllib.request.urlopen(url, timeout=timeout) comme connexion :
       return conn.read()
# Nous pouvons utiliser une instruction with pour garantir que les threads sont nettoyés rapidement
avec concurrent.futures.ThreadPoolExecutor(max_workers=3) comme exécuteur :
   # Démarrez le charger les opérations et marquer chaque futur avec son URL
   future_to_url = {executor.submit(load_url, url, 60): url pour url dans URLS}
   pour futur dans concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
       try:
           data = future.result()
       sauf exception comme exc:
           print('%r a généré une exception : %s' % (url, exc) )
       else:
           print('%r page fait %d octets' % (url, len(data)))
从运行结果可以看出,as_completed不是按照URLS列表元素的顺序返回的

ziwenxie :: ~ » python example3.py
'http://example.com/' la page fait 1270 octets
'https://api.github La page .com/' fait 2039 octets
La page 'http://httpbin.org' fait 12150 octets

使用map

# example4.py
importation simultanée .futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
   with urllib.request.urlopen(url, timeout=60) as conn:
       return conn.read()
# Nous pouvons utiliser une instruction with pour nous assurer que les fils de discussion sont nettoyé rapidement
avec concurrent.futures.ThreadPoolExecutor(max_workers=3) comme exécuteur :
   pour l'url, les données dans zip(URLS, executor.map(load_url, URLS)):
       print('%r la page est %d octets' % (url, len(data)))的代码更加简洁直观,我们可以根据具体的需求任选一种。
ziwenxie ::: ~ » python example4.py'http://httpbin.org' la page fait 12150 octets
'http: //example.com/' la page fait 1270 octets
'https://api.github.com/' la page fait 2039 octets


第三种选择attendre

wait方法接会返回一个tuple(元组), tuple中包含两个set(集合),一个是completed(已完成的) et uncompleted(未完成的)。 Attendez.的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默认设置为ALL_COMPLETED。

我们通过下面这个例子来看一下三个参数的区别


à partir de l'importation concurrent.futures ThreadPoolExecutor, attendez, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
   sleep(randint(1, 5))
   return "Retour de {}" .format(num)
pool = ThreadPoolExecutor(5)
futures = []
pour x dans la plage(5):
   futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
如果采用默认的ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成。

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
,
,
,
,
}, not_done=set())
如果采用FIRST_COMPLETED参数,程序并不会等到线程池里面所有的任务都完成。

ziwenxie :: ~ » python example5. py
DoneAndNotDoneFutures(done= {
,
,
},
not_done={,
})


思考题

写一个小程序对比multitraitement .pool(ThreadPool) et ProcessPollExecutor(ThreadPoolExecutor) sont également compatibles avec Future.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn