首頁 >後端開發 >Python教學 >掌握 Python 協程:為強大的並發應用程式建立自訂非同步工具

掌握 Python 協程:為強大的並發應用程式建立自訂非同步工具

DDD
DDD原創
2024-11-29 12:18:14995瀏覽

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Python 中的協程是編寫非同步程式碼的強大工具。它們徹底改變了我們處理並發操作的方式,使建立可擴展且高效的應用程式變得更加容易。我花了很多時間使用協程,很高興能分享一些關於創建自訂非同步原語的見解。

讓我們從基礎開始。協程是可以暫停和恢復的特殊函數,允許協作式多工處理。它們是 Python 的 async/await 語法的基礎。當您定義協程時,您實際上正在建立一個函數,該函數可以將控制權交還給事件循環,從而允許其他任務運行。

要建立自訂可等待對象,您需要實作 await 方法。該方法應該傳回一個迭代器。這是一個簡單的例子:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

這個CustomAwaitable類別可以與await關鍵字一起使用,就像內建的awaitables一樣。當等待時,它會產生一次控制,然後傳回它的值。

但是如果我們想創建更複雜的非同步原語怎麼辦?讓我們看看如何實現自訂信號量。信號量用於控制多個協程對共享資源的存取:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

此 CustomSemaphore 類別實作取得和釋放方法,以及非同步上下文管理器協定(aenteraexit)。它允許最多兩個協程同時獲取信號量。

現在,我們來談談創造高效率的事件循環。雖然 Python 的 asyncio 提供了強大的事件循環實現,但在某些情況下您可能需要自訂一個。這是自訂事件循環的基本範例:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)

這個自訂事件循環實現了基本功能,例如運行任務、處理協程,甚至是簡單的睡眠功能。它不像 Python 的內建事件循環那樣功能豐富,但它演示了核心概念。

編寫非同步程式碼的挑戰之一是管理任務優先順序。雖然Python的asyncio沒有為任務提供內建的優先權隊列,但我們可以實現自己的:

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())

此 PriorityEventLoop 使用堆疊佇列根據任務的計畫執行時間來管理任務。您可以透過安排具有不同延遲的任務來分配優先順序。

優雅地處理取消是使用協程的另一個重要面向。以下是如何實現可取消任務的範例:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

在此範例中,cancellable_operation 捕捉 CancelledError,執行任何必要的清理,然後重新引發例外狀況。這允許優雅地處理取消,同時仍然傳播取消狀態。

讓我們來探索實作自訂非同步迭代器。這些對於創建可以非同步迭代的序列非常有用:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

這個 AsyncRange 類別實作了非同步迭代器協議,允許它在非同步 for 迴圈中使用。

最後,讓我們看看如何實作自訂非同步上下文管理器。這些對於管理需要非同步取得和釋放的資源很有用:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

這個 AsyncResource 類別實作了 aenteraexit 方法,允許它與 async with 語句一起使用。

總之,Python 的協程系統為建立自訂非同步原語提供了強大的基礎。透過了解底層機制和協議,您可以針對特定的非同步挑戰創建量身定制的解決方案,優化複雜並發場景中的效能,並擴展 Python 的非同步功能。請記住,雖然這些自訂實作非常適合學習和特定用例,但 Python 的內建 asyncio 程式庫經過了高度優化,應該成為大多數場景的首選。快樂編碼!


我們的創作

一定要看看我們的創作:

投資者中心 | 智能生活 | 時代與迴響 | 令人費解的謎團 | 印度教 | 精英開發 | JS學校


我們在媒體上

科技無尾熊洞察 | 時代與迴響世界 | 投資人中央媒體 | 令人費解的謎團 | | 令人費解的謎團 | >科學與時代媒介 |

現代印度教

以上是掌握 Python 協程:為強大的並發應用程式建立自訂非同步工具的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn