One line of Python code to achieve parallelism

Python has a somewhat bad reputation when it comes to parallelism. Setting aside the technical issues, such as the thread implementation and the GIL, I think the main problem is misguided teaching. The classic Python multi-threading and multi-processing tutorials tend to be "heavyweight", and they often only scratch the surface without covering the material that is most useful in day-to-day work.

Traditional examples

A quick search for "Python multi-threading tutorial" makes it clear that almost all of the tutorials give examples involving classes and queues:

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(create_thumbnail, images)
    pool.close()
    pool.join()

Ha, it looks a bit like Java, doesn’t it?

I am not saying that using the producer/consumer model for multi-threaded/multi-process tasks is wrong (in fact, this model has its place). However, we can use a more efficient model when dealing with daily scripting tasks.

The problem is...

First, you need a boilerplate class;
second, you need a queue to pass objects through;
and on top of that, you need matching methods on both ends of the channel to do the actual work (and if you want two-way communication or need to keep the results, you have to introduce yet another queue; see the sketch below).
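
To make that cost concrete, here is a minimal sketch of what merely "keeping the results" adds to the classic pattern. It is written for Python 3, and the names task_queue and result_queue are purely illustrative, not taken from any particular tutorial:

import threading
import queue

def worker(task_queue, result_queue):
    # Pull tasks until the poison pill (None) arrives
    while True:
        item = task_queue.get()
        if item is None:
            break
        # "Process" the item; here we simply square it
        result_queue.put(item * item)

if __name__ == '__main__':
    task_queue = queue.Queue()
    result_queue = queue.Queue()

    # Build and start the worker threads by hand
    workers = [threading.Thread(target=worker, args=(task_queue, result_queue))
               for _ in range(4)]
    for w in workers:
        w.start()

    # Feed the tasks, then one poison pill per worker
    for n in range(10):
        task_queue.put(n)
    for _ in workers:
        task_queue.put(None)
    for w in workers:
        w.join()

    # Drain the second queue just to get the results back
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print(results)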

The more workers, the more problems

Following this line of thinking, you now also need a pool of worker threads. Below is an example from a classic IBM tutorial: speeding up web-page retrieval through multi-threading.

#Example2.py
'''
A more realistic thread pool example 
'''

import time 
import threading 
import Queue 
import urllib2 

class Consumer(threading.Thread): 
    def __init__(self, queue): 
        threading.Thread.__init__(self)
        self._queue = queue 

    def run(self):
        while True: 
            content = self._queue.get() 
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
        print 'Bye byes!'

def Producer():
    urls = [
        'http://www.python.org', 'http://www.yahoo.com',
        'http://www.scala.org', 'http://www.google.com'
        # etc.. 
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()

    # Add the urls to process
    for url in urls: 
        queue.put(url)  
    # Add the poison pill
    for worker in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()

    print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
    workers = []
    for _ in range(size):
        worker = Consumer(queue)
        worker.start() 
        workers.append(worker)
    return workers

if __name__ == '__main__':
    Producer()

This code runs correctly, but look closely at everything we had to do: write several methods, keep track of a set of threads, and, to avoid those annoying deadlock problems, perform a whole series of join operations. And this is only the beginning...

So far we have worked through the classic multi-threading tutorial. A bit hollow, isn't it? Boilerplate-heavy and error-prone, this much-effort-for-little-gain style is clearly not suitable for everyday use. Fortunately, there is a better way.

Why not try map

map, a small and elegant function, is the key to effortlessly parallelizing Python programs. map comes from functional programming languages such as Lisp: it maps a function over a sequence, applying the function to each element and collecting the results.

    urls = ['http://www.yahoo.com', 'http://www.reddit.com']
    results = map(urllib2.urlopen, urls)

The two lines of code above pass each element of the urls sequence to the urlopen method and collect all of the results in the results list. The effect is roughly equivalent to:

results = []
for url in urls: 
    results.append(urllib2.urlopen(url))

The map function single-handedly takes care of iterating over the sequence, passing each element to the function, and storing the results.
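
A quick aside for readers on Python 3, where urllib2 has been folded into urllib.request and map returns a lazy iterator: a roughly equivalent serial version, as a minimal sketch, would be

from urllib.request import urlopen

urls = ['http://www.yahoo.com', 'http://www.reddit.com']

# map is lazy in Python 3, so wrap it in list() to actually run the downloads
results = list(map(urlopen, urls))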

Why is this important? This is because map operations can be easily parallelized with the right libraries.

One line of Python code to achieve parallelism

Python ships with two libraries that provide a parallelized map function: multiprocessing and its little-known sub-module, multiprocessing.dummy.

A few more words here: multiprocessing.dummy? A threaded clone of the multiprocessing library? What on earth is that? Even the official documentation of the multiprocessing package has only a single line about this sub-module, and that line essentially boils down to: "Well, such a thing exists; just know that it's there." Believe me, this library is seriously underrated!

dummy is a complete clone of the multiprocessing module; the only difference is that multiprocessing works with processes, while dummy works with threads (and therefore comes with all of Python's usual multi-threading limitations).
Swapping one for the other is therefore extremely easy, so you can pick one library for IO-bound tasks and the other for CPU-bound tasks.

Get your hands on it

The following two lines import the libraries that contain the parallelized map function:

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

Instantiate the Pool object:

pool = ThreadPool()

This one simple statement replaces the seven lines of work done by the build_worker_pool function in example2.py. It spawns a set of worker threads, initializes them, and stores them in a variable for easy access.

The Pool object takes a few parameters, but the only one we need to pay attention to here is the first: processes. It sets the number of workers in the pool, and its default value is the number of CPU cores on the current machine.

Generally speaking, for CPU-intensive tasks, the more cores you use, the faster things run. But for network-intensive tasks the behaviour is less predictable, so it is wise to experiment to determine the right pool size.

pool = ThreadPool(4) # Sets the pool size to 4

When there are too many threads, the time spent switching between them can even exceed the time spent doing actual work. For any given job, it is a good idea to experiment to find the optimal thread-pool size.

Once the Pool object is created, the parallelized program is practically ready. Let's look at the rewritten example2.py:

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
    'http://www.python.org', 
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
    # etc.. 
    ]

# Make the Pool of workers
pool = ThreadPool(4) 
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
# Close the pool and wait for the work to finish
pool.close() 
pool.join()

Only four lines of code actually do any work, and only one of them is the essential one. The map function effortlessly replaces the 40+ line example above. To make things more interesting, I also timed the different approaches with different pool sizes.

# results = [] 
# for url in urls:
#   result = urllib2.urlopen(url)
#   results.append(result)

# # ------- VERSUS ------- # 

# # ------- 4 Pool ------- # 
# pool = ThreadPool(4) 
# results = pool.map(urllib2.urlopen, urls)

# # ------- 8 Pool ------- # 

# pool = ThreadPool(8) 
# results = pool.map(urllib2.urlopen, urls)

# # ------- 13 Pool ------- # 

# pool = ThreadPool(13) 
# results = pool.map(urllib2.urlopen, urls)

Results:

#        Single thread:  14.4 Seconds 
#               4 Pool:   3.1 Seconds
#               8 Pool:   1.4 Seconds
#              13 Pool:   1.3 Seconds

Great results, aren't they? They also show why it is worth experimenting to determine the pool size: on my machine, going beyond a pool size of 9 brought very limited additional gains.
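
If you would like to reproduce this experiment yourself, a minimal sketch along the following lines should do. It is written for Python 3 (where urllib2 becomes urllib.request), and the urls list here is only an illustration:

import time
from urllib.request import urlopen
from multiprocessing.dummy import Pool as ThreadPool

urls = ['http://www.python.org', 'http://www.python.org/about/'] * 10

# Time the same map call with different pool sizes
for size in (1, 4, 8, 13):
    pool = ThreadPool(size)
    start = time.time()
    results = pool.map(urlopen, urls)
    pool.close()
    pool.join()
    print('{:>2} workers: {:.1f} seconds'.format(size, time.time() - start))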

Another real-world example

Generating thumbnails for thousands of images
This is a CPU-intensive task, and it is well suited to parallelization.

Basic single-process version

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    for image in images:
        create_thumbnail(image)

The main job of the code above is to walk through the image files in the given folder and generate a thumbnail for each one, saving the thumbnails into a dedicated sub-folder.

On my machine, this program takes 27.9 seconds to process 6,000 images.

If we replace the for loop with the map function:

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(create_thumbnail, images)
    pool.close()
    pool.join()

5.6 seconds!

Although we changed only a few lines of code, we clearly improved the program's execution speed. In a production environment we can go further and pick the multi-process library for CPU-bound tasks and the multi-threaded library for IO-bound tasks; this is also a good way to stay clear of deadlock problems. Moreover, because map involves no manual thread management, the related debugging work becomes remarkably simple.
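
To show how small that swap really is, here is a minimal, self-contained sketch; the work function is just a stand-in for something like create_thumbnail, and switching between processes and threads is only a matter of which Pool you import:

from multiprocessing import Pool            # processes: suited to CPU-bound work
# from multiprocessing.dummy import Pool    # threads: swap in for IO-bound work

def work(n):
    # Stand-in for the real task; any top-level function works with either Pool
    return n * n

if __name__ == '__main__':
    pool = Pool(4)
    results = pool.map(work, range(10))
    pool.close()
    pool.join()
    print(results)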

At this point, we have achieved parallelization (essentially) with one line of Python.

Translator: caspar

Translation: https://www.php.cn/link/687fe34a901a03abed262a62e22f90db

Original: https://medium.com/building-things-on-the-internet/40e9b2b36148
