Maison >développement back-end >Tutoriel Python >Maîtriser la programmation simultanée de Python : améliorer les performances avec des techniques avancées

Maîtriser la programmation simultanée de Python : améliorer les performances avec des techniques avancées

Patricia Arquette
Patricia Arquetteoriginal
2024-12-13 20:39:541018parcourir

Mastering Python

Les capacités de programmation simultanée de Python ont considérablement évolué, offrant aux développeurs des outils puissants pour écrire du code parallèle efficace. J'ai passé beaucoup de temps à explorer ces techniques avancées et je suis ravi de partager mes idées avec vous.

La programmation asynchrone avec asyncio change la donne pour les tâches liées aux E/S. Cela nous permet d'écrire du code non bloquant qui peut gérer plusieurs opérations simultanément sans la surcharge du threading. Voici un exemple simple de la façon dont nous pouvons utiliser asyncio pour récupérer des données de plusieurs URL simultanément :

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, result in zip(urls, results):
            print(f"Content length of {url}: {len(result)}")

asyncio.run(main())

Ce code montre comment nous pouvons créer plusieurs coroutines pour récupérer simultanément des données de différentes URL. La fonction asyncio.gather() nous permet d'attendre que toutes les coroutines soient terminées et de collecter leurs résultats.

Bien qu'asyncio soit excellent pour les tâches liées aux E/S, il ne convient pas aux opérations liées au CPU. Pour ceux-là, nous nous tournons vers le module concurrent.futures, qui fournit à la fois ThreadPoolExecutor et ProcessPoolExecutor. ThreadPoolExecutor est idéal pour les tâches liées aux E/S qui ne libèrent pas le GIL, tandis que ProcessPoolExecutor est parfait pour les tâches liées au CPU.

Voici un exemple utilisant ThreadPoolExecutor pour télécharger plusieurs fichiers simultanément :

import concurrent.futures
import requests

def download_file(url):
    response = requests.get(url)
    filename = url.split('/')[-1]
    with open(filename, 'wb') as f:
        f.write(response.content)
    return f"Downloaded {filename}"

urls = [
    'https://example.com/file1.pdf',
    'https://example.com/file2.pdf',
    'https://example.com/file3.pdf'
]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_url = {executor.submit(download_file, url): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")
        else:
            print(data)

Ce code crée un pool de threads avec trois nœuds de calcul et soumet une tâche de téléchargement pour chaque URL. La fonction as_completed() nous permet de traiter les résultats dès qu'ils sont disponibles, plutôt que d'attendre la fin de toutes les tâches.

Pour les tâches liées au processeur, nous pouvons utiliser ProcessPoolExecutor pour exploiter plusieurs cœurs de processeur. Voici un exemple qui calcule des nombres premiers en parallèle :

import concurrent.futures
import math

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def find_primes(start, end):
    return [n for n in range(start, end) if is_prime(n)]

ranges = [(1, 25000), (25001, 50000), (50001, 75000), (75001, 100000)]

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(lambda r: find_primes(*r), ranges)

all_primes = [prime for sublist in results for prime in sublist]
print(f"Found {len(all_primes)} prime numbers")

Ce code divise la tâche de recherche de nombres premiers en quatre plages et les traite en parallèle à l'aide de processus Python distincts. La fonction map() applique notre fonction find_primes() à chaque plage et collecte les résultats.

Lorsque nous travaillons avec plusieurs processus, nous devons souvent partager des données entre eux. Le module multitraitement propose plusieurs options pour cela, notamment la mémoire partagée et les files d'attente. Voici un exemple utilisant une matrice de mémoire partagée :

from multiprocessing import Process, Array
import numpy as np

def worker(shared_array, start, end):
    for i in range(start, end):
        shared_array[i] = i * i

if __name__ == '__main__':
    size = 10000000
    shared_array = Array('d', size)

    # Create 4 processes
    processes = []
    chunk_size = size // 4
    for i in range(4):
        start = i * chunk_size
        end = start + chunk_size if i < 3 else size
        p = Process(target=worker, args=(shared_array, start, end))
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Convert shared array to numpy array for easy manipulation
    np_array = np.frombuffer(shared_array.get_obj())
    print(f"Sum of squares: {np_array.sum()}")

Ce code crée un tableau de mémoire partagée et utilise quatre processus pour calculer les carrés des nombres en parallèle. Le tableau partagé permet à tous les processus d'écrire dans le même espace mémoire, évitant ainsi le besoin de communication inter-processus.

Bien que ces techniques soient puissantes, elles comportent leur propre ensemble de défis. Les conditions de concurrence, les blocages et les changements de contexte excessifs peuvent tous avoir un impact sur les performances et l'exactitude. Il est crucial de concevoir soigneusement votre code concurrent et d'utiliser les primitives de synchronisation appropriées si nécessaire.

Par exemple, lorsque plusieurs threads ou processus doivent accéder à une ressource partagée, nous pouvons utiliser un verrou pour garantir la sécurité des threads :

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, result in zip(urls, results):
            print(f"Content length of {url}: {len(result)}")

asyncio.run(main())

Ce code montre comment utiliser un verrou pour protéger un compteur partagé des conditions de concurrence lorsque plusieurs threads l'incrémentent simultanément.

Une autre technique avancée est l'utilisation de sémaphores pour contrôler l'accès à une ressource limitée. Voici un exemple qui limite le nombre de connexions réseau simultanées :

import concurrent.futures
import requests

def download_file(url):
    response = requests.get(url)
    filename = url.split('/')[-1]
    with open(filename, 'wb') as f:
        f.write(response.content)
    return f"Downloaded {filename}"

urls = [
    'https://example.com/file1.pdf',
    'https://example.com/file2.pdf',
    'https://example.com/file3.pdf'
]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_url = {executor.submit(download_file, url): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")
        else:
            print(data)

Ce code utilise un sémaphore pour limiter le nombre de connexions réseau simultanées à 10, évitant ainsi de surcharger le réseau ou le serveur.

Lorsque vous travaillez avec du code concurrent, il est également important de gérer correctement les exceptions. Le module asyncio fournit à la fonction asyncio.gather() un paramètre return_exceptions qui peut être utile pour cela :

import concurrent.futures
import math

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def find_primes(start, end):
    return [n for n in range(start, end) if is_prime(n)]

ranges = [(1, 25000), (25001, 50000), (50001, 75000), (75001, 100000)]

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(lambda r: find_primes(*r), ranges)

all_primes = [prime for sublist in results for prime in sublist]
print(f"Found {len(all_primes)} prime numbers")

Ce code montre comment gérer les exceptions dans les tâches simultanées sans arrêter l'exécution d'autres tâches.

Au fur et à mesure que nous approfondissons la programmation simultanée, nous rencontrons des concepts plus avancés tels que les boucles d'événements et le chaînage de coroutines. Voici un exemple qui montre comment enchaîner des coroutines :

from multiprocessing import Process, Array
import numpy as np

def worker(shared_array, start, end):
    for i in range(start, end):
        shared_array[i] = i * i

if __name__ == '__main__':
    size = 10000000
    shared_array = Array('d', size)

    # Create 4 processes
    processes = []
    chunk_size = size // 4
    for i in range(4):
        start = i * chunk_size
        end = start + chunk_size if i < 3 else size
        p = Process(target=worker, args=(shared_array, start, end))
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Convert shared array to numpy array for easy manipulation
    np_array = np.frombuffer(shared_array.get_obj())
    print(f"Sum of squares: {np_array.sum()}")

Ce code enchaîne trois coroutines (fetch_data, process_data et save_result) pour créer un pipeline pour chaque URL. La fonction asyncio.gather() exécute ensuite ces pipelines simultanément.

Lorsque vous travaillez sur des tâches de longue durée, il est souvent nécessaire de mettre en œuvre des mécanismes d'annulation et de délai d'attente. Voici un exemple qui démontre les deux :

from threading import Lock, Thread

class Counter:
    def __init__(self):
        self.count = 0
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.count += 1

def worker(counter, num_increments):
    for _ in range(num_increments):
        counter.increment()

counter = Counter()
threads = []
for _ in range(10):
    t = Thread(target=worker, args=(counter, 100000))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final count: {counter.count}")

Ce code démarre cinq tâches de longue durée mais définit un délai d'attente de 5 secondes pour que toutes les tâches soient terminées. Si le délai d'attente est atteint, il annule toutes les tâches restantes.

En conclusion, les capacités de programmation simultanée de Python offrent un large éventail d'outils et de techniques pour écrire du code parallèle efficace. De la programmation asynchrone avec asyncio au multitraitement pour les tâches liées au processeur, ces techniques avancées peuvent améliorer considérablement les performances de nos applications. Cependant, il est crucial de comprendre les concepts sous-jacents, de choisir le bon outil pour chaque tâche et de gérer soigneusement les ressources partagées et les conditions de concurrence potentielles. Avec de la pratique et une conception soignée, nous pouvons exploiter toute la puissance de la programmation simultanée en Python pour créer des applications rapides, évolutives et réactives.


Nos créations

N'oubliez pas de consulter nos créations :

Centre des investisseurs | Vie intelligente | Époques & Échos | Mystères déroutants | Hindutva | Développeur Élite | Écoles JS


Nous sommes sur Medium

Tech Koala Insights | Epoques & Echos Monde | Support Central des Investisseurs | Mystères déroutants Medium | Sciences & Epoques Medium | Hindutva moderne

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn