Asyncio
의 동기화 프리미티브는 리소스 경쟁 코드 작성을 단순화하고 리소스 경쟁으로 인한 버그 발생을 방지할 수 있습니다. 하지만 코루틴의 특성상 대부분의 비즈니스 코드에서는 리소스 경쟁을 고려할 필요가 없으므로 Asyncio
동기화 프리미티브는 덜 자주 사용됩니다. >Asyncio 기반 프레임워크를 작성하려면 동기화 기본 요소의 사용을 배워야 합니다. Asyncio
的同步原语可以简化我们编写资源竞争的代码和规避资源竞争导致的Bug的出现。 但是由于协程的特性,在大部分业务代码中并不需要去考虑资源竞争的出现,导致Asyncio
同步原语被使用的频率比较低,但是如果想基于Asyncio
编写框架则需要学习同步原语的使用。
同步原语都是适用于某些条件下对某个资源的争夺,在代码中大部分的资源都是属于一个代码块,而Python
对于代码块的管理的最佳实践是使用with
语法,with
语法实际上是调用了一个类中的__enter__
和__exit__
方法,比如下面的代码:
class Demo(object): def __enter__(self): return def __exit__(self, exc_type, exc_val, exc_tb): return with Demo(): pass
代码中的Demo
类实现了__enter__
和__exit__
方法后,就可以被with
语法调用,其中__enter__
方法是进入代码块执行的逻辑,__enxi__
方法是用于退出代码块(包括异常退出)的逻辑。这两个方法符合同步原语中对资源的争夺和释放,但是__enter__
和__exit__
两个方法都是不支持await
调用的,为了解决这个问题,Python
引入了async with
语法。
async with
语法和with
语法类似 ,我们只要编写一个拥有__aenter__
和__aexit__
方法的类,那么这个类就支持asyncio with
语法了,如下:
import asyncio class Demo(object): async def __aenter__(self): return async def __aexit__(self, exc_type, exc_val, exc_tb): return async def main(): async with Demo(): pass asyncio.run(main())
其中,类中的__aenter__
方法是进入代码块时执行的方法,__aexit__
是退出代码块时执行的方法。
有了async with
语法的加持,asyncio
的同步原语使用起来会比较方便,所以asyncio
中对资源争夺的同步原语都会继承于_ContextManagerMixin
类:
class _ContextManagerMixin: async def __aenter__(self): await self.acquire() # We have no use for the "as ..." clause in the with # statement for locks. return None async def __aexit__(self, exc_type, exc, tb): self.release()
并实现了acquire
和release
方法,供__aenter__
和__aexit__
方法调用,同时我们在使用同步原语的时候尽量用到async with
语法防止忘记释放资源的占用。
由于协程的特性,在编写协程代码时基本上可以不考虑到锁的情况,但在一些情况下我们还是需要用到锁,并通过锁来维护并发时的数据安全性,如下例子:
import asyncio share_data = {} async def sub(i): # 赋上相同的key和value share_data[i] = i await asyncio.sleep(0) print(i, share_data[i] == i) async def sub_add(i): # 赋上的value值是原来的+1 share_data[i] = i + 1 await asyncio.sleep(0) print(i, share_data[i] == i + 1) async def main(): # 创建并发任务 task_list = [] for i in range(10): task_list.append(sub(i)) task_list.append(sub_add(i)) # 并发执行 await asyncio.gather(*task_list) if __name__ == "__main__": asyncio.run(main())
在这个例子中程序会并发的执行sub
和sub_add
函数,他们是由不同的asyncio.Task
驱动的,这意味着会出现这样一个场景。 当负责执行sub(1)
函数的asyncio.Task
在执行完share_data[i]=i
后就执行await asyncio.sleep(0)
从而主动让出控制权并交还给事件循环,等待事件循环的下一次调度。 不过事件循环不会空下来,而是马上安排下一个asyncio.Task
执行,此时会先执行到sub_add(1)
函数的share_data[i] = i + 1
,并同样的在执行到await asyncio.sleep(0)
的时候把控制权交会给事件循环。 这时候控制权会由事件循环转移给原先执行sub(1)
函数的asyncio.Task
,获取到控制权l后sub(1)
函数的逻辑会继续走,但由于share_data[i]
的数据已经被share_data[i] = i + 1
修改了,导致最后执行print
时,share_data[i]
的数据已经变为脏数据,而不是原本想要的数据了。
为了解决这个问题,我们可以使用asyncio.Lock
来解决资源的冲突,如下:
import asyncio share_data = {} # 存放对应资源的锁 lock_dict = {} async def sub(i): async with lock_dict[i]: # <-- 通过async with语句来控制锁的粒度 share_data[i] = i await asyncio.sleep(0) print(i, share_data[i] == i) async def sub_add(i): async with lock_dict[i]: share_data[i] = i + 1 await asyncio.sleep(0) print(i, share_data[i] == i + 1) async def main(): task_list = [] for i in range(10): lock_dict[i] = asyncio.Lock() task_list.append(sub(i)) task_list.append(sub_add(i)) await asyncio.gather(*task_list) if __name__ == "__main__": asyncio.run(main())
从例子可以看到asyncio.Lock
的使用方法跟多线程的Lock
差不多,通过async with
语法来获取和释放锁,它的原理也很简单,主要做了如下几件事:
1.确保某一协程获取锁后的执行期间,别的协程在获取锁时需要一直等待,直到执行完成并释放锁。
2.当有协程持有锁的时候,其他协程必须等待,直到持有锁的协程释放了锁。
2.确保所有协程能够按照获取的顺序获取到锁。
这意味着需要有一个数据结构来维护当前持有锁的协程的和下一个获取锁协程的关系,同时也需要一个队列来维护多个获取锁的协程的唤醒顺序。
asyncio.Lock
跟其它asyncio
功能的用法一样,使用asyncio.Future
来同步协程之间锁的状态,使用deque
Python
은 해당 코드에 적합하지 않습니다. 블록 관리에 대한 모범 사례는 with
구문을 사용하는 것입니다. with
구문은 실제로 클래스에서 __enter__
및 __exit__를 호출합니다. code> 메서드(예: 다음 코드): 🎜<pre class="brush:py;">class Lockl(_ContextManagerMixin, mixins._LoopBoundMixin):
def __init__(self):
self._waiters = None
self._locked = False
def locked(self):
return self._locked
async def acquire(self):
if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))):
# 目前没有其他协程持有锁,当前协程可以运行
self._locked = True
return True
if self._waiters is None:
self._waiters = collections.deque()
# 创建属于自己的容器,并推送到`_waiters`这个双端队列中
fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
try:
await fut
finally:
# 如果执行完毕,需要把自己移除,防止被`wake_up_first`调用
self._waiters.remove(fut)
except exceptions.CancelledError:
# 如果是等待的过程中被取消了,需要唤醒下一个调用`acquire`
if not self._locked:
self._wake_up_first()
raise
# 持有锁
self._locked = True
return True
def release(self):
if self._locked:
# 释放锁
self._locked = False
self._wake_up_first()
else:
raise RuntimeError(&#39;Lock is not acquired.&#39;)
def _wake_up_first(self):
if not self._waiters:
return
# 获取还处于锁状态协程对应的容器
try:
# 获取下一个等待获取锁的waiter
fut = next(iter(self._waiters))
except StopIteration:
return
# 设置容器为True,这样对应协程就可以继续运行了。
if not fut.done():
fut.set_result(True)</pre>🎜코드의 <code>Demo
클래스가 __enter__
및 __exit__
메서드를 구현한 후 다음을 수행할 수 있습니다. with
구문에 의해 호출되는 __enter__
메서드는 코드 블록 실행을 시작하는 논리이고 __enxi__
메서드는 코드를 종료하는 데 사용됩니다. 차단(비정상 종료 포함) 논리. 이 두 메서드는 동기화 프리미티브의 리소스 경합 및 해제를 따르지만 __enter__
및 __exit__
메서드는 순서대로 await
호출을 지원하지 않습니다. 이 문제를 해결하기 위해 Python
에서는 async with
구문을 도입했습니다. 🎜🎜async with
구문은 with
구문과 유사합니다. __aenter__
및 __aexit__
만 사용하여 클래스를 작성하면 됩니다. 그러면 이 클래스는 다음과 같은 asyncio with
구문을 지원합니다. 🎜import asyncio class TimeoutLock(asyncio.Lock): def __init__(self, timeout, *, loop=None): self.timeout = timeout super().__init__(loop=loop) async def acquire(self) -> bool: return await asyncio.wait_for(super().acquire(), self.timeout)🎜 그 중 클래스의
__aenter__
메서드는 코드 블록에 들어갈 때 실행되는 메서드인데, __aexit__는 코드 블록을 종료할 때 실행되는 메서드입니다. 🎜🎜 <code>async with
구문의 축복으로 asyncio
의 동기화 프리미티브를 사용하는 것이 더 편리하므로 asyncio
에서 리소스 경합의 동기화가 가능합니다. > 프리미티브는 _ContextManagerMixin
클래스인 🎜class TimeoutLock(asyncio.Lock): def __init__(self, timeout, *, loop=None): self.timeout = timeout super().__init__(loop=loop) async def acquire(self) -> bool: try: return await asyncio.wait_for(super().acquire(), self.timeout) except Exception: self._wake_up_first() raise🎜에서 상속하고
__aenter__
에 대한 acquire
및 release
메서드를 구현합니다. __aexit__
메서드 호출 및 동기화 프리미티브를 사용할 때 리소스 점유 해제를 잊지 않도록 async with
구문을 사용하려고 합니다. 🎜🎜1.Lock🎜🎜코루틴의 특성상 코루틴 코드 작성 시 기본적으로 잠금을 고려할 수 없지만, 경우에 따라서는 잠금을 사용해야 하며, 동시성 데이터 보안을 유지하기 위해 잠금을 사용해야 하는 경우는 다음 예시와 같습니다. : 🎜import asyncio async def sub(event: asyncio.Event) -> None: await event.wait() print("I'm Done") async def main() -> None: event = asyncio.Event() for _ in range(10): asyncio.create_task(sub(event)) await asyncio.sleep(1) event.set() asyncio.run(main())🎜이 예에서 프로그램은
sub
및 sub_add
함수를 동시에 실행하며 서로 다른 asyncio.Task code> 기반으로 구성됩니다. , 이는 그러한 시나리오가 발생함을 의미합니다. <code>sub(1)
함수 실행을 담당하는 asyncio.Task
가 share_data[i]=i
실행 후 await asyncio를 실행하는 경우. sleep(0)
따라서 제어를 적극적으로 포기하고 이벤트 루프로 반환하여 이벤트 루프의 다음 전달을 기다립니다. 그러나 이벤트 루프는 비어 있지 않고 다음 asyncio.Task
실행을 즉시 정렬합니다. 이때 sub_add(1)share_data[는 다음과 같습니다. /code> 함수가 먼저 실행됩니다. i] = i + 1
, 마찬가지로 await asyncio.sleep(0)
을 실행할 때 이벤트 루프로 제어를 전달합니다. 이때 제어권은 이벤트 루프에서 원래 sub(1)
함수를 실행했던 asyncio.Task
로 전달됩니다. 제어권을 얻은 후, sub( 1)함수의 논리는 계속되지만 <code>share_data[i]
의 데이터가 share_data[i] = i + 1
에 의해 수정되었기 때문에, 인쇄할 때
의 마지막 실행으로 share_data[i]
의 데이터가 원래 의도했던 데이터가 아닌 더티 데이터가 되었습니다. 🎜🎜이 문제를 해결하려면 다음과 같이 asyncio.Lock
을 사용하여 리소스 충돌을 해결할 수 있습니다. 🎜class Event(mixins._LoopBoundMixin): def __init__(self): self._waiters = collections.deque() self._value = False def is_set(self): return self._value def set(self): if not self._value: # 确保每次只能set一次 self._value = True # 设置每个协程存放的容器为True,这样对应的协程就可以运行了 for fut in self._waiters: if not fut.done(): fut.set_result(True) def clear(self): # 清理上一次的set self._value = False async def wait(self): if self._value: # 如果设置了,就不需要等待了 return True # 否则需要创建一个容器,并需要等待容器完成 fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut return True finally: self._waiters.remove(fut)🎜예제에서
asyncio.Lock 다중 스레드 <code>Lock
은 async with
구문을 사용하여 잠금을 획득하고 해제하는 원리도 매우 간단합니다. 🎜 asyncio.Lock
은 다른 asyncio
함수와 동일하게 사용됩니다. 코루틴 간 잠금 상태를 동기화하려면 asyncio.Future
를 사용하세요. >deque는 코루틴 간의 wake-up 순서를 유지합니다. 소스 코드는 다음과 같습니다. 🎜import asyncio class SetEvent(asyncio.Event): def __init__(self, *, loop=None): self._set = set() super().__init__(loop=loop) def add(self, value): self._set.add(value) self.clear() def remove(self, value): self._set.remove(value) if not self._set: self.set()🎜 소스 코드를 보면 잠금이 주로 획득 및 해제 기능을 제공한다는 것을 알 수 있습니다. 자물쇠를 획득한 것으로 유명함: 🎜
1:当有协程想要获取锁时会先判断锁是否被持有,如果当前锁没有被持有就直接返回,使协程能够正常运行。
2:如果协程获取锁时,锁发现自己已经被其他协程持有则创建一个属于当前协程的asyncio.Future
,用来同步状态,并添加到deque
中。
而对于释放锁就比较简单,只要获取deque
中的第一个asyncio.Future
,并通过fut.set_result(True)
进行标记,使asyncio.Future
从peding
状态变为done
状态,这样一来,持有该asyncio.Future
的协程就能继续运行,从而持有锁。
不过需要注意源码中acquire
方法中对CancelledError
异常进行捕获,再唤醒下一个锁,这是为了解决acquire
方法执行异常导致锁一直被卡住的场景,通常情况下这能解决大部分的问题,但是如果遇到错误的封装时,我们需要亲自处理异常,并执行锁的唤醒。比如在通过继承asyncio.Lock
编写一个超时锁时,最简单的实现代码如下:
import asyncio class TimeoutLock(asyncio.Lock): def __init__(self, timeout, *, loop=None): self.timeout = timeout super().__init__(loop=loop) async def acquire(self) -> bool: return await asyncio.wait_for(super().acquire(), self.timeout)
这份代码非常简单,他只需要在__init__
方法传入timeout
参数,并在acuiqre
方法中通过wait_for
来实现锁超时即可,现在假设wait_for
方法是一个无法传递协程cancel
的方法,且编写的acquire
没有进行捕获异常再释放锁的操作,当异常发生的时候会导致锁一直被卡住。 为了解决这个问题,只需要对TimeoutLock
的acquire
方法添加异常捕获,并在捕获到异常时释放锁即可,代码如下:
class TimeoutLock(asyncio.Lock): def __init__(self, timeout, *, loop=None): self.timeout = timeout super().__init__(loop=loop) async def acquire(self) -> bool: try: return await asyncio.wait_for(super().acquire(), self.timeout) except Exception: self._wake_up_first() raise
asyncio.Event
也是一个简单的同步原语,但它跟asyncio.Lock
不一样,asyncio.Lock
是确保每个资源只能被一个协程操作,而asyncio.Event
是确保某个资源何时可以被协程操作,可以认为asyncio.Lock
锁的是资源,asyncio.Event
锁的是协程,所以asyncio.Event
并不需要acquire
来锁资源,release
释放资源,所以也用不到async with
语法。
asyncio.Event
的简单使用示例如下:
import asyncio async def sub(event: asyncio.Event) -> None: await event.wait() print("I'm Done") async def main() -> None: event = asyncio.Event() for _ in range(10): asyncio.create_task(sub(event)) await asyncio.sleep(1) event.set() asyncio.run(main())
在这个例子中会先创建10个asyncio.Task
来执行sub
函数,但是所有sub
函数都会在event.wait
处等待,直到main
函数中调用event.set
后,所有的sub
函数的event.wait
会放行,使sub
函数能继续执行。
可以看到asyncio.Event
功能比较简单,它的源码实现也很简单,源码如下:
class Event(mixins._LoopBoundMixin): def __init__(self): self._waiters = collections.deque() self._value = False def is_set(self): return self._value def set(self): if not self._value: # 确保每次只能set一次 self._value = True # 设置每个协程存放的容器为True,这样对应的协程就可以运行了 for fut in self._waiters: if not fut.done(): fut.set_result(True) def clear(self): # 清理上一次的set self._value = False async def wait(self): if self._value: # 如果设置了,就不需要等待了 return True # 否则需要创建一个容器,并需要等待容器完成 fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut return True finally: self._waiters.remove(fut)
通过源码可以看到wait
方法主要是创建了一个asyncio.Future
,并把它加入到deque
队列后就一直等待着,而set
方法被调用时会遍历整个deque
队列,并把处于peding
状态的asyncio.Future
设置为done
,这时其他在调用event.wait
方法的协程就会得到放行。
通过源码也可以看出,asyncio.Event
并没有继承于_ContextManagerMixin
,这是因为它锁的是协程,而不是资源。
asyncio.Event
的使用频率比asyncio.Lock
多许多,不过通常都会让asyncio.Event
和其他数据结构进行封装再使用,比如实现一个服务器的优雅关闭功能,这个功能会确保服务器在等待n秒后或者所有连接都关闭后才关闭服务器,这个功能就可以使用set
与asyncio.Event
结合,如下:
import asyncio class SetEvent(asyncio.Event): def __init__(self, *, loop=None): self._set = set() super().__init__(loop=loop) def add(self, value): self._set.add(value) self.clear() def remove(self, value): self._set.remove(value) if not self._set: self.set()
这个SetEvent
结合了set
和SetEvent
的功能,当set
有数据的时候,会通过clear
方法使SetEvent
变为等待状态,而set
没数据的时候,会通过set
方法使SetEvent
变为无需等待的状态,所有调用wait
的协程都可以放行,通过这种结合,SetEvent
拥有了等待资源为空的功能。 接下来就可以用于服务器的优雅退出功能:
async def mock_conn_io() -> None: await asyncio.sleep(1) def conn_handle(set_event: SetEvent): task: asyncio.Task = asyncio.create_task(mock_conn_io()) set_event.add(task) task.add_done_callback(lambda t: set_event.remove(t)) async def main(): set_event: SetEvent = SetEvent() for _ in range(10): conn_handle(set_event) # 假设这里收到了退出信号 await asyncio.wait(set_event.wait(), timeout=9) asyncio.run(main())
在这个演示功能中,mock_conn_io
用于模拟服务器的连接正在处理中,而conn_handle
用于创建服务器连接,main
则是先创建10个连接,并模拟在收到退出信号后等待资源为空或者超时才退出服务。
这只是简单的演示,实际上的优雅关闭功能要考虑的东西不仅仅是这些。
condition只做简单介绍
asyncio.Condition
是同步原语中使用最少的一种,因为他使用情况很奇怪,而且大部分场景可以被其他写法代替,比如下面这个例子:
import asyncio async def task(condition, work_list): await asyncio.sleep(1) work_list.append(33) print('Task sending notification...') async with condition: condition.notify() async def main(): condition = asyncio.Condition() work_list = list() print('Main waiting for data...') async with condition: _ = asyncio.create_task(task(condition, work_list)) await condition.wait() print(f'Got data: {work_list}') asyncio.run(main()) # >>> Main waiting for data... # >>> Task sending notification... # >>> Got data: [33]
在这个例子中可以看到,notify
和wait
方法只能在async with condition
中可以使用,如果没有在async with condition
中使用则会报错,同时这个示例代码有点复杂,没办法一看就知道执行逻辑是什么,其实这个逻辑可以转变成一个更简单的写法:
import asyncio async def task(work_list): await asyncio.sleep(1) work_list.append(33) print('Task sending notification...') return async def main(): work_list = list() print('Main waiting for data...') _task = asyncio.create_task(task(work_list)) await _task print(f'Got data: {work_list}') asyncio.run(main()) # >>> Main waiting for data... # >>> Task sending notification... # >>> Got data: [33]
通过这个代码可以看到这个写法更简单一点,而且更有逻辑性,而condition
的写法却更有点Go
协程写法/或者回调函数写法的感觉。 所以建议在认为自己的代码可能会用到asyncio.Conditon
时需要先考虑到是否需要asyncio.Codition
?是否有别的方案代替,如果没有才考虑去使用asyncio.Conditon
k。
asyncio.Semaphore
--信号量是同步原语中被使用最频繁的,大多数都是用在限流场景中,比如用在爬虫中和客户端网关中限制请求频率。
asyncio.Semaphore
可以认为是一个延缓触发的asyncio.Lock
,asyncio.Semaphore
内部会维护一个计数器,无论何时进行获取或释放,它都会递增或者递减(但不会超过边界值),当计数器归零时,就会进入到锁的逻辑,但是这个锁逻辑会在计数器大于0的时候释放j,它的用法如下:`
import asyncio async def main(): semaphore = asyncio.Semaphore(10): async with semaphore: pass asyncio.run(main())
示例中代码通过async with
来指明一个代码块(代码用pass
代替),这个代码块是被asyncio.Semaphore
管理的,每次协程在进入代码块时,asyncio.Semaphore
的内部计数器就会递减一,而离开代码块则asyncio.Semaphore
的内部计数器会递增一。
当有一个协程进入代码块时asyncio.Semaphore
发现计数器已经为0了,则会使当前协程进入等待状态,直到某个协程离开这个代码块时,计数器会递增一,并唤醒等待的协程,使其能够进入代码块中继续执行。
asyncio.Semaphore
的源码如下,需要注意的是由于asyncio.Semaphore
是一个延缓的asyncio.Lock
,所以当调用一次release
后可能会导致被唤醒的协程和刚进入代码块的协程起冲突,所以在acquire
方法中要通过一个while
循环来解决这个问题:`
class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin): def __init__(self, value=1): if value < 0: raise ValueError("Semaphore initial value must be >= 0") self._value = value self._waiters = collections.deque() self._wakeup_scheduled = False def _wake_up_next(self): while self._waiters: # 按照放置顺序依次弹出容器 waiter = self._waiters.popleft() if not waiter.done(): # 设置容器状态,使对应的协程可以继续执行 waiter.set_result(None) # 设置标记 self._wakeup_scheduled = True return def locked(self): return self._value == 0 async def acquire(self): # 如果`self._wakeup_scheduled`为True或者value小于0 while self._wakeup_scheduled or self._value <= 0: # 创建容器并等待执行完成 fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut self._wakeup_scheduled = False except exceptions.CancelledError: # 如果被取消了,也要唤醒下一个协程 self._wake_up_next() raise self._value -= 1 return True def release(self): # 释放资源占用,唤醒下一个协程。 self._value += 1 self._wake_up_next()
针对asyncio.Semaphore
进行修改可以实现很多功能,比如基于信号量可以实现一个简单的协程池,这个协程池可以限制创建协程的量,当协程池满的时候就无法继续创建协程,只有协程中的协程执行完毕后才能继续创建(当然无法控制在协程中创建新的协程),代码如下:
import asyncio import time from typing import Coroutine class Pool(object): def __init__(self, max_concurrency: int): self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency) async def create_task(self, coro: Coroutine) -> asyncio.Task: await self._semaphore.acquire() task: asyncio.Task = asyncio.create_task(coro) task.add_done_callback(lambda t: self._semaphore.release()) return task async def demo(cnt: int) -> None: print(f"{int(time.time())} create {cnt} task...") await asyncio.sleep(cnt) async def main() -> None: pool: Pool = Pool(3) for i in range(10): await pool.create_task(demo(i)) asyncio.run(main()) # >>> 1677517996 create 0 task... # >>> 1677517996 create 1 task... # >>> 1677517996 create 2 task... # >>> 1677517996 create 3 task... # >>> 1677517997 create 4 task... # >>> 1677517998 create 5 task... # >>> 1677517999 create 6 task... # >>> 1677518001 create 7 task... # >>> 1677518003 create 8 task... # >>> 1677518005 create 9 task...
위 내용은 Python Asyncio 라이브러리에서 일반적으로 사용되는 동기화 기본 기능은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!