Maison >développement back-end >Tutoriel Python >Pool de threads/pool de processus de programmation simultanée Python
La bibliothèque standard Python nous fournit des modules de threading et multitraitement pour écrire le code multi-threading/multi-processus correspondant. Cependant, lorsque le projet atteint une certaine échelle, création/destruction fréquente de processus ou. les threads nécessitent beaucoup de ressources. Oui, à l’heure actuelle, nous devons écrire notre propre pool de threads/pool de processus pour échanger de l’espace contre du temps. Mais à partir de Python 3.2, la bibliothèque standard nous fournit le module concurrent.futures, qui fournit deux classes : ThreadPoolExecutor et ProcessPoolExecutor, réalisant une abstraction plus poussée du threading et du multitraitement, fournit. prise en charge directe de l’écriture de pools de threads/pools de processus. Le module
concurrent.futures est basé sur Executor est une classe abstraite et ne peut pas être utilisé directement. Cependant, les deux sous-classes ThreadPoolExecutor et ProcessPoolExecutor qu'elle fournit sont très utiles, comme leurs noms l'indiquent, elles sont utilisées respectivement pour créer des codes de pool de threads et de pool de processus. Nous pouvons placer les tâches correspondantes directement dans le pool de threads/pool de processus, et il n'est pas nécessaire de maintenir la file d'attente pour nous soucier des blocages. Le pool de threads/pool de processus le planifiera automatiquement pour nous.
FuturJe crois que les amis qui ont une expérience en programmation sous Java et nodejs seront familiers avec ce concept Vous pouvez le comprendre comme une opération réalisée dans le futur. C'est la base de la programmation asynchrone. Dans le mode de programmation traditionnel, par exemple, lorsque nous utilisons queue.get, un blocage se produira avant d'attendre le retour du résultat, et le CPU ne pourra pas être libéré pour faire autre chose. Future nous aide à terminer la tâche pendant la période d'attente. Autres opérations. Concernant les IO asynchrones en Python, vous pouvez vous référer à ma coroutine de programmation simultanée Python/IO asynchrones après avoir lu cet article.
p.s : Si vous êtes toujours fidèle à Python2.x, veuillez d'abord installer le module futures.
pip install futures
Comprenons d'abord le concept de pool de threads à travers le code suivant
# example1.py from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束 time.sleep(3) print(future2.done()) # 判断task2是否结束 print(future1.result()) # 查看task1返回的结果 print(future2.result()) # 查看task2返回的结果
Analysons il est basé sur les résultats d'exécution. Nous utilisons la méthode submit pour ajouter une tâche au pool de threads, et submit renvoie un Objet Future L'objet Future peut être simplement compris comme une opération terminée dans le futur. Dans la première instruction print, il est évident que notre future1 n'est pas terminé à cause de time.sleep(2), car nous utilisons time.sleep(3) pour mettre le thread principal en pause, donc lorsque nous atteignons la deuxième instruction print, notre thread pool Toutes les tâches ici ont été terminées.
ziwenxie :: ~ » python example1.py False True hello world # 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行 ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
Nous pouvons également réécrire le code ci-dessus dans un formulaire de pool de processus. L'API et le pool de threads sont exactement les mêmes, donc je ne serai pas verbeux.
# example2.py from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(3) print(future2.done()) print(future1.result()) print(future2.result())
Voici les résultats en cours d'exécution
ziwenxie :: ~ » python example2.py False True hello world ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
En plus de soumettre, Executor fournit également Nous utilisons la méthode map , similaire à l'utilisation de la carte intégrée. Comparons la différence entre les deux à travers deux exemples.
# example3.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
Comme le montrent les résultats en cours d'exécution, as_completed n'est pas renvoyé dans l'ordre des éléments de la liste d'URL.
ziwenxie :: ~ » python example3.py 'http://example.com/' page is 1270 byte 'https://api.github.com/' page is 2039 bytes 'http://httpbin.org' page is 12150 bytes
# example4.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data)))
Comme le montrent les résultats en cours d'exécution, map renvoie les éléments dans l'ordre de la liste des URL, et le code l'écrit est plus simple et intuitif, on peut en choisir un en fonction de besoins spécifiques.
ziwenxie :: ~ » python example4.py 'http://httpbin.org' page is 12150 bytes 'http://example.com/' page is 1270 bytes 'https://api.github.com/' page is 2039 bytes
La méthode wait renverra un tuple. Le tuple contient deux ensembles, l'un est terminé. L'un des avantages de l'utilisation de la méthode wait est de gagner une plus grande liberté. Elle reçoit trois paramètres : FIRST_COMPLETED, FIRST_EXCEPTION et ALL_COMPLETE. Le paramètre par défaut est ALL_COMPLETED.
Jetons un coup d'œil à la différence entre les trois paramètres à travers l'exemple suivant
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) print(wait(futures)) # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
Si la valeur par défaut ALL_COMPLETED est utilisée, le programme se bloquera jusqu'à ce que toutes les tâches du pool de threads soient terminées .
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f0b06c9bc88 state=finished returned str>, <Future at 0x7f0b06cbaa90 state=finished returned str>, <Future at 0x7f0b06373898 state=finished returned str>, <Future at 0x7f0b06352ba8 state=finished returned str>, <Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
Si le paramètre FIRST_COMPLETED est utilisé, le programme n'attendra pas que toutes les tâches du pool de threads soient terminées.
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f84109edb00 state=finished returned str>, <Future at 0x7f840e2e9320 state=finished returned str>, <Future at 0x7f840f25ccc0 state=finished returned str>}, not_done={<Future at 0x7f840e2e9ba8 state=running>, <Future at 0x7f840e2e9940 state=running>})
Écrivez un petit programme pour comparer l'écart d'efficacité d'exécution entre multiprocessing.pool (ThreadPool) et ProcessPollExecutor (ThreadPoolExecutor), et réfléchissez à la raison pour laquelle cela se produit en fonction de l'avenir mentionné ci-dessus Un tel résultat.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!