>백엔드 개발 >파이썬 튜토리얼 >Python Asyncio 라이브러리에서 일반적으로 사용되는 동기화 기본 기능은 무엇입니까?

Python Asyncio 라이브러리에서 일반적으로 사용되는 동기화 기본 기능은 무엇입니까?

WBOY
WBOY앞으로
2023-05-09 10:35:041393검색

머리말

Asyncio의 동기화 프리미티브는 리소스 경쟁 코드 작성을 단순화하고 리소스 경쟁으로 인한 버그 발생을 방지할 수 있습니다. 하지만 코루틴의 특성상 대부분의 비즈니스 코드에서는 리소스 경쟁을 고려할 필요가 없으므로 Asyncio 동기화 프리미티브는 덜 자주 사용됩니다. >Asyncio 기반 프레임워크를 작성하려면 동기화 기본 요소의 사용을 배워야 합니다. Asyncio的同步原语可以简化我们编写资源竞争的代码和规避资源竞争导致的Bug的出现。 但是由于协程的特性,在大部分业务代码中并不需要去考虑资源竞争的出现,导致Asyncio同步原语被使用的频率比较低,但是如果想基于Asyncio编写框架则需要学习同步原语的使用。

0.基础

同步原语都是适用于某些条件下对某个资源的争夺,在代码中大部分的资源都是属于一个代码块,而Python对于代码块的管理的最佳实践是使用with语法,with语法实际上是调用了一个类中的__enter____exit__方法,比如下面的代码:

class Demo(object):
    def __enter__(self):
        return 
    def __exit__(self, exc_type, exc_val, exc_tb):
        return 
with Demo():
    pass

代码中的Demo类实现了__enter____exit__方法后,就可以被with语法调用,其中__enter__方法是进入代码块执行的逻辑,__enxi__方法是用于退出代码块(包括异常退出)的逻辑。这两个方法符合同步原语中对资源的争夺和释放,但是__enter____exit__两个方法都是不支持await调用的,为了解决这个问题,Python引入了async with语法。

async with语法和with语法类似 ,我们只要编写一个拥有__aenter____aexit__方法的类,那么这个类就支持asyncio with语法了,如下:

import asyncio
class Demo(object):
    async def __aenter__(self):
        return
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return
async def main():
    async with Demo():
        pass
asyncio.run(main())

其中,类中的__aenter__方法是进入代码块时执行的方法,__aexit__是退出代码块时执行的方法。

有了async with语法的加持,asyncio的同步原语使用起来会比较方便,所以asyncio中对资源争夺的同步原语都会继承于_ContextManagerMixin类:

class _ContextManagerMixin:
    async def __aenter__(self):
        await self.acquire()
        # We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None
    async def __aexit__(self, exc_type, exc, tb):
        self.release()

并实现了acquirerelease方法,供__aenter____aexit__方法调用,同时我们在使用同步原语的时候尽量用到async with语法防止忘记释放资源的占用。

1.Lock

由于协程的特性,在编写协程代码时基本上可以不考虑到锁的情况,但在一些情况下我们还是需要用到锁,并通过锁来维护并发时的数据安全性,如下例子:

import asyncio
share_data = {}
async def sub(i):
    # 赋上相同的key和value
    share_data[i] = i
    await asyncio.sleep(0)
    print(i, share_data[i] == i)
async def sub_add(i):
    # 赋上的value值是原来的+1
    share_data[i] = i + 1
    await asyncio.sleep(0)
    print(i, share_data[i] == i + 1)
async def main():
    # 创建并发任务
    task_list = []
    for i in range(10):
        task_list.append(sub(i))
        task_list.append(sub_add(i))
    # 并发执行
    await asyncio.gather(*task_list)
if __name__ == "__main__":
    asyncio.run(main())

在这个例子中程序会并发的执行subsub_add函数,他们是由不同的asyncio.Task驱动的,这意味着会出现这样一个场景。 当负责执行sub(1)函数的asyncio.Task在执行完share_data[i]=i后就执行await asyncio.sleep(0)从而主动让出控制权并交还给事件循环,等待事件循环的下一次调度。 不过事件循环不会空下来,而是马上安排下一个asyncio.Task执行,此时会先执行到sub_add(1)函数的share_data[i] = i + 1,并同样的在执行到await asyncio.sleep(0)的时候把控制权交会给事件循环。 这时候控制权会由事件循环转移给原先执行sub(1)函数的asyncio.Task,获取到控制权l后sub(1)函数的逻辑会继续走,但由于share_data[i]的数据已经被share_data[i] = i + 1修改了,导致最后执行print时,share_data[i]的数据已经变为脏数据,而不是原本想要的数据了。

为了解决这个问题,我们可以使用asyncio.Lock来解决资源的冲突,如下:

import asyncio
share_data = {}
# 存放对应资源的锁
lock_dict = {}
async def sub(i):
    async with lock_dict[i]:  # <-- 通过async with语句来控制锁的粒度
        share_data[i] = i
        await asyncio.sleep(0)
        print(i, share_data[i] == i)
async def sub_add(i):
    async with lock_dict[i]:
        share_data[i] = i + 1
        await asyncio.sleep(0)
        print(i, share_data[i] == i + 1)
async def main():
    task_list = []
    for i in range(10):
        lock_dict[i] = asyncio.Lock()
        task_list.append(sub(i))
        task_list.append(sub_add(i))
    await asyncio.gather(*task_list)
if __name__ == "__main__":
    asyncio.run(main())

从例子可以看到asyncio.Lock的使用方法跟多线程的Lock差不多,通过async with语法来获取和释放锁,它的原理也很简单,主要做了如下几件事:

  • 1.确保某一协程获取锁后的执行期间,别的协程在获取锁时需要一直等待,直到执行完成并释放锁。

  • 2.当有协程持有锁的时候,其他协程必须等待,直到持有锁的协程释放了锁。

  • 2.确保所有协程能够按照获取的顺序获取到锁。

这意味着需要有一个数据结构来维护当前持有锁的协程的和下一个获取锁协程的关系,同时也需要一个队列来维护多个获取锁的协程的唤醒顺序。

asyncio.Lock跟其它asyncio功能的用法一样,使用asyncio.Future来同步协程之间锁的状态,使用deque

0.Basic

동기화 프리미티브는 특정 조건에서 특정 리소스에 대한 경쟁에 적용 가능합니다. 코드에 포함된 리소스의 대부분은 코드 블록에 속하며 Python은 해당 코드에 적합하지 않습니다. 블록 관리에 대한 모범 사례는 with 구문을 사용하는 것입니다. with 구문은 실제로 클래스에서 __enter____exit__를 호출합니다. code> 메서드(예: 다음 코드): 🎜<pre class="brush:py;">class Lockl(_ContextManagerMixin, mixins._LoopBoundMixin): def __init__(self): self._waiters = None self._locked = False def locked(self): return self._locked async def acquire(self): if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))): # 目前没有其他协程持有锁,当前协程可以运行 self._locked = True return True if self._waiters is None: self._waiters = collections.deque() # 创建属于自己的容器,并推送到`_waiters`这个双端队列中 fut = self._get_loop().create_future() self._waiters.append(fut) try: try: await fut finally: # 如果执行完毕,需要把自己移除,防止被`wake_up_first`调用 self._waiters.remove(fut) except exceptions.CancelledError: # 如果是等待的过程中被取消了,需要唤醒下一个调用`acquire` if not self._locked: self._wake_up_first() raise # 持有锁 self._locked = True return True def release(self): if self._locked: # 释放锁 self._locked = False self._wake_up_first() else: raise RuntimeError(&amp;#39;Lock is not acquired.&amp;#39;) def _wake_up_first(self): if not self._waiters: return # 获取还处于锁状态协程对应的容器 try: # 获取下一个等待获取锁的waiter fut = next(iter(self._waiters)) except StopIteration: return # 设置容器为True,这样对应协程就可以继续运行了。 if not fut.done(): fut.set_result(True)</pre>🎜코드의 <code>Demo 클래스가 __enter____exit__ 메서드를 구현한 후 다음을 수행할 수 있습니다. with 구문에 의해 호출되는 __enter__ 메서드는 코드 블록 실행을 시작하는 논리이고 __enxi__ 메서드는 코드를 종료하는 데 사용됩니다. 차단(비정상 종료 포함) 논리. 이 두 메서드는 동기화 프리미티브의 리소스 경합 및 해제를 따르지만 __enter____exit__ 메서드는 순서대로 await 호출을 지원하지 않습니다. 이 문제를 해결하기 위해 Python에서는 async with 구문을 도입했습니다. 🎜🎜async with 구문은 with 구문과 유사합니다. __aenter____aexit__만 사용하여 클래스를 작성하면 됩니다. 그러면 이 클래스는 다음과 같은 asyncio with 구문을 지원합니다. 🎜
import asyncio
class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout, *, loop=None):
        self.timeout = timeout
        super().__init__(loop=loop)
    async def acquire(self) -> bool:
        return await asyncio.wait_for(super().acquire(), self.timeout)
🎜 그 중 클래스의 __aenter__ 메서드는 코드 블록에 들어갈 때 실행되는 메서드인데, __aexit__는 코드 블록을 종료할 때 실행되는 메서드입니다. 🎜🎜 <code>async with 구문의 축복으로 asyncio의 동기화 프리미티브를 사용하는 것이 더 편리하므로 asyncio에서 리소스 경합의 동기화가 가능합니다. > 프리미티브는 _ContextManagerMixin 클래스인 🎜
class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout, *, loop=None):
        self.timeout = timeout
        super().__init__(loop=loop)
    async def acquire(self) -> bool:
        try:
            return await asyncio.wait_for(super().acquire(), self.timeout)
        except Exception:
            self._wake_up_first()
            raise
🎜에서 상속하고 __aenter__에 대한 acquirerelease 메서드를 구현합니다. __aexit__ 메서드 호출 및 동기화 프리미티브를 사용할 때 리소스 점유 해제를 잊지 않도록 async with 구문을 사용하려고 합니다. 🎜🎜1.Lock🎜🎜코루틴의 특성상 코루틴 코드 작성 시 기본적으로 잠금을 고려할 수 없지만, 경우에 따라서는 잠금을 사용해야 하며, 동시성 데이터 보안을 유지하기 위해 잠금을 사용해야 하는 경우는 다음 예시와 같습니다. : 🎜
import asyncio
async def sub(event: asyncio.Event) -> None:
    await event.wait()
    print("I&#39;m Done")
async def main() -> None:
    event = asyncio.Event()
    for _ in range(10):
        asyncio.create_task(sub(event))
    await asyncio.sleep(1)
    event.set()
asyncio.run(main())
🎜이 예에서 프로그램은 subsub_add 함수를 동시에 실행하며 서로 다른 asyncio.Task code> 기반으로 구성됩니다. , 이는 그러한 시나리오가 발생함을 의미합니다. <code>sub(1) 함수 실행을 담당하는 asyncio.Taskshare_data[i]=i 실행 후 await asyncio를 실행하는 경우. sleep(0)따라서 제어를 적극적으로 포기하고 이벤트 루프로 반환하여 이벤트 루프의 다음 전달을 기다립니다. 그러나 이벤트 루프는 비어 있지 않고 다음 asyncio.Task 실행을 즉시 정렬합니다. 이때 sub_add(1)share_data[는 다음과 같습니다. /code> 함수가 먼저 실행됩니다. i] = i + 1, 마찬가지로 await asyncio.sleep(0)을 실행할 때 이벤트 루프로 제어를 전달합니다. 이때 제어권은 이벤트 루프에서 원래 sub(1) 함수를 실행했던 asyncio.Task로 전달됩니다. 제어권을 얻은 후, sub( 1)함수의 논리는 계속되지만 <code>share_data[i]의 데이터가 share_data[i] = i + 1에 의해 수정되었기 때문에, 인쇄할 때의 마지막 실행으로 share_data[i]의 데이터가 원래 의도했던 데이터가 아닌 더티 데이터가 되었습니다. 🎜🎜이 문제를 해결하려면 다음과 같이 asyncio.Lock을 사용하여 리소스 충돌을 해결할 수 있습니다. 🎜
class Event(mixins._LoopBoundMixin):
    def __init__(self):
        self._waiters = collections.deque()
        self._value = False
    def is_set(self):
        return self._value
    def set(self):
        if not self._value:
            # 确保每次只能set一次
            self._value = True
            # 设置每个协程存放的容器为True,这样对应的协程就可以运行了
            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(True)
    def clear(self):
        # 清理上一次的set
        self._value = False
    async def wait(self):
        if self._value:
            # 如果设置了,就不需要等待了
            return True
        # 否则需要创建一个容器,并需要等待容器完成
        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
            return True
        finally:
            self._waiters.remove(fut)
🎜예제에서 asyncio.Lock 다중 스레드 <code>Lockasync with 구문을 사용하여 잠금을 획득하고 해제하는 원리도 매우 간단합니다. 🎜
  • 🎜1. 코루틴이 잠금을 획득한 후 실행 기간 동안 다른 코루틴은 실행이 완료되고 잠금이 해제될 때까지 기다려야 합니다. 🎜
  • 🎜2. 코루틴이 잠금을 보유하면 다른 코루틴은 잠금을 보유하는 코루틴이 잠금을 해제할 때까지 기다려야 합니다. 🎜
  • 🎜2. 모든 코루틴이 획득된 순서대로 잠금을 획득할 수 있는지 확인하세요. 🎜
🎜즉, 현재 잠금을 보유하고 있는 코루틴과 잠금을 획득하는 다음 코루틴 간의 관계를 유지하려면 데이터 구조가 필요하고, 다중 잠금을 유지하려면 큐도 필요하다는 뜻입니다. 코루틴 획득. 🎜🎜asyncio.Lock은 다른 asyncio 함수와 동일하게 사용됩니다. 코루틴 간 잠금 상태를 동기화하려면 asyncio.Future를 사용하세요. >deque는 코루틴 간의 wake-up 순서를 유지합니다. 소스 코드는 다음과 같습니다. 🎜
import asyncio
class SetEvent(asyncio.Event):
    def __init__(self, *, loop=None):
        self._set = set()
        super().__init__(loop=loop)
    def add(self, value):
        self._set.add(value)
        self.clear()
    def remove(self, value):
        self._set.remove(value)
        if not self._set:
            self.set()
🎜 소스 코드를 보면 잠금이 주로 획득 및 해제 기능을 제공한다는 것을 알 수 있습니다. 자물쇠를 획득한 것으로 유명함: 🎜
  • 1:当有协程想要获取锁时会先判断锁是否被持有,如果当前锁没有被持有就直接返回,使协程能够正常运行。

  • 2:如果协程获取锁时,锁发现自己已经被其他协程持有则创建一个属于当前协程的asyncio.Future,用来同步状态,并添加到deque中。

而对于释放锁就比较简单,只要获取deque中的第一个asyncio.Future,并通过fut.set_result(True)进行标记,使asyncio.Futurepeding状态变为done状态,这样一来,持有该asyncio.Future的协程就能继续运行,从而持有锁。

不过需要注意源码中acquire方法中对CancelledError异常进行捕获,再唤醒下一个锁,这是为了解决acquire方法执行异常导致锁一直被卡住的场景,通常情况下这能解决大部分的问题,但是如果遇到错误的封装时,我们需要亲自处理异常,并执行锁的唤醒。比如在通过继承asyncio.Lock编写一个超时锁时,最简单的实现代码如下:

import asyncio
class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout, *, loop=None):
        self.timeout = timeout
        super().__init__(loop=loop)
    async def acquire(self) -> bool:
        return await asyncio.wait_for(super().acquire(), self.timeout)

这份代码非常简单,他只需要在__init__方法传入timeout参数,并在acuiqre方法中通过wait_for来实现锁超时即可,现在假设wait_for方法是一个无法传递协程cancel的方法,且编写的acquire没有进行捕获异常再释放锁的操作,当异常发生的时候会导致锁一直被卡住。 为了解决这个问题,只需要对TimeoutLockacquire方法添加异常捕获,并在捕获到异常时释放锁即可,代码如下:

class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout, *, loop=None):
        self.timeout = timeout
        super().__init__(loop=loop)
    async def acquire(self) -> bool:
        try:
            return await asyncio.wait_for(super().acquire(), self.timeout)
        except Exception:
            self._wake_up_first()
            raise

2.Event

asyncio.Event也是一个简单的同步原语,但它跟asyncio.Lock不一样,asyncio.Lock是确保每个资源只能被一个协程操作,而asyncio.Event是确保某个资源何时可以被协程操作,可以认为asyncio.Lock锁的是资源,asyncio.Event锁的是协程,所以asyncio.Event并不需要acquire来锁资源,release释放资源,所以也用不到async with语法。

asyncio.Event的简单使用示例如下:

import asyncio
async def sub(event: asyncio.Event) -> None:
    await event.wait()
    print("I&#39;m Done")
async def main() -> None:
    event = asyncio.Event()
    for _ in range(10):
        asyncio.create_task(sub(event))
    await asyncio.sleep(1)
    event.set()
asyncio.run(main())

在这个例子中会先创建10个asyncio.Task来执行sub函数,但是所有sub函数都会在event.wait处等待,直到main函数中调用event.set后,所有的sub函数的event.wait会放行,使sub函数能继续执行。

可以看到asyncio.Event功能比较简单,它的源码实现也很简单,源码如下:

class Event(mixins._LoopBoundMixin):
    def __init__(self):
        self._waiters = collections.deque()
        self._value = False
    def is_set(self):
        return self._value
    def set(self):
        if not self._value:
            # 确保每次只能set一次
            self._value = True
            # 设置每个协程存放的容器为True,这样对应的协程就可以运行了
            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(True)
    def clear(self):
        # 清理上一次的set
        self._value = False
    async def wait(self):
        if self._value:
            # 如果设置了,就不需要等待了
            return True
        # 否则需要创建一个容器,并需要等待容器完成
        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
            return True
        finally:
            self._waiters.remove(fut)

通过源码可以看到wait方法主要是创建了一个asyncio.Future,并把它加入到deque队列后就一直等待着,而set方法被调用时会遍历整个deque队列,并把处于peding状态的asyncio.Future设置为done,这时其他在调用event.wait方法的协程就会得到放行。

通过源码也可以看出,asyncio.Event并没有继承于_ContextManagerMixin,这是因为它锁的是协程,而不是资源。

asyncio.Event的使用频率比asyncio.Lock多许多,不过通常都会让asyncio.Event和其他数据结构进行封装再使用,比如实现一个服务器的优雅关闭功能,这个功能会确保服务器在等待n秒后或者所有连接都关闭后才关闭服务器,这个功能就可以使用setasyncio.Event结合,如下:

import asyncio
class SetEvent(asyncio.Event):
    def __init__(self, *, loop=None):
        self._set = set()
        super().__init__(loop=loop)
    def add(self, value):
        self._set.add(value)
        self.clear()
    def remove(self, value):
        self._set.remove(value)
        if not self._set:
            self.set()

这个SetEvent结合了setSetEvent的功能,当set有数据的时候,会通过clear方法使SetEvent变为等待状态,而set没数据的时候,会通过set方法使SetEvent变为无需等待的状态,所有调用wait的协程都可以放行,通过这种结合,SetEvent拥有了等待资源为空的功能。 接下来就可以用于服务器的优雅退出功能:

async def mock_conn_io() -> None:
    await asyncio.sleep(1)
def conn_handle(set_event: SetEvent):
    task: asyncio.Task = asyncio.create_task(mock_conn_io())
    set_event.add(task)
    task.add_done_callback(lambda t: set_event.remove(t))
async def main():
    set_event: SetEvent = SetEvent()
    for _ in range(10):
        conn_handle(set_event)
	# 假设这里收到了退出信号
    await asyncio.wait(set_event.wait(), timeout=9)
asyncio.run(main())

在这个演示功能中,mock_conn_io用于模拟服务器的连接正在处理中,而conn_handle用于创建服务器连接,main则是先创建10个连接,并模拟在收到退出信号后等待资源为空或者超时才退出服务。

这只是简单的演示,实际上的优雅关闭功能要考虑的东西不仅仅是这些。

4.Condition

condition只做简单介绍

asyncio.Condition是同步原语中使用最少的一种,因为他使用情况很奇怪,而且大部分场景可以被其他写法代替,比如下面这个例子:

import asyncio
async def task(condition, work_list):
    await asyncio.sleep(1)
    work_list.append(33)
    print(&#39;Task sending notification...&#39;)
    async with condition:
        condition.notify()
async def main():
    condition = asyncio.Condition()
    work_list = list()
    print(&#39;Main waiting for data...&#39;)
    async with condition:
        _ = asyncio.create_task(task(condition, work_list))
        await condition.wait()
    print(f&#39;Got data: {work_list}&#39;)
asyncio.run(main())
# &gt;&gt;&gt; Main waiting for data...
# &gt;&gt;&gt; Task sending notification...
# &gt;&gt;&gt; Got data: [33]

在这个例子中可以看到,notifywait方法只能在async with condition中可以使用,如果没有在async with condition中使用则会报错,同时这个示例代码有点复杂,没办法一看就知道执行逻辑是什么,其实这个逻辑可以转变成一个更简单的写法:

import asyncio
async def task(work_list):
    await asyncio.sleep(1)
    work_list.append(33)
    print(&#39;Task sending notification...&#39;)
    return
async def main():
    work_list = list()
    print(&#39;Main waiting for data...&#39;)
    _task = asyncio.create_task(task(work_list))
    await _task
    print(f&#39;Got data: {work_list}&#39;)
asyncio.run(main())
# &gt;&gt;&gt; Main waiting for data...
# &gt;&gt;&gt; Task sending notification...
# &gt;&gt;&gt; Got data: [33]

通过这个代码可以看到这个写法更简单一点,而且更有逻辑性,而condition的写法却更有点Go协程写法/或者回调函数写法的感觉。 所以建议在认为自己的代码可能会用到asyncio.Conditon时需要先考虑到是否需要asyncio.Codition?是否有别的方案代替,如果没有才考虑去使用asyncio.Conditonk。

5.Semaphore

asyncio.Semaphore--信号量是同步原语中被使用最频繁的,大多数都是用在限流场景中,比如用在爬虫中和客户端网关中限制请求频率。

asyncio.Semaphore可以认为是一个延缓触发的asyncio.Lockasyncio.Semaphore内部会维护一个计数器,无论何时进行获取或释放,它都会递增或者递减(但不会超过边界值),当计数器归零时,就会进入到锁的逻辑,但是这个锁逻辑会在计数器大于0的时候释放j,它的用法如下:`

import asyncio
async def main():
    semaphore = asyncio.Semaphore(10):
    async with semaphore:
        pass
asyncio.run(main())

示例中代码通过async with来指明一个代码块(代码用pass代替),这个代码块是被asyncio.Semaphore管理的,每次协程在进入代码块时,asyncio.Semaphore的内部计数器就会递减一,而离开代码块则asyncio.Semaphore的内部计数器会递增一。

当有一个协程进入代码块时asyncio.Semaphore发现计数器已经为0了,则会使当前协程进入等待状态,直到某个协程离开这个代码块时,计数器会递增一,并唤醒等待的协程,使其能够进入代码块中继续执行。

asyncio.Semaphore的源码如下,需要注意的是由于asyncio.Semaphore是一个延缓的asyncio.Lock,所以当调用一次release后可能会导致被唤醒的协程和刚进入代码块的协程起冲突,所以在acquire方法中要通过一个while循环来解决这个问题:`

class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        self._wakeup_scheduled = False
    def _wake_up_next(self):
        while self._waiters:
            # 按照放置顺序依次弹出容器 
            waiter = self._waiters.popleft()
            if not waiter.done():
                # 设置容器状态,使对应的协程可以继续执行
                waiter.set_result(None)
                # 设置标记 
                self._wakeup_scheduled = True
                return
    def locked(self):
        return self._value == 0
    async def acquire(self):
        # 如果`self._wakeup_scheduled`为True或者value小于0
        while self._wakeup_scheduled or self._value <= 0:
            # 创建容器并等待执行完成
            fut = self._get_loop().create_future()
            self._waiters.append(fut)
            try:
                await fut
                self._wakeup_scheduled = False
            except exceptions.CancelledError:
                # 如果被取消了,也要唤醒下一个协程
                self._wake_up_next()
                raise
        self._value -= 1
        return True
    def release(self):
        # 释放资源占用,唤醒下一个协程。
        self._value += 1
        self._wake_up_next()

针对asyncio.Semaphore进行修改可以实现很多功能,比如基于信号量可以实现一个简单的协程池,这个协程池可以限制创建协程的量,当协程池满的时候就无法继续创建协程,只有协程中的协程执行完毕后才能继续创建(当然无法控制在协程中创建新的协程),代码如下:

import asyncio
import time
from typing import Coroutine
class Pool(object):
    def __init__(self, max_concurrency: int):
        self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
    async def create_task(self, coro: Coroutine) -> asyncio.Task:
        await  self._semaphore.acquire()
        task: asyncio.Task = asyncio.create_task(coro)
        task.add_done_callback(lambda t: self._semaphore.release())
        return task
async def demo(cnt: int) -> None:
    print(f"{int(time.time())} create {cnt} task...")
    await  asyncio.sleep(cnt)
async def main() -> None:
    pool: Pool = Pool(3)
    for i in range(10):
        await pool.create_task(demo(i))
asyncio.run(main())
# >>> 1677517996 create 0 task...
# >>> 1677517996 create 1 task...
# >>> 1677517996 create 2 task...
# >>> 1677517996 create 3 task...
# >>> 1677517997 create 4 task...
# >>> 1677517998 create 5 task...
# >>> 1677517999 create 6 task...
# >>> 1677518001 create 7 task...
# >>> 1677518003 create 8 task...
# >>> 1677518005 create 9 task...

위 내용은 Python Asyncio 라이브러리에서 일반적으로 사용되는 동기화 기본 기능은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제