Maison  >  Article  >  développement back-end  >  Une ligne de code Python pour réaliser le parallélisme

Une ligne de code Python pour réaliser le parallélisme

WBOY
WBOYavant
2023-04-12 19:04:29859parcourir

Une ligne de code Python pour réaliser le parallélisme

Python est quelque peu connu pour la parallélisation de programmes. Laissant de côté les problèmes techniques, tels que la mise en œuvre des threads et le GIL, je pense que des orientations pédagogiques incorrectes constituent le principal problème. Les didacticiels multi-threading et multi-processus Python classiques courants ont tendance à être "lourds". Et cela effleure souvent la surface sans aborder le contenu le plus utile dans le travail quotidien.

Exemples traditionnels

Recherche simple de "Tutoriel multi-thread Python", il n'est pas difficile de trouver que presque tous les tutoriels donnent des exemples impliquant des classes et des files d'attente:

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images)
    pool.close()
    pool.join()

Ha, regarde Cela ressemble un peu à Java, n'est-ce pas ?

Je ne dis pas qu’utiliser le modèle producteur/consommateur pour des tâches multi-thread/multi-processus est une erreur (en fait, ce modèle a sa place). Cependant, nous pouvons utiliser un modèle plus efficace pour gérer les tâches de script quotidiennes.

Le problème est...

Tout d'abord, vous avez besoin d'une classe passe-partout ;
Deuxièmement, vous avez besoin d'une file d'attente pour transmettre les objets
De plus, vous devez également créer des méthodes correspondantes aux deux extrémités du ; canal pour l'aider à travailler (si vous devez effectuer une communication bidirectionnelle ou enregistrer les résultats, vous devez introduire une autre file d'attente).

Plus il y a de travailleurs, plus il y a de problèmes

Suite à cette idée, vous avez maintenant besoin d'un pool de threads de travailleurs. Ce qui suit est un exemple tiré d'un didacticiel IBM classique : accélération via le multithread lors de la récupération de pages Web.

#Example2.py
'''
A more realistic thread pool example 
'''

import time 
import threading 
import Queue 
import urllib2 

class Consumer(threading.Thread): 
    def __init__(self, queue): 
        threading.Thread.__init__(self)
        self._queue = queue 

    def run(self):
        while True: 
            content = self._queue.get() 
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
        print 'Bye byes!'

def Producer():
    urls = [
        'http://www.python.org', 'http://www.yahoo.com'
        'http://www.scala.org', 'http://www.google.com'
        # etc.. 
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()

    # Add the urls to process
    for url in urls: 
        queue.put(url)  
    # Add the poison pillv
    for worker in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()

    print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
    workers = []
    for _ in range(size):
        worker = Consumer(queue)
        worker.start() 
        workers.append(worker)
    return workers

if __name__ == '__main__':
    Producer()

Ce code fonctionnera correctement, mais regardez de plus près ce que nous devons faire : construire différentes méthodes, suivre une série de threads, et afin de résoudre l'ennuyeux problème de blocage, nous devons effectuer une série de rejoindre l’opération. Ce n'est que le début...

Jusqu'à présent, nous avons passé en revue le tutoriel multi-threading classique, qui est un peu vide, n'est-ce pas ? Bouilli et sujet aux erreurs, ce style consistant à obtenir deux fois le résultat avec la moitié de l'effort n'est évidemment pas adapté à un usage quotidien. Heureusement, nous avons une meilleure solution.

Pourquoi ne pas essayer map

map Cette petite et exquise fonction est la clé pour paralléliser facilement les programmes Python. Map provient de langages de programmation fonctionnels comme Lisp. Il peut réaliser un mappage entre deux fonctions via une séquence.

    urls = ['http://www.yahoo.com', 'http://www.reddit.com']
    results = map(urllib2.urlopen, urls)

Les deux lignes de code ci-dessus transmettent chaque élément de la séquence d'URL en tant que paramètre à la méthode urlopen et enregistrent tous les résultats dans la liste des résultats. Le résultat est à peu près équivalent à :

results = []
for url in urls: 
    results.append(urllib2.urlopen(url))

La fonction map gère à elle seule une série d'opérations telles que les opérations de séquence, le passage de paramètres et le stockage des résultats.

Pourquoi est-ce important ? En effet, les opérations cartographiques peuvent être facilement parallélisées avec les bonnes bibliothèques.

Une ligne de code Python pour réaliser le parallélisme

Il existe deux bibliothèques en Python qui contiennent la fonction map : multiprocessing et sa sous-bibliothèque peu connue multiprocessing.dummy.

Quelques phrases supplémentaires ici : multiprocessing.dummy ? Clone threadé de la bibliothèque mltiprocessing ? Est-ce une crevette ? Même dans la documentation officielle de la bibliothèque multitraitement, il n'y a qu'une seule description pertinente sur cette sous-bibliothèque. Et cette description traduite en langage adulte signifie essentiellement : « Eh bien, une telle chose existe, sachez-le. » Croyez-moi, cette bibliothèque est sérieusement sous-estimée !

dummy est un clone complet du module multitraitement, la seule différence est que le multitraitement fonctionne sur les processus, tandis que le module factice fonctionne sur les threads (et inclut donc toutes les limitations multithread habituelles de Python).
Il est donc extrêmement simple de remplacer ces deux bibliothèques. Vous pouvez choisir différentes bibliothèques pour les tâches gourmandes en E/S et les tâches gourmandes en CPU. "Essayez-le vous-même" travail de 7 lignes de code dans la fonction build
worker

pool dans example2.py. Il génère une série de threads de travail et les initialise, les stockant dans des variables pour un accès facile. L'objet

Pool a quelques paramètres, et tout ce sur quoi je dois me concentrer ici est son premier paramètre : les processus. Ce paramètre est utilisé pour définir le nombre de threads dans le pool de threads. Sa valeur par défaut est le nombre de cœurs du processeur de la machine actuelle.

De manière générale, lors de l'exécution de tâches gourmandes en CPU, plus il y a de cœurs appelés, plus ce sera rapide. Mais lorsqu'il s'agit de tâches gourmandes en réseau, les choses deviennent un peu imprévisibles et il est sage d'expérimenter pour déterminer la taille du pool de threads.

pool = ThreadPool(4) # Sets the pool size to 4

线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。

创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
    'http://www.python.org', 
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
    # etc.. 
    ]

# Make the Pool of workers
pool = ThreadPool(4) 
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish 
pool.close() 
pool.join()

实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。

# results = [] 
# for url in urls:
#   result = urllib2.urlopen(url)
#   results.append(result)

# # ------- VERSUS ------- # 

# # ------- 4 Pool ------- # 
# pool = ThreadPool(4) 
# results = pool.map(urllib2.urlopen, urls)

# # ------- 8 Pool ------- # 

# pool = ThreadPool(8) 
# results = pool.map(urllib2.urlopen, urls)

# # ------- 13 Pool ------- # 

# pool = ThreadPool(13) 
# results = pool.map(urllib2.urlopen, urls)

结果:

#        Single thread:  14.4 Seconds 
#               4 Pool:   3.1 Seconds
#               8 Pool:   1.4 Seconds
#              13 Pool:   1.3 Seconds

很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。

另一个真实的例子

生成上千张图片的缩略图 
这是一个 CPU 密集型的任务,并且十分适合进行并行化。

基础单进程版本

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    for image in images:
        create_thumbnail(Image)

上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。

这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。

如果我们使用 map 函数来代替 for 循环:

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images)
    pool.close()
    pool.join()

5.6 秒!

虽然只改动了几行代码,我们却明显提高了程序的执行速度。在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

到这里,我们就实现了(基本)通过一行 Python 实现并行化。

译者:caspar

译文:​https://www.php.cn/link/687fe34a901a03abed262a62e22f90db​​​​m/a/1190000000414339 

原文:https://medium.com/building-things-on-the-internet/40e9b2b36148

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer