>  기사  >  백엔드 개발  >  병렬성을 달성하기 위한 Python 코드 한 줄

병렬성을 달성하기 위한 Python 코드 한 줄

WBOY
WBOY앞으로
2023-04-12 19:04:29859검색

병렬성을 달성하기 위한 Python 코드 한 줄

Python은 프로그램 병렬화로 다소 악명이 높습니다. 스레드 구현 및 GIL과 같은 기술적인 문제는 차치하더라도, 잘못된 교육 지침이 주요 문제라고 생각합니다. 일반적인 클래식 Python 멀티스레딩 및 멀티프로세스 튜토리얼은 "무거운" 경향이 있습니다. 그리고 일상 업무에서 가장 유용한 콘텐츠를 다루지 않고 표면만 긁는 경우가 많습니다.

전통적인 예제

"Python 멀티 스레딩 튜토리얼"로 간단히 검색해 보면 거의 모든 튜토리얼에서 클래스와 큐에 관련된 예제를 제공한다는 것을 어렵지 않게 찾을 수 있습니다:

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images)
    pool.close()
    pool.join()

하, 보세요 약간 자바와 비슷해 보이지 않나요?

멀티 스레드/멀티 프로세스 작업에 생산자/소비자 모델을 사용하는 것이 잘못되었다고 말하는 것은 아닙니다(사실 이 모델이 그 자리를 차지합니다). 그러나 일상적인 스크립팅 작업을 처리할 때는 보다 효율적인 모델을 사용할 수 있습니다.

문제는...

먼저 상용구 클래스가 필요합니다.
둘째, 객체를 전달하기 위한 대기열이 필요합니다.
또한 해당 메서드를 양쪽 끝에 구축해야 합니다. 작업을 지원하는 채널입니다(양방향 통신을 수행하거나 결과를 저장해야 하는 경우 다른 대기열을 도입해야 합니다).

작업자가 많을수록 문제도 많아집니다.

이 아이디어에 따르면 이제 작업자 스레드의 스레드 풀이 필요합니다. 다음은 웹 페이지를 검색할 때 멀티스레딩을 통한 가속이라는 고전적인 IBM 튜토리얼의 예입니다.

#Example2.py
'''
A more realistic thread pool example 
'''

import time 
import threading 
import Queue 
import urllib2 

class Consumer(threading.Thread): 
    def __init__(self, queue): 
        threading.Thread.__init__(self)
        self._queue = queue 

    def run(self):
        while True: 
            content = self._queue.get() 
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
        print 'Bye byes!'

def Producer():
    urls = [
        'http://www.python.org', 'http://www.yahoo.com'
        'http://www.scala.org', 'http://www.google.com'
        # etc.. 
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()

    # Add the urls to process
    for url in urls: 
        queue.put(url)  
    # Add the poison pillv
    for worker in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()

    print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
    workers = []
    for _ in range(size):
        worker = Consumer(queue)
        worker.start() 
        workers.append(worker)
    return workers

if __name__ == '__main__':
    Producer()

이 코드는 올바르게 실행되지만 수행해야 할 작업을 자세히 살펴보세요. 다양한 메서드를 구성하고 일련의 스레드를 추적하며 성가신 교착 상태 문제를 해결하려면 일련의 작업을 수행해야 합니다. 조인 작업. 이건 시작에 불과합니다...

지금까지 다소 공허한 고전적인 멀티스레딩 튜토리얼을 살펴보았습니다. 그렇지 않나요? 단순하고 오류가 발생하기 쉬운 이러한 스타일은 절반의 노력으로 두 배의 결과를 얻는 방식은 분명히 일상적인 사용에는 적합하지 않습니다. 다행히도 더 나은 방법이 있습니다.

map

map 이 작고 정교한 함수는 Python 프로그램을 쉽게 병렬화하는 핵심입니다. Map은 Lisp와 같은 함수형 프로그래밍 언어에서 유래되었습니다. 시퀀스를 통해 두 기능 간의 매핑을 달성할 수 있습니다.

    urls = ['http://www.yahoo.com', 'http://www.reddit.com']
    results = map(urllib2.urlopen, urls)

위 두 줄의 코드는 urls 시퀀스의 각 요소를 urlopen 메소드에 대한 매개변수로 전달하고 모든 결과를 결과 목록에 저장합니다. 결과는 대략 다음과 같습니다.

results = []
for url in urls: 
    results.append(urllib2.urlopen(url))

맵 함수는 시퀀스 작업, 매개변수 전달 및 결과 저장과 같은 일련의 작업을 단독으로 처리합니다.

이것이 왜 중요한가요? 이는 올바른 라이브러리를 사용하여 맵 작업을 쉽게 병렬화할 수 있기 때문입니다.

병렬성을 달성하기 위한 Python 코드 한 줄

Python에는 맵 함수를 포함하는 두 개의 라이브러리가 있습니다: multiprocessing과 잘 알려지지 않은 하위 라이브러리 multiprocessing.dummy.

여기에 몇 가지 문장이 더 있습니다: multiprocessing.dummy? mltiprocessing 라이브러리의 스레드 복제? 이게 새우야? 다중 처리 라이브러리의 공식 문서에도 이 하위 라이브러리에 대한 관련 설명은 하나만 있습니다. 그리고 이 설명을 성인용 언어로 번역하면 기본적으로 다음과 같습니다: "글쎄, 그런 것이 있으니 그냥 알아두세요." 저를 믿으세요, 이 도서관은 심각하게 과소평가되어 있습니다!

dummy는 멀티프로세싱 모듈의 완전한 복제입니다. 유일한 차이점은 멀티프로세싱이 프로세스에서 작동하는 반면 더미 모듈은 스레드에서 작동한다는 것입니다(따라서 Python의 모든 일반적인 멀티스레딩 제한 사항 포함).
그래서 이 두 라이브러리를 교체하는 것은 매우 쉽습니다. IO 집약적 작업과 CPU 집약적 작업에 대해 서로 다른 라이브러리를 선택할 수 있습니다.

직접 시도해 보세요.

다음 두 줄의 코드를 사용하여 병렬화된 맵 함수가 포함된 라이브러리를 참조하세요.

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

풀 개체 인스턴스화:

pool = ThreadPool()

이 간단한 문은 example2.py의 buildworkerpool 함수에 있는 7줄의 코드 작업입니다. 일련의 작업자 스레드를 생성하고 초기화하여 쉽게 액세스할 수 있도록 변수에 저장합니다.

Pool 개체에는 몇 가지 매개 변수가 있으며 여기서 집중해야 할 것은 첫 번째 매개 변수인 프로세스입니다. 이 매개 변수는 스레드 풀의 스레드 수를 설정하는 데 사용됩니다. 기본값은 현재 시스템 CPU의 코어 수입니다.

일반적으로 CPU 집약적인 작업을 수행할 때 더 많은 코어를 호출할수록 속도가 빨라집니다. 그러나 네트워크 집약적인 작업을 처리할 때는 상황이 다소 예측 불가능하므로 스레드 풀의 크기를 결정하기 위해 실험을 해보는 것이 현명합니다.

pool = ThreadPool(4) # Sets the pool size to 4

线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。

创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
    'http://www.python.org', 
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
    # etc.. 
    ]

# Make the Pool of workers
pool = ThreadPool(4) 
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish 
pool.close() 
pool.join()

实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。

# results = [] 
# for url in urls:
#   result = urllib2.urlopen(url)
#   results.append(result)

# # ------- VERSUS ------- # 

# # ------- 4 Pool ------- # 
# pool = ThreadPool(4) 
# results = pool.map(urllib2.urlopen, urls)

# # ------- 8 Pool ------- # 

# pool = ThreadPool(8) 
# results = pool.map(urllib2.urlopen, urls)

# # ------- 13 Pool ------- # 

# pool = ThreadPool(13) 
# results = pool.map(urllib2.urlopen, urls)

结果:

#        Single thread:  14.4 Seconds 
#               4 Pool:   3.1 Seconds
#               8 Pool:   1.4 Seconds
#              13 Pool:   1.3 Seconds

很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。

另一个真实的例子

生成上千张图片的缩略图 
这是一个 CPU 密集型的任务,并且十分适合进行并行化。

基础单进程版本

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    for image in images:
        create_thumbnail(Image)

上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。

这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。

如果我们使用 map 函数来代替 for 循环:

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images)
    pool.close()
    pool.join()

5.6 秒!

虽然只改动了几行代码,我们却明显提高了程序的执行速度。在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

到这里,我们就实现了(基本)通过一行 Python 实现并行化。

译者:caspar

译文:​https://www.php.cn/link/687fe34a901a03abed262a62e22f90db​​​​m/a/1190000000414339 

原文:https://medium.com/building-things-on-the-internet/40e9b2b36148

위 내용은 병렬성을 달성하기 위한 Python 코드 한 줄의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 51cto.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제