ホームページ >バックエンド開発 >Python チュートリアル >Python 同時プログラミングをマスターする: 高度なテクニックでパフォーマンスを向上させる

Python 同時プログラミングをマスターする: 高度なテクニックでパフォーマンスを向上させる

Patricia Arquette
Patricia Arquetteオリジナル
2024-12-13 20:39:54952ブラウズ

Mastering Python

Python の同時プログラミング機能は大幅に進化し、開発者に効率的な並列コードを作成するための強力なツールを提供します。私はこれらの高度なテクニックの探索にかなりの時間を費やしてきました。その洞察を皆さんと共有できることを楽しみにしています。

asyncio を使用した非同期プログラミングは、I/O バウンドのタスクにとって大きな変革をもたらします。これにより、スレッド化のオーバーヘッドなしで複数の操作を同時に処理できるノンブロッキング コードを作成できます。以下は、asyncio を使用して複数の URL から同時にデータをフェッチする方法の簡単な例です:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, result in zip(urls, results):
            print(f"Content length of {url}: {len(result)}")

asyncio.run(main())

このコードは、複数のコルーチンを作成して、異なる URL から同時にデータをフェッチする方法を示しています。 asyncio.gather() 関数を使用すると、すべてのコルーチンが完了するのを待ち、その結果を収集できます。

asyncio は I/O バウンドのタスクには優れていますが、CPU バウンドの操作には適していません。これらについては、ThreadPoolExecutor と ProcessPoolExecutor の両方を提供する concurrent.futures モジュールに注目します。 ThreadPoolExecutor は GIL を解放しない I/O バウンドのタスクに最適ですが、ProcessPoolExecutor は CPU バウンドのタスクに最適です。

次に、ThreadPoolExecutor を使用して複数のファイルを同時にダウンロードする例を示します。

import concurrent.futures
import requests

def download_file(url):
    response = requests.get(url)
    filename = url.split('/')[-1]
    with open(filename, 'wb') as f:
        f.write(response.content)
    return f"Downloaded {filename}"

urls = [
    'https://example.com/file1.pdf',
    'https://example.com/file2.pdf',
    'https://example.com/file3.pdf'
]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_url = {executor.submit(download_file, url): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")
        else:
            print(data)

このコードは 3 つのワーカーを持つスレッド プールを作成し、URL ごとにダウンロード タスクを送信します。 as_completed() 関数を使用すると、すべてのタスクが完了するのを待つのではなく、結果が利用可能になったときに結果を処理できます。

CPU に依存するタスクの場合、ProcessPoolExecutor を使用して複数の CPU コアを活用できます。以下は素数を並列計算する例です:

import concurrent.futures
import math

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def find_primes(start, end):
    return [n for n in range(start, end) if is_prime(n)]

ranges = [(1, 25000), (25001, 50000), (50001, 75000), (75001, 100000)]

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(lambda r: find_primes(*r), ranges)

all_primes = [prime for sublist in results for prime in sublist]
print(f"Found {len(all_primes)} prime numbers")

このコードは、素数を見つけるタスクを 4 つの範囲に分割し、別々の Python プロセスを使用して並列処理します。 map() 関数は、find_primes() 関数を各範囲に適用し、結果を収集します。

複数のプロセスで作業する場合、多くの場合、プロセス間でデータを共有する必要があります。マルチプロセッシング モジュールは、共有メモリやキューなど、このためのいくつかのオプションを提供します。以下は共有メモリ配列を使用した例です:

from multiprocessing import Process, Array
import numpy as np

def worker(shared_array, start, end):
    for i in range(start, end):
        shared_array[i] = i * i

if __name__ == '__main__':
    size = 10000000
    shared_array = Array('d', size)

    # Create 4 processes
    processes = []
    chunk_size = size // 4
    for i in range(4):
        start = i * chunk_size
        end = start + chunk_size if i < 3 else size
        p = Process(target=worker, args=(shared_array, start, end))
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Convert shared array to numpy array for easy manipulation
    np_array = np.frombuffer(shared_array.get_obj())
    print(f"Sum of squares: {np_array.sum()}")

このコードは共有メモリ配列を作成し、4 つのプロセスを使用して数値の 2 乗を並行して計算します。共有配列により、すべてのプロセスが同じメモリ空間に書き込むことができるため、プロセス間通信の必要がなくなります。

これらのテクニックは強力ですが、独自の課題も伴います。競合状態、デッドロック、過度のコンテキスト切り替えはすべて、パフォーマンスと正確性に影響を与える可能性があります。同時実行コードを慎重に設計し、必要に応じて適切な同期プリミティブを使用することが重要です。

たとえば、複数のスレッドまたはプロセスが共有リソースにアクセスする必要がある場合、ロックを使用してスレッドの安全性を確保できます。

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, result in zip(urls, results):
            print(f"Content length of {url}: {len(result)}")

asyncio.run(main())

このコードは、複数のスレッドが同時に共有カウンターをインクリメントしているときに、ロックを使用して共有カウンターを競合状態から保護する方法を示しています。

もう 1 つの高度なテクニックは、限られたリソースへのアクセスを制御するためのセマフォの使用です。同時ネットワーク接続数を制限する例を次に示します:

import concurrent.futures
import requests

def download_file(url):
    response = requests.get(url)
    filename = url.split('/')[-1]
    with open(filename, 'wb') as f:
        f.write(response.content)
    return f"Downloaded {filename}"

urls = [
    'https://example.com/file1.pdf',
    'https://example.com/file2.pdf',
    'https://example.com/file3.pdf'
]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_url = {executor.submit(download_file, url): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")
        else:
            print(data)

このコードはセマフォを使用して同時ネットワーク接続数を 10 に制限し、ネットワークやサーバーに負荷がかかるのを防ぎます。

同時実行コードを使用する場合は、例外を適切に処理することも重要です。 asyncio モジュールは、asyncio.gather() 関数に return_Exceptions パラメータを提供します。これは次の場合に役立ちます。

import concurrent.futures
import math

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def find_primes(start, end):
    return [n for n in range(start, end) if is_prime(n)]

ranges = [(1, 25000), (25001, 50000), (50001, 75000), (75001, 100000)]

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(lambda r: find_primes(*r), ranges)

all_primes = [prime for sublist in results for prime in sublist]
print(f"Found {len(all_primes)} prime numbers")

このコードは、他のタスクの実行を停止せずに同時タスクの例外を処理する方法を示しています。

同時プログラミングをさらに深く掘り下げると、イベント ループやコルーチン チェーンなどのより高度な概念に遭遇します。コルーチンをチェーンする方法を示す例を次に示します。

from multiprocessing import Process, Array
import numpy as np

def worker(shared_array, start, end):
    for i in range(start, end):
        shared_array[i] = i * i

if __name__ == '__main__':
    size = 10000000
    shared_array = Array('d', size)

    # Create 4 processes
    processes = []
    chunk_size = size // 4
    for i in range(4):
        start = i * chunk_size
        end = start + chunk_size if i < 3 else size
        p = Process(target=worker, args=(shared_array, start, end))
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Convert shared array to numpy array for easy manipulation
    np_array = np.frombuffer(shared_array.get_obj())
    print(f"Sum of squares: {np_array.sum()}")

このコードは 3 つのコルーチン (fetch_data、process_data、save_result) をチェーンして、URL ごとにパイプラインを作成します。 asyncio.gather() 関数はこれらのパイプラインを同時に実行します。

長時間実行されるタスクを扱う場合、多くの場合、キャンセルおよびタイムアウトのメカニズムを実装する必要があります。以下は両方を示す例です:

from threading import Lock, Thread

class Counter:
    def __init__(self):
        self.count = 0
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.count += 1

def worker(counter, num_increments):
    for _ in range(num_increments):
        counter.increment()

counter = Counter()
threads = []
for _ in range(10):
    t = Thread(target=worker, args=(counter, 100000))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final count: {counter.count}")

このコードは 5 つの長時間実行タスクを開始しますが、すべてのタスクが完了するまでに 5 秒のタイムアウトを設定します。タイムアウトに達すると、残りのタスクはすべてキャンセルされます。

結論として、Python の同時プログラミング機能は、効率的な並列コードを作成するための幅広いツールとテクニックを提供します。 asyncio を使用した非同期プログラミングから、CPU に依存するタスクのマルチプロセッシングに至るまで、これらの高度な技術により、アプリケーションのパフォーマンスを大幅に向上させることができます。ただし、基礎となる概念を理解し、各タスクに適切なツールを選択し、共有リソースと潜在的な競合状態を注意深く管理することが重要です。練習と慎重な設計により、Python での同時プログラミングの能力を最大限に活用して、高速でスケーラブルで応答性の高いアプリケーションを構築できます。


私たちの作品

私たちの作品をぜひチェックしてください:

インベスターセントラル | スマートな暮らし | エポックとエコー | 不可解な謎 | ヒンドゥーヴァ | エリート開発者 | JS スクール


私たちは中程度です

Tech Koala Insights | エポックズ&エコーズワールド | インベスター・セントラル・メディア | 不可解な謎 中 | 科学とエポックミディアム | 現代ヒンドゥーヴァ

以上がPython 同時プログラミングをマスターする: 高度なテクニックでパフォーマンスを向上させるの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。