Heim >Backend-Entwicklung >Python-Tutorial >Beherrschen Sie Python-Coroutinen: Erstellen Sie benutzerdefinierte asynchrone Tools für leistungsstarke gleichzeitige Apps

Beherrschen Sie Python-Coroutinen: Erstellen Sie benutzerdefinierte asynchrone Tools für leistungsstarke gleichzeitige Apps

DDD
DDDOriginal
2024-11-29 12:18:141063Durchsuche

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Koroutinen in Python sind ein leistungsstarkes Werkzeug zum Schreiben von asynchronem Code. Sie haben die Art und Weise, wie wir gleichzeitige Vorgänge handhaben, revolutioniert und es einfacher gemacht, skalierbare und effiziente Anwendungen zu erstellen. Ich habe viel Zeit mit der Arbeit mit Coroutinen verbracht und freue mich, einige Einblicke in die Erstellung benutzerdefinierter asynchroner Grundelemente zu geben.

Beginnen wir mit den Grundlagen. Coroutinen sind spezielle Funktionen, die angehalten und fortgesetzt werden können und so kooperatives Multitasking ermöglichen. Sie bilden die Grundlage der Async/Await-Syntax von Python. Wenn Sie eine Coroutine definieren, erstellen Sie im Wesentlichen eine Funktion, die die Kontrolle an die Ereignisschleife zurückgeben kann, sodass andere Aufgaben ausgeführt werden können.

Um ein benutzerdefiniertes, erwartbares Objekt zu erstellen, müssen Sie die Methode await implementieren. Diese Methode sollte einen Iterator zurückgeben. Hier ist ein einfaches Beispiel:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

Diese CustomAwaitable-Klasse kann mit dem Schlüsselwort „await“ verwendet werden, genau wie integrierte „awaitables“. Wenn darauf gewartet wird, gibt es einmal die Kontrolle ab und gibt dann seinen Wert zurück.

Aber was ist, wenn wir komplexere asynchrone Grundelemente erstellen möchten? Schauen wir uns die Implementierung eines benutzerdefinierten Semaphors an. Semaphoren werden verwendet, um den Zugriff mehrerer Coroutinen auf eine gemeinsam genutzte Ressource zu steuern:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Diese CustomSemaphore-Klasse implementiert die Erfassungs- und Freigabemethoden sowie das asynchrone Kontextmanagerprotokoll (aenter und aexit). Es ermöglicht maximal zwei Coroutinen, das Semaphor gleichzeitig zu erfassen.

Lassen Sie uns nun über die Erstellung effizienter Ereignisschleifen sprechen. Während Pythons Asyncio eine robuste Implementierung einer Ereignisschleife bietet, kann es Fälle geben, in denen Sie eine benutzerdefinierte Implementierung benötigen. Hier ist ein einfaches Beispiel einer benutzerdefinierten Ereignisschleife:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)

Diese benutzerdefinierte Ereignisschleife implementiert grundlegende Funktionen wie das Ausführen von Aufgaben, die Handhabung von Coroutinen und sogar eine einfache Schlaffunktion. Es ist nicht so funktionsreich wie die integrierte Ereignisschleife von Python, demonstriert aber die Kernkonzepte.

Eine der Herausforderungen beim Schreiben von asynchronem Code ist die Verwaltung von Aufgabenprioritäten. Während Pythons Asyncio keine integrierten Prioritätswarteschlangen für Aufgaben bereitstellt, können wir unsere eigenen implementieren:

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())

Dieser PriorityEventLoop verwendet eine Heap-Warteschlange, um Aufgaben basierend auf ihrer geplanten Ausführungszeit zu verwalten. Sie können Prioritäten zuweisen, indem Sie Aufgaben mit unterschiedlichen Verzögerungen planen.

Der ordnungsgemäße Umgang mit Stornierungen ist ein weiterer wichtiger Aspekt bei der Arbeit mit Coroutinen. Hier ein Beispiel für die Umsetzung stornierbarer Aufgaben:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

In diesem Beispiel fängt cancellable_operation den CancelledError ab, führt alle erforderlichen Bereinigungen durch und löst dann die Ausnahme erneut aus. Dies ermöglicht eine reibungslose Abwicklung der Stornierung, während gleichzeitig der Stornierungsstatus weitergegeben wird.

Sehen wir uns die Implementierung benutzerdefinierter asynchroner Iteratoren an. Diese sind nützlich zum Erstellen von Sequenzen, die asynchron iteriert werden können:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

Diese AsyncRange-Klasse implementiert das asynchrone Iteratorprotokoll und ermöglicht so die Verwendung in asynchronen for-Schleifen.

Schließlich schauen wir uns die Implementierung benutzerdefinierter asynchroner Kontextmanager an. Diese sind nützlich für die Verwaltung von Ressourcen, die asynchron erworben und freigegeben werden müssen:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Diese AsyncResource-Klasse implementiert die Methoden aenter und aexit und ermöglicht so die Verwendung mit der async with-Anweisung.

Zusammenfassend lässt sich sagen, dass das Coroutine-System von Python eine leistungsstarke Grundlage für die Erstellung benutzerdefinierter asynchroner Grundelemente bietet. Durch das Verständnis der zugrunde liegenden Mechanismen und Protokolle können Sie maßgeschneiderte Lösungen für bestimmte asynchrone Herausforderungen erstellen, die Leistung in komplexen gleichzeitigen Szenarien optimieren und die asynchronen Funktionen von Python erweitern. Denken Sie daran, dass sich diese benutzerdefinierten Implementierungen zwar hervorragend zum Lernen und für bestimmte Anwendungsfälle eignen, die integrierte Asyncio-Bibliothek von Python jedoch hochgradig optimiert ist und für die meisten Szenarien Ihre erste Wahl sein sollte. Viel Spaß beim Codieren!


Unsere Kreationen

Schauen Sie sich unbedingt unsere Kreationen an:

Investor Central | Intelligentes Leben | Epochen & Echos | Rätselhafte Geheimnisse | Hindutva | Elite-Entwickler | JS-Schulen


Wir sind auf Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Wissenschaft & Epochen Medium | Modernes Hindutva

Das obige ist der detaillierte Inhalt vonBeherrschen Sie Python-Coroutinen: Erstellen Sie benutzerdefinierte asynchrone Tools für leistungsstarke gleichzeitige Apps. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn