搜索
首页后端开发Python教程掌握 Python 协程:为强大的并发应用程序创建自定义异步工具

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Python 中的协程是编写异步代码的强大工具。它们彻底改变了我们处理并发操作的方式,使构建可扩展且高效的应用程序变得更加容易。我花了很多时间使用协程,并且很高兴能分享一些关于创建自定义异步原语的见解。

让我们从基础开始。协程是可以暂停和恢复的特殊函数,允许协作式多任务处理。它们是 Python 的 async/await 语法的基础。当您定义协程时,您实际上是在创建一个函数,该函数可以将控制权交还给事件循环,从而允许其他任务运行。

要创建自定义可等待对象,您需要实现 await 方法。该方法应该返回一个迭代器。这是一个简单的例子:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

这个CustomAwaitable类可以与await关键字一起使用,就像内置的awaitables一样。当等待时,它会产生一次控制,然后返回它的值。

但是如果我们想创建更复杂的异步原语怎么办?让我们看看如何实现自定义信号量。信号量用于控制多个协程对共享资源的访问:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value 



<p>此 CustomSemaphore 类实现获取和释放方法,以及异步上下文管理器协议(<strong>aenter</strong> 和 <strong>aexit</strong>)。它允许最多两个协程同时获取信号量。</p>

<p>现在,我们来谈谈创建高效的事件循环。虽然 Python 的 asyncio 提供了强大的事件循环实现,但在某些情况下您可能需要自定义一个。这是自定义事件循环的基本示例:<br>
</p>

<pre class="brush:php;toolbar:false">import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)

这个自定义事件循环实现了基本功能,例如运行任务、处理协程,甚至是简单的睡眠功能。它不像 Python 的内置事件循环那样功能丰富,但它演示了核心概念。

编写异步代码的挑战之一是管理任务优先级。虽然Python的asyncio没有为任务提供内置的优先级队列,但我们可以实现自己的:

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())

此 PriorityEventLoop 使用堆队列根据任务的计划执行时间来管理任务。您可以通过安排具有不同延迟的任务来分配优先级。

优雅地处理取消是使用协程的另一个重要方面。以下是如何实现可取消任务的示例:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

在此示例中,cancellable_operation 捕获 CancelledError,执行任何必要的清理,然后重新引发异常。这允许优雅地处理取消,同时仍然传播取消状态。

让我们探索实现自定义异步迭代器。这些对于创建可以异步迭代的序列非常有用:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

这个 AsyncRange 类实现了异步迭代器协议,允许它在异步 for 循环中使用。

最后,让我们看看如何实现自定义异步上下文管理器。这些对于管理需要异步获取和释放的资源很有用:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value 



<p>这个 AsyncResource 类实现了 <strong>aenter</strong> 和 <strong>aexit</strong> 方法,允许它与 async with 语句一起使用。</p>

<p>总之,Python 的协程系统为构建自定义异步原语提供了强大的基础。通过了解底层机制和协议,您可以针对特定的异步挑战创建量身定制的解决方案,优化复杂并发场景中的性能,并扩展 Python 的异步功能。请记住,虽然这些自定义实现非常适合学习和特定用例,但 Python 的内置 asyncio 库经过了高度优化,应该成为大多数场景的首选。快乐编码!</p>


<hr>

<h2>
  
  
  我们的创作
</h2>

<p>一定要看看我们的创作:</p>

<p><strong>投资者中心</strong> | <strong>智能生活</strong> | <strong>时代与回响</strong> | <strong>令人费解的谜团</strong> | <strong>印度教</strong> | <strong>精英开发</strong> | <strong>JS学校</strong></p>


<hr>

<h3>
  
  
  我们在媒体上
</h3>

<p><strong>科技考拉洞察</strong> | <strong>时代与回响世界</strong> | <strong>投资者中央媒体</strong> | <strong>令人费解的谜团</strong> | <strong>科学与时代媒介</strong> | <strong>现代印度教</strong></p>


          

            
        

以上是掌握 Python 协程:为强大的并发应用程序创建自定义异步工具的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
Python中的合并列表:选择正确的方法Python中的合并列表:选择正确的方法May 14, 2025 am 12:11 AM

Tomergelistsinpython,YouCanusethe操作员,estextMethod,ListComprehension,Oritertools

如何在Python 3中加入两个列表?如何在Python 3中加入两个列表?May 14, 2025 am 12:09 AM

在Python3中,可以通过多种方法连接两个列表:1)使用 运算符,适用于小列表,但对大列表效率低;2)使用extend方法,适用于大列表,内存效率高,但会修改原列表;3)使用*运算符,适用于合并多个列表,不修改原列表;4)使用itertools.chain,适用于大数据集,内存效率高。

Python串联列表字符串Python串联列表字符串May 14, 2025 am 12:08 AM

使用join()方法是Python中从列表连接字符串最有效的方法。1)使用join()方法高效且易读。2)循环使用 运算符对大列表效率低。3)列表推导式与join()结合适用于需要转换的场景。4)reduce()方法适用于其他类型归约,但对字符串连接效率低。完整句子结束。

Python执行,那是什么?Python执行,那是什么?May 14, 2025 am 12:06 AM

pythonexecutionistheprocessoftransformingpypythoncodeintoExecutablestructions.1)InternterPreterReadSthecode,ConvertingTingitIntObyTecode,whepythonvirtualmachine(pvm)theglobalinterpreterpreterpreterpreterlock(gil)the thepythonvirtualmachine(pvm)

Python:关键功能是什么Python:关键功能是什么May 14, 2025 am 12:02 AM

Python的关键特性包括:1.语法简洁易懂,适合初学者;2.动态类型系统,提高开发速度;3.丰富的标准库,支持多种任务;4.强大的社区和生态系统,提供广泛支持;5.解释性,适合脚本和快速原型开发;6.多范式支持,适用于各种编程风格。

Python:编译器还是解释器?Python:编译器还是解释器?May 13, 2025 am 12:10 AM

Python是解释型语言,但也包含编译过程。1)Python代码先编译成字节码。2)字节码由Python虚拟机解释执行。3)这种混合机制使Python既灵活又高效,但执行速度不如完全编译型语言。

python用于循环与循环时:何时使用哪个?python用于循环与循环时:何时使用哪个?May 13, 2025 am 12:07 AM

useeAforloopWheniteratingOveraseQuenceOrforAspecificnumberoftimes; useAwhiLeLoopWhenconTinuingUntilAcIntiment.ForloopSareIdeAlforkNownsences,而WhileLeleLeleLeleLoopSituationSituationSituationsItuationSuationSituationswithUndEtermentersitations。

Python循环:最常见的错误Python循环:最常见的错误May 13, 2025 am 12:07 AM

pythonloopscanleadtoerrorslikeinfiniteloops,modifyingListsDuringteritation,逐个偏置,零indexingissues,andnestedloopineflinefficiencies

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

安全考试浏览器

安全考试浏览器

Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中