Maison >développement back-end >Tutoriel Python >Maîtrisez les coroutines Python : créez des outils asynchrones personnalisés pour de puissantes applications simultanées

Maîtrisez les coroutines Python : créez des outils asynchrones personnalisés pour de puissantes applications simultanées

DDD
DDDoriginal
2024-11-29 12:18:141063parcourir

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Les coroutines en Python sont un outil puissant pour écrire du code asynchrone. Ils ont révolutionné la façon dont nous gérons les opérations simultanées, facilitant ainsi la création d'applications évolutives et efficaces. J'ai passé beaucoup de temps à travailler avec des coroutines et je suis ravi de partager quelques idées sur la création de primitives asynchrones personnalisées.

Commençons par les bases. Les coroutines sont des fonctions spéciales qui peuvent être suspendues et reprises, permettant un multitâche coopératif. Ils constituent le fondement de la syntaxe async/wait de Python. Lorsque vous définissez une coroutine, vous créez essentiellement une fonction qui peut redonner le contrôle à la boucle d'événements, permettant ainsi à d'autres tâches de s'exécuter.

Pour créer un objet waitable personnalisé, vous devez implémenter la méthode await. Cette méthode doit renvoyer un itérateur. Voici un exemple simple :

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

Cette classe CustomAwaitable peut être utilisée avec le mot-clé wait, tout comme les waitables intégrés. Lorsqu'il est attendu, il cède le contrôle une fois, puis renvoie sa valeur.

Mais que se passe-t-il si nous voulons créer des primitives asynchrones plus complexes ? Voyons implémenter un sémaphore personnalisé. Les sémaphores sont utilisés pour contrôler l'accès à une ressource partagée par plusieurs coroutines :

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Cette classe CustomSemaphore implémente les méthodes d'acquisition et de libération, ainsi que le protocole de gestion de contexte asynchrone (aenter et aexit). Il permet à un maximum de deux coroutines d'acquérir le sémaphore simultanément.

Parlons maintenant de la création de boucles d'événements efficaces. Bien que l'asyncio de Python fournisse une implémentation robuste de boucle d'événements, il peut y avoir des cas où vous en aurez besoin d'une personnalisée. Voici un exemple de base de boucle d'événement personnalisée :

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)

Cette boucle d'événements personnalisée implémente des fonctionnalités de base telles que l'exécution de tâches, la gestion des coroutines et même une simple fonction de mise en veille. Ce n'est pas aussi riche en fonctionnalités que la boucle d'événements intégrée de Python, mais il démontre les concepts de base.

L'un des défis de l'écriture de code asynchrone est la gestion des priorités des tâches. Bien que l'asyncio de Python ne fournisse pas de files d'attente prioritaires intégrées pour les tâches, nous pouvons implémenter la nôtre :

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())

Ce PriorityEventLoop utilise une file d'attente de tas pour gérer les tâches en fonction de leur heure d'exécution planifiée. Vous pouvez attribuer des priorités en planifiant des tâches avec des délais différents.

Gérer l'annulation avec élégance est un autre aspect important du travail avec les coroutines. Voici un exemple de la façon de mettre en œuvre des tâches annulables :

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

Dans cet exemple, CancelledError intercepte CancelledError, effectue tout nettoyage nécessaire, puis relance l'exception. Cela permet une gestion gracieuse de l'annulation tout en continuant à propager le statut d'annulation.

Explorons l'implémentation d'itérateurs asynchrones personnalisés. Ceux-ci sont utiles pour créer des séquences qui peuvent être itérées de manière asynchrone :

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

Cette classe AsyncRange implémente le protocole d'itérateur asynchrone, lui permettant d'être utilisée en asynchrone pour les boucles.

Enfin, examinons la mise en œuvre de gestionnaires de contexte asynchrones personnalisés. Ceux-ci sont utiles pour gérer les ressources qui doivent être acquises et libérées de manière asynchrone :

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Cette classe AsyncResource implémente les méthodes aenter et aexit, lui permettant d'être utilisée avec l'instruction async with.

En conclusion, le système de coroutines de Python fournit une base puissante pour créer des primitives asynchrones personnalisées. En comprenant les mécanismes et protocoles sous-jacents, vous pouvez créer des solutions sur mesure pour des défis asynchrones spécifiques, optimiser les performances dans des scénarios simultanés complexes et étendre les capacités asynchrones de Python. N'oubliez pas que même si ces implémentations personnalisées sont idéales pour l'apprentissage et les cas d'utilisation spécifiques, la bibliothèque asyncio intégrée de Python est hautement optimisée et devrait être votre référence pour la plupart des scénarios. Bon codage !


Nos créations

N'oubliez pas de consulter nos créations :

Centre des investisseurs | Vie intelligente | Époques & Échos | Mystères déroutants | Hindutva | Développeur Élite | Écoles JS


Nous sommes sur Medium

Tech Koala Insights | Epoques & Echos Monde | Support Central des Investisseurs | Mystères déroutants Medium | Sciences & Epoques Medium | Hindutva moderne

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn