Maison >développement back-end >Tutoriel Python >Introduction détaillée au pool de threads/pool de processus dans la programmation simultanée Python
PythonLa bibliothèque standard nous fournit des modules de threading et multitraitement pour écrire le code multi-threading/multi-processus correspondant, mais lorsque le projet atteint à une certaine échelle, La création/la destruction fréquente de processus ou de threads consomme beaucoup de ressources. À l'heure actuelle, nous devons écrire notre propre pool de threads/pool de processus pour échanger de l'espace contre du temps. Mais à partir de Python3.2, la bibliothèque standard nous fournit le module concurrent.futures, qui fournit deux classes : ThreadPoolExecutor et ProcessPoolExecutor, qui implémentent Une abstraction plus poussée du threading et du multitraitement fournit une prise en charge directe de l'écriture de pools de threads/pools de processus. Le module
concurrent.futures est basé sur Executor est une classe abstraite, qui ne peut pas être utilisée directement. Cependant, les deux sous-classes ThreadPoolExecutor et ProcessPoolExecutor qu'elle fournit sont très utiles, comme leurs noms l'indiquent, elles sont utilisées respectivement pour créer des codes de pool de threads et de pool de processus. Nous pouvons placer les tâches correspondantes directement dans le pool de threads/pool de processus, et il n'est pas nécessaire de maintenir la file d'attente pour nous soucier des blocages. Le pool de threads/pool de processus le planifiera automatiquement pour nous.
Futur Je crois que les amis qui ont de l'expérience Java et Nodejs en programmation connaissent certainement ce concept Vous pouvez. utilisez-le Compris comme une opération réalisée dans le futur , c'est la base de la programmation asynchrone, par exemple, lorsque nous opérons queue.get, un blocage se produira avant d'attendre le retour du résultat, et le CPU ne peut pas être libéré pour faire autre chose. L'introduction de Future nous aide à effectuer d'autres opérations en attendant. Concernant les IO asynchrones en Python, vous pouvez vous référer à ma coroutine de programmation simultanée Python/IO asynchrones après avoir lu cet article.
p.s : Si vous vous en tenez toujours à Python2.x, veuillez installer d'abord le module futures.
pip install futures
Comprenons d'abord le concept de pool de threads à travers le code suivant
# example1.py from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束 time.sleep(3) print(future2.done()) # 判断task2是否结束 print(future1.result()) # 查看task1返回的结果 print(future2.result()) # 查看task2返回的结果
Analysons il est basé sur les résultats d'exécution. Nous utilisons la méthode submit pour ajouter une tâche au pool de threads. submit renvoie un objet Future . le futur. Dans la première instruction print, il est évident que notre future1 n'est pas terminé à cause de time.sleep(2), car nous utilisons time.sleep(3) pour mettre le thread principal en pause, donc lorsque nous atteignons la deuxième instruction print, notre thread pool Toutes les tâches ici ont été terminées.
ziwenxie :: ~ » python example1.py False True hello world # 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行 ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
Nous pouvons également réécrire le code ci-dessus sous la forme d'un pool de processus. L'api est exactement la même que le pool de threads, donc je ne serai pas verbeux.
# example2.py from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(3) print(future2.done()) print(future1.result()) print(future2.result())
Voici les résultats en cours d'exécution
ziwenxie :: ~ » python example2.py False True hello world ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
En plus Pour soumettre, Exectoruor nous fournit également la méthode map, qui est similaire à l'utilisation de la carte intégrée. Comparons la différence entre les deux à travers deux exemples.
# example3.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
Comme le montrent les résultats en cours d'exécution, as_completed n'est pas renvoyé dans l'ordre des éléments de la liste d'URL.
ziwenxie :: ~ » python example3.py 'http://example.com/' page is 1270 byte 'https://api.github.com/' page is 2039 bytes 'http://httpbin.org' page is 12150 bytes
# example4.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data)))
Comme le montrent les résultats en cours d'exécution, map renvoie les éléments dans l'ordre de la liste des URL, et le code l'écrit est plus simple et intuitif, on peut en choisir un en fonction de besoins spécifiques.
ziwenxie :: ~ » python example4.py 'http://httpbin.org' page is 12150 bytes 'http://example.com/' page is 1270 bytes 'https://api.github.com/' page is 2039 bytes
la méthode wait renverra un tuple (tuple), qui contient deux set (set), un est terminé (terminé) et l'autre est inachevé (inachevé). L'un des avantages de l'utilisation de la méthode wait est de gagner une plus grande liberté. Elle reçoit trois paramètres : FIRST_COMPLETED, FIRST_EXCEPTION et ALL_COMPLETE. Le paramètre par défaut est ALL_COMPLETED.
Jetons un coup d'œil à la différence entre les trois paramètres à travers l'exemple suivant
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) print(wait(futures)) # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
Si la valeur par défaut ALL_COMPLETED est utilisée, le programme se bloquera jusqu'à ce que toutes les tâches du pool de threads soient terminées .
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f0b06c9bc88 state=finished returned str>, <Future at 0x7f0b06cbaa90 state=finished returned str>, <Future at 0x7f0b06373898 state=finished returned str>, <Future at 0x7f0b06352ba8 state=finished returned str>, <Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
Si le paramètre FIRST_COMPLETED est utilisé, le programme n'attendra pas que toutes les tâches du pool de threads soient terminées.
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f84109edb00 state=finished returned str>, <Future at 0x7f840e2e9320 state=finished returned str>, <Future at 0x7f840f25ccc0 state=finished returned str>}, not_done={<Future at 0x7f840e2e9ba8 state=running>, <Future at 0x7f840e2e9940 state=running>})
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!