Maison >développement back-end >Tutoriel Python >Maîtrisez les coroutines Python : créez des outils asynchrones personnalisés pour de puissantes applications simultanées
Les coroutines en Python sont un outil puissant pour écrire du code asynchrone. Ils ont révolutionné la façon dont nous gérons les opérations simultanées, facilitant ainsi la création d'applications évolutives et efficaces. J'ai passé beaucoup de temps à travailler avec des coroutines et je suis ravi de partager quelques idées sur la création de primitives asynchrones personnalisées.
Commençons par les bases. Les coroutines sont des fonctions spéciales qui peuvent être suspendues et reprises, permettant un multitâche coopératif. Ils constituent le fondement de la syntaxe async/wait de Python. Lorsque vous définissez une coroutine, vous créez essentiellement une fonction qui peut redonner le contrôle à la boucle d'événements, permettant ainsi à d'autres tâches de s'exécuter.
Pour créer un objet waitable personnalisé, vous devez implémenter la méthode await. Cette méthode doit renvoyer un itérateur. Voici un exemple simple :
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
Cette classe CustomAwaitable peut être utilisée avec le mot-clé wait, tout comme les waitables intégrés. Lorsqu'il est attendu, il cède le contrôle une fois, puis renvoie sa valeur.
Mais que se passe-t-il si nous voulons créer des primitives asynchrones plus complexes ? Voyons implémenter un sémaphore personnalisé. Les sémaphores sont utilisés pour contrôler l'accès à une ressource partagée par plusieurs coroutines :
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value <= 0: fut = asyncio.get_running_loop().create_future() self._waiters.append(fut) await fut self._value -= 1 def release(self): self._value += 1 if self._waiters: asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None) async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc, tb): self.release() async def worker(semaphore, num): async with semaphore: print(f"Worker {num} acquired the semaphore") await asyncio.sleep(1) print(f"Worker {num} released the semaphore") async def main(): semaphore = CustomSemaphore(2) tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
Cette classe CustomSemaphore implémente les méthodes d'acquisition et de libération, ainsi que le protocole de gestion de contexte asynchrone (aenter et aexit). Il permet à un maximum de deux coroutines d'acquérir le sémaphore simultanément.
Parlons maintenant de la création de boucles d'événements efficaces. Bien que l'asyncio de Python fournisse une implémentation robuste de boucle d'événements, il peut y avoir des cas où vous en aurez besoin d'une personnalisée. Voici un exemple de base de boucle d'événement personnalisée :
import time from collections import deque class CustomEventLoop: def __init__(self): self._ready = deque() self._stopping = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) def stop(self): self._stopping = True def run_until_complete(self, coro): def _done_callback(fut): self.stop() task = self.create_task(coro) task.add_done_callback(_done_callback) self.run_forever() return task.result() def create_task(self, coro): task = Task(coro, self) self.call_soon(task._step) return task class Task: def __init__(self, coro, loop): self._coro = coro self._loop = loop self._done = False self._result = None self._callbacks = [] def _step(self): try: if self._done: return result = self._coro.send(None) if isinstance(result, SleepHandle): result._task = self self._loop.call_soon(result._wake_up) else: self._loop.call_soon(self._step) except StopIteration as e: self.set_result(e.value) def set_result(self, result): self._result = result self._done = True for callback in self._callbacks: self._loop.call_soon(callback, self) def add_done_callback(self, callback): if self._done: self._loop.call_soon(callback, self) else: self._callbacks.append(callback) def result(self): if not self._done: raise RuntimeError('Task is not done') return self._result class SleepHandle: def __init__(self, duration): self._duration = duration self._task = None self._start_time = time.time() def _wake_up(self): if time.time() - self._start_time >= self._duration: self._task._loop.call_soon(self._task._step) else: self._task._loop.call_soon(self._wake_up) async def sleep(duration): return SleepHandle(duration) async def example(): print("Start") await sleep(1) print("After 1 second") await sleep(2) print("After 2 more seconds") return "Done" loop = CustomEventLoop() result = loop.run_until_complete(example()) print(result)
Cette boucle d'événements personnalisée implémente des fonctionnalités de base telles que l'exécution de tâches, la gestion des coroutines et même une simple fonction de mise en veille. Ce n'est pas aussi riche en fonctionnalités que la boucle d'événements intégrée de Python, mais il démontre les concepts de base.
L'un des défis de l'écriture de code asynchrone est la gestion des priorités des tâches. Bien que l'asyncio de Python ne fournisse pas de files d'attente prioritaires intégrées pour les tâches, nous pouvons implémenter la nôtre :
import asyncio import heapq class PriorityEventLoop(asyncio.AbstractEventLoop): def __init__(self): self._ready = [] self._stopping = False self._clock = 0 def call_at(self, when, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) heapq.heappush(self._ready, (when, handle)) return handle def call_later(self, delay, callback, *args, context=None): return self.call_at(self._clock + delay, callback, *args, context=context) def call_soon(self, callback, *args, context=None): return self.call_at(self._clock, callback, *args, context=context) def time(self): return self._clock def stop(self): self._stopping = True def is_running(self): return not self._stopping def run_forever(self): while self._ready and not self._stopping: self._run_once() def _run_once(self): if not self._ready: return when, handle = heapq.heappop(self._ready) self._clock = when handle._run() def create_task(self, coro): return asyncio.Task(coro, loop=self) def run_until_complete(self, future): asyncio.futures._chain_future(future, self.create_future()) self.run_forever() if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def create_future(self): return asyncio.Future(loop=self) async def low_priority_task(): print("Low priority task started") await asyncio.sleep(2) print("Low priority task finished") async def high_priority_task(): print("High priority task started") await asyncio.sleep(1) print("High priority task finished") async def main(): loop = asyncio.get_event_loop() loop.call_later(0.1, loop.create_task, low_priority_task()) loop.call_later(0, loop.create_task, high_priority_task()) await asyncio.sleep(3) asyncio.run(main())
Ce PriorityEventLoop utilise une file d'attente de tas pour gérer les tâches en fonction de leur heure d'exécution planifiée. Vous pouvez attribuer des priorités en planifiant des tâches avec des délais différents.
Gérer l'annulation avec élégance est un autre aspect important du travail avec les coroutines. Voici un exemple de la façon de mettre en œuvre des tâches annulables :
import asyncio async def cancellable_operation(): try: print("Operation started") await asyncio.sleep(5) print("Operation completed") except asyncio.CancelledError: print("Operation was cancelled") # Perform any necessary cleanup raise # Re-raise the CancelledError async def main(): task = asyncio.create_task(cancellable_operation()) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())
Dans cet exemple, CancelledError intercepte CancelledError, effectue tout nettoyage nécessaire, puis relance l'exception. Cela permet une gestion gracieuse de l'annulation tout en continuant à propager le statut d'annulation.
Explorons l'implémentation d'itérateurs asynchrones personnalisés. Ceux-ci sont utiles pour créer des séquences qui peuvent être itérées de manière asynchrone :
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
Cette classe AsyncRange implémente le protocole d'itérateur asynchrone, lui permettant d'être utilisée en asynchrone pour les boucles.
Enfin, examinons la mise en œuvre de gestionnaires de contexte asynchrones personnalisés. Ceux-ci sont utiles pour gérer les ressources qui doivent être acquises et libérées de manière asynchrone :
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value <= 0: fut = asyncio.get_running_loop().create_future() self._waiters.append(fut) await fut self._value -= 1 def release(self): self._value += 1 if self._waiters: asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None) async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc, tb): self.release() async def worker(semaphore, num): async with semaphore: print(f"Worker {num} acquired the semaphore") await asyncio.sleep(1) print(f"Worker {num} released the semaphore") async def main(): semaphore = CustomSemaphore(2) tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
Cette classe AsyncResource implémente les méthodes aenter et aexit, lui permettant d'être utilisée avec l'instruction async with.
En conclusion, le système de coroutines de Python fournit une base puissante pour créer des primitives asynchrones personnalisées. En comprenant les mécanismes et protocoles sous-jacents, vous pouvez créer des solutions sur mesure pour des défis asynchrones spécifiques, optimiser les performances dans des scénarios simultanés complexes et étendre les capacités asynchrones de Python. N'oubliez pas que même si ces implémentations personnalisées sont idéales pour l'apprentissage et les cas d'utilisation spécifiques, la bibliothèque asyncio intégrée de Python est hautement optimisée et devrait être votre référence pour la plupart des scénarios. Bon codage !
N'oubliez pas de consulter nos créations :
Centre des investisseurs | Vie intelligente | Époques & Échos | Mystères déroutants | Hindutva | Développeur Élite | Écoles JS
Tech Koala Insights | Epoques & Echos Monde | Support Central des Investisseurs | Mystères déroutants Medium | Sciences & Epoques Medium | Hindutva moderne
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!