Maison >développement back-end >Tutoriel Python >Introduction à PoolExecutor dans la concurrence Python (avec exemples)
Le contenu de cet article est une introduction à PoolExecutor dans la concurrence Python (avec des exemples). Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer.
Utilisez le multithreading et le multi-traitement pour répondre aux exigences de concurrence habituelles. Les étapes telles que le démarrage et la jointure ne peuvent pas être omises au démarrage. Pour les exigences complexes, 1 à 2 files d'attente sont nécessaires.
À mesure que les exigences deviennent de plus en plus complexes, sans une bonne conception et une abstraction du niveau fonctionnel, plus il y a de code, plus il sera difficile à déboguer.
Pour les tâches qui nécessitent une exécution simultanée mais qui n'ont pas d'exigences élevées en temps réel, nous pouvons utiliser la classe PoolExecutor dans le package concurrent.futures pour l'implémenter.
Ce package fournit deux exécuteurs : l'exécuteur de pool de threads ThreadPoolExecutor et l'exécuteur de pool de processus ProcessPoolExecutor. Les deux exécuteurs fournissent la même API.
L'objectif principal du concept de pool est la réutilisation : permettre aux threads ou aux processus d'être utilisés plusieurs fois au cours de leur cycle de vie. Il réduit les frais généraux liés à la création de threads et de processus et améliore les performances du programme. La réutilisation n'est pas une règle obligatoire, mais c'est la principale raison pour laquelle les programmeurs utilisent des pools dans leurs applications.
Le pool n'a qu'un nombre fixe de threads/processus, spécifié par max_workers.
La tâche est soumise à la file d'attente des tâches de l'exécuteur via executor.submit, et un futur objet est renvoyé.
Future est un modèle de conception de concurrence courant.
Un objet Future représente des résultats qui ne sont pas encore prêts (complétés). Ce résultat peut être obtenu une fois qu'il est prêt à un certain moment dans le "futur".
Les tâches doivent être exécutées entre différents travailleurs.
Mais veuillez noter qu'une fois qu'une tâche est exécutée, le travailleur sera occupé jusqu'à ce que l'exécution soit terminée ! S’il n’y a pas assez de travailleurs, d’autres tâches continueront d’attendre ! PoolExecutor n’est donc pas adapté aux tâches en temps réel.
import concurrent.futures import time from itertools import count number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] def evaluate_item(x): for i in count(x): # count 是无限迭代器,会一直递增。 print(f"{x} - {i}") time.sleep(0.01) if __name__ == "__main__": # 进程池 start_time_2 = time.time() # 使用 with 在离开此代码块时,自动调用 executor.shutdown(wait=true) 释放 executor 资源 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # 将 10 个任务提交给 executor,并收集 futures futures = [executor.submit(evaluate_item, item) for item in number_list] # as_completed 方法等待 futures 中的 future 完成 # 一旦某个 future 完成,as_completed 就立即返回该 future # 这个方法,使每次返回的 future,总是最先完成的 future # 而不是先等待任务 1,再等待任务 2... for future in concurrent.futures.as_completed(futures): print(future.result()) print ("Thread pool execution in " + str(time.time() - start_time_2), "seconds")
Dans le code ci-dessus, les cinq tâches avec les éléments 1 2 3 4 5 occuperont toujours tous les travailleurs, tandis que les cinq tâches 6 7 8 9 10 attendront éternellement ! ! !
Description détaillée de l'API
concurrent.futures contient trois parties de l'API :
PoolExecutor : c'est-à-dire l'API des deux exécuteurs
constructeur : Le paramètre principal est max_workers, qui est utilisé pour spécifier la taille du pool de threads (ou le nombre de travailleurs)
submit(fn, *args, **kwargs) : Soumettez la fonction de tâche fn à l'exécuteur, args et les kwargs sont des paramètres fn requis.
Renvoie un futur pour obtenir des résultats
map(func, *iterables, timeout=None, chunksize=1) : Lorsque la tâche est la même et que seuls les paramètres sont différents, vous pouvez utiliser cette méthode au lieu de soumettre. Chaque élément des itérables correspond à un ensemble de paramètres de func.
Renvoie un itérateur de futures
shutdown(wait=True) : Arrêtez l'exécuteur, en utilisant généralement le gestionnaire with pour l'arrêter automatiquement.
Future : Une fois la tâche soumise à l'exécuteur, un futur sera renvoyé
future.result(timout=None) : La méthode la plus couramment utilisée, renvoie le résultat de la tâche. Si la tâche n’est pas encore terminée, cette méthode attendra une éternité !
timeout spécifie le délai d'attente. S'il vaut Aucun, il n'y a pas de limite de délai d'attente.
exception(timeout=None) : Donne l'exception levée par la tâche. Comme result(), il attendra également la fin de la tâche.
cancel() : Annuler cette tâche
add_done_callback(fn) : Une fois le futur terminé, fn(future) sera exécuté.
running() : s'il est en cours d'exécution
done() : si le futur est terminé, booléen
...Pour plus de détails, veuillez vous référer à la documentation officielle
module Avec fonction utilitaire
concurrent.futures.as_completed(fs, timeout=None) : Attendre que le futur dans fs (futures itérable) se termine
Une fois un futur dans fs est terminé, cette La fonction renvoie le futur immédiatement.
Cette méthode fait que le futur renvoyé à chaque fois soit toujours le premier à être terminé. Au lieu d'attendre d'abord la tâche 1, puis d'attendre la tâche 2...
Utilisez généralement cette fonction pour le futur dans as_completed(fs):.
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED) : Attendez que quelque chose spécifié par return_when se produise, ou timeout
return_when a trois options : ALL_COMPLETED (fs Tous les contrats à terme dans fs sont terminés), FIRST__COMPLETED (tout futur dans fs est terminé) et FIRST_EXCEPTION (une tâche lève une exception)
Future Design Pattern
La caractéristique de PoolExecutor ici est qu'il utilise la conception Future Le modèle fait de l’exécution des tâches et de l’acquisition des résultats un processus asynchrone.
Nous mettons d'abord la tâche dans la file d'attente des tâches via submit/map, puis la tâche a commencé à être exécutée ! Ensuite, lorsque nous en avons besoin, nous obtenons le résultat via future, ou directement add_done_callback(fn).
L'exécution de cette tâche se fait dans les nouveaux Workers. Le processus/thread principal ne sera pas bloqué, donc le thread principal peut faire autre chose. Cette approche est appelée programmation asynchrone.
Offscreen
concurrent.futures est implémenté sur la base de multiprocessing.pool, il est donc en fait un peu plus lent que d'utiliser directement Thread/Process Pool. Mais il fournit une API plus pratique et plus concise.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!