Home > Article > Backend Development > What are the commonly used functions of synchronization primitives in Python Asyncio library?
Asyncio
's synchronization primitives can simplify us in writing resource competition code and avoid the occurrence of bugs caused by resource competition. However, due to the characteristics of coroutines, there is no need to consider resource competition in most business codes, resulting in Asyncio
synchronization primitives being used less frequently, but if you want to use Asyncio
When writing a framework, you need to learn the use of synchronization primitives.
Synchronization primitives are suitable for competing for a certain resource under certain conditions. Most of the resources in the code belong to a code block, and Python
The best practice for the management of code blocks is to use the with
syntax. The with
syntax actually calls __enter__
and __exit__
method, such as the following code:
class Demo(object): def __enter__(self): return def __exit__(self, exc_type, exc_val, exc_tb): return with Demo(): pass
The Demo
class in the code implements the __enter__
and __exit__
methods After that, it can be called by the with
syntax, where the __enter__
method is the logic to enter the code block execution, and the __enxi__
method is used to exit the code block (including abnormal exit ) logic. These two methods conform to the contention and release of resources in the synchronization primitive, but the two methods __enter__
and __exit__
do not support await
calls. In order To solve this problem, Python
introduced the async with
syntax.
async with
syntax is similar to with
syntax, we only need to write a class with __aenter__
and __aexit__
methods, Then this class supports the asyncio with
syntax, as follows:
import asyncio class Demo(object): async def __aenter__(self): return async def __aexit__(self, exc_type, exc_val, exc_tb): return async def main(): async with Demo(): pass asyncio.run(main())
Among them, the __aenter__
method in the class is the method executed when entering the code block, __aexit__
is the method executed when exiting the code block.
With the blessing of async with
syntax, the synchronization primitives of asyncio
will be more convenient to use, so the resource competition in asyncio
Synchronization primitives will inherit from the _ContextManagerMixin
class:
class _ContextManagerMixin: async def __aenter__(self): await self.acquire() # We have no use for the "as ..." clause in the with # statement for locks. return None async def __aexit__(self, exc_type, exc, tb): self.release()
and implement the acquire
and release
methods for __aenter__
and __aexit__
method calls. At the same time, when using synchronization primitives, we try to use the async with
syntax to prevent forgetting to release resource occupation.
Due to the characteristics of coroutines, locks can basically not be considered when writing coroutine code, but in some cases we still need to use locks and Data security during concurrency is maintained through locks, as shown in the following example:
import asyncio share_data = {} async def sub(i): # 赋上相同的key和value share_data[i] = i await asyncio.sleep(0) print(i, share_data[i] == i) async def sub_add(i): # 赋上的value值是原来的+1 share_data[i] = i + 1 await asyncio.sleep(0) print(i, share_data[i] == i + 1) async def main(): # 创建并发任务 task_list = [] for i in range(10): task_list.append(sub(i)) task_list.append(sub_add(i)) # 并发执行 await asyncio.gather(*task_list) if __name__ == "__main__": asyncio.run(main())
In this example, the program will execute the sub
and sub_add
functions concurrently. They are composed of Different asyncio.Task
drivers, which means that such a scenario will occur. When the asyncio.Task
responsible for executing the sub(1)
function executes await asyncio.sleep(( 0)
Thus actively giving up control and returning it to the event loop, waiting for the next dispatch of the event loop. However, the event loop will not be empty, but will immediately arrange the execution of the next asyncio.Task
. At this time, share_data[i] of the
sub_add(1) function will be executed first. = i 1
, and similarly transfer control to the event loop when executing await asyncio.sleep(0)
. At this time, control will be transferred from the event loop to asyncio.Task
that originally executed the sub(1)
function. After obtaining control lsub(1)
The logic of the function will continue, but because the data of share_data[i]
has been modified by share_data[i] = i 1
, the final execution of print
, the data in share_data[i]
has become dirty data, not the originally intended data. In order to solve this problem, we can use
to resolve resource conflicts, as follows: <pre class="brush:py;">import asyncio
share_data = {}
# 存放对应资源的锁
lock_dict = {}
async def sub(i):
async with lock_dict[i]: # <-- 通过async with语句来控制锁的粒度
share_data[i] = i
await asyncio.sleep(0)
print(i, share_data[i] == i)
async def sub_add(i):
async with lock_dict[i]:
share_data[i] = i + 1
await asyncio.sleep(0)
print(i, share_data[i] == i + 1)
async def main():
task_list = []
for i in range(10):
lock_dict[i] = asyncio.Lock()
task_list.append(sub(i))
task_list.append(sub_add(i))
await asyncio.gather(*task_list)
if __name__ == "__main__":
asyncio.run(main())</pre>
As you can see from the example
Lock. The lock is acquired and released through the
async with syntax. Its principle is also very simple. It mainly does the following things:
asyncio.Lock
is used the same as otherasyncio functions. Use
asyncio.Future to synchronize the lock status between coroutines. Use
deque Maintain the wake-up sequence between coroutines. The source code is as follows:
class Lockl(_ContextManagerMixin, mixins._LoopBoundMixin): def __init__(self): self._waiters = None self._locked = False def locked(self): return self._locked async def acquire(self): if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))): # 目前没有其他协程持有锁,当前协程可以运行 self._locked = True return True if self._waiters is None: self._waiters = collections.deque() # 创建属于自己的容器,并推送到`_waiters`这个双端队列中 fut = self._get_loop().create_future() self._waiters.append(fut) try: try: await fut finally: # 如果执行完毕,需要把自己移除,防止被`wake_up_first`调用 self._waiters.remove(fut) except exceptions.CancelledError: # 如果是等待的过程中被取消了,需要唤醒下一个调用`acquire` if not self._locked: self._wake_up_first() raise # 持有锁 self._locked = True return True def release(self): if self._locked: # 释放锁 self._locked = False self._wake_up_first() else: raise RuntimeError('Lock is not acquired.') def _wake_up_first(self): if not self._waiters: return # 获取还处于锁状态协程对应的容器 try: # 获取下一个等待获取锁的waiter fut = next(iter(self._waiters)) except StopIteration: return # 设置容器为True,这样对应协程就可以继续运行了。 if not fut.done(): fut.set_result(True)
From the source code, we can know that the lock mainly provides the functions of acquisition and release. Two situations need to be distinguished for acquiring the lock:
1:当有协程想要获取锁时会先判断锁是否被持有,如果当前锁没有被持有就直接返回,使协程能够正常运行。
2:如果协程获取锁时,锁发现自己已经被其他协程持有则创建一个属于当前协程的asyncio.Future
,用来同步状态,并添加到deque
中。
而对于释放锁就比较简单,只要获取deque
中的第一个asyncio.Future
,并通过fut.set_result(True)
进行标记,使asyncio.Future
从peding
状态变为done
状态,这样一来,持有该asyncio.Future
的协程就能继续运行,从而持有锁。
不过需要注意源码中acquire
方法中对CancelledError
异常进行捕获,再唤醒下一个锁,这是为了解决acquire
方法执行异常导致锁一直被卡住的场景,通常情况下这能解决大部分的问题,但是如果遇到错误的封装时,我们需要亲自处理异常,并执行锁的唤醒。比如在通过继承asyncio.Lock
编写一个超时锁时,最简单的实现代码如下:
import asyncio class TimeoutLock(asyncio.Lock): def __init__(self, timeout, *, loop=None): self.timeout = timeout super().__init__(loop=loop) async def acquire(self) -> bool: return await asyncio.wait_for(super().acquire(), self.timeout)
这份代码非常简单,他只需要在__init__
方法传入timeout
参数,并在acuiqre
方法中通过wait_for
来实现锁超时即可,现在假设wait_for
方法是一个无法传递协程cancel
的方法,且编写的acquire
没有进行捕获异常再释放锁的操作,当异常发生的时候会导致锁一直被卡住。 为了解决这个问题,只需要对TimeoutLock
的acquire
方法添加异常捕获,并在捕获到异常时释放锁即可,代码如下:
class TimeoutLock(asyncio.Lock): def __init__(self, timeout, *, loop=None): self.timeout = timeout super().__init__(loop=loop) async def acquire(self) -> bool: try: return await asyncio.wait_for(super().acquire(), self.timeout) except Exception: self._wake_up_first() raise
asyncio.Event
也是一个简单的同步原语,但它跟asyncio.Lock
不一样,asyncio.Lock
是确保每个资源只能被一个协程操作,而asyncio.Event
是确保某个资源何时可以被协程操作,可以认为asyncio.Lock
锁的是资源,asyncio.Event
锁的是协程,所以asyncio.Event
并不需要acquire
来锁资源,release
释放资源,所以也用不到async with
语法。
asyncio.Event
的简单使用示例如下:
import asyncio async def sub(event: asyncio.Event) -> None: await event.wait() print("I'm Done") async def main() -> None: event = asyncio.Event() for _ in range(10): asyncio.create_task(sub(event)) await asyncio.sleep(1) event.set() asyncio.run(main())
在这个例子中会先创建10个asyncio.Task
来执行sub
函数,但是所有sub
函数都会在event.wait
处等待,直到main
函数中调用event.set
后,所有的sub
函数的event.wait
会放行,使sub
函数能继续执行。
可以看到asyncio.Event
功能比较简单,它的源码实现也很简单,源码如下:
class Event(mixins._LoopBoundMixin): def __init__(self): self._waiters = collections.deque() self._value = False def is_set(self): return self._value def set(self): if not self._value: # 确保每次只能set一次 self._value = True # 设置每个协程存放的容器为True,这样对应的协程就可以运行了 for fut in self._waiters: if not fut.done(): fut.set_result(True) def clear(self): # 清理上一次的set self._value = False async def wait(self): if self._value: # 如果设置了,就不需要等待了 return True # 否则需要创建一个容器,并需要等待容器完成 fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut return True finally: self._waiters.remove(fut)
通过源码可以看到wait
方法主要是创建了一个asyncio.Future
,并把它加入到deque
队列后就一直等待着,而set
方法被调用时会遍历整个deque
队列,并把处于peding
状态的asyncio.Future
设置为done
,这时其他在调用event.wait
方法的协程就会得到放行。
通过源码也可以看出,asyncio.Event
并没有继承于_ContextManagerMixin
,这是因为它锁的是协程,而不是资源。
asyncio.Event
的使用频率比asyncio.Lock
多许多,不过通常都会让asyncio.Event
和其他数据结构进行封装再使用,比如实现一个服务器的优雅关闭功能,这个功能会确保服务器在等待n秒后或者所有连接都关闭后才关闭服务器,这个功能就可以使用set
与asyncio.Event
结合,如下:
import asyncio class SetEvent(asyncio.Event): def __init__(self, *, loop=None): self._set = set() super().__init__(loop=loop) def add(self, value): self._set.add(value) self.clear() def remove(self, value): self._set.remove(value) if not self._set: self.set()
这个SetEvent
结合了set
和SetEvent
的功能,当set
有数据的时候,会通过clear
方法使SetEvent
变为等待状态,而set
没数据的时候,会通过set
方法使SetEvent
变为无需等待的状态,所有调用wait
的协程都可以放行,通过这种结合,SetEvent
拥有了等待资源为空的功能。 接下来就可以用于服务器的优雅退出功能:
async def mock_conn_io() -> None: await asyncio.sleep(1) def conn_handle(set_event: SetEvent): task: asyncio.Task = asyncio.create_task(mock_conn_io()) set_event.add(task) task.add_done_callback(lambda t: set_event.remove(t)) async def main(): set_event: SetEvent = SetEvent() for _ in range(10): conn_handle(set_event) # 假设这里收到了退出信号 await asyncio.wait(set_event.wait(), timeout=9) asyncio.run(main())
在这个演示功能中,mock_conn_io
用于模拟服务器的连接正在处理中,而conn_handle
用于创建服务器连接,main
则是先创建10个连接,并模拟在收到退出信号后等待资源为空或者超时才退出服务。
这只是简单的演示,实际上的优雅关闭功能要考虑的东西不仅仅是这些。
condition只做简单介绍
asyncio.Condition
是同步原语中使用最少的一种,因为他使用情况很奇怪,而且大部分场景可以被其他写法代替,比如下面这个例子:
import asyncio async def task(condition, work_list): await asyncio.sleep(1) work_list.append(33) print('Task sending notification...') async with condition: condition.notify() async def main(): condition = asyncio.Condition() work_list = list() print('Main waiting for data...') async with condition: _ = asyncio.create_task(task(condition, work_list)) await condition.wait() print(f'Got data: {work_list}') asyncio.run(main()) # >>> Main waiting for data... # >>> Task sending notification... # >>> Got data: [33]
在这个例子中可以看到,notify
和wait
方法只能在async with condition
中可以使用,如果没有在async with condition
中使用则会报错,同时这个示例代码有点复杂,没办法一看就知道执行逻辑是什么,其实这个逻辑可以转变成一个更简单的写法:
import asyncio async def task(work_list): await asyncio.sleep(1) work_list.append(33) print('Task sending notification...') return async def main(): work_list = list() print('Main waiting for data...') _task = asyncio.create_task(task(work_list)) await _task print(f'Got data: {work_list}') asyncio.run(main()) # >>> Main waiting for data... # >>> Task sending notification... # >>> Got data: [33]
通过这个代码可以看到这个写法更简单一点,而且更有逻辑性,而condition
的写法却更有点Go
协程写法/或者回调函数写法的感觉。 所以建议在认为自己的代码可能会用到asyncio.Conditon
时需要先考虑到是否需要asyncio.Codition
?是否有别的方案代替,如果没有才考虑去使用asyncio.Conditon
k。
asyncio.Semaphore
--信号量是同步原语中被使用最频繁的,大多数都是用在限流场景中,比如用在爬虫中和客户端网关中限制请求频率。
asyncio.Semaphore
可以认为是一个延缓触发的asyncio.Lock
,asyncio.Semaphore
内部会维护一个计数器,无论何时进行获取或释放,它都会递增或者递减(但不会超过边界值),当计数器归零时,就会进入到锁的逻辑,但是这个锁逻辑会在计数器大于0的时候释放j,它的用法如下:`
import asyncio async def main(): semaphore = asyncio.Semaphore(10): async with semaphore: pass asyncio.run(main())
示例中代码通过async with
来指明一个代码块(代码用pass
代替),这个代码块是被asyncio.Semaphore
管理的,每次协程在进入代码块时,asyncio.Semaphore
的内部计数器就会递减一,而离开代码块则asyncio.Semaphore
的内部计数器会递增一。
当有一个协程进入代码块时asyncio.Semaphore
发现计数器已经为0了,则会使当前协程进入等待状态,直到某个协程离开这个代码块时,计数器会递增一,并唤醒等待的协程,使其能够进入代码块中继续执行。
asyncio.Semaphore
的源码如下,需要注意的是由于asyncio.Semaphore
是一个延缓的asyncio.Lock
,所以当调用一次release
后可能会导致被唤醒的协程和刚进入代码块的协程起冲突,所以在acquire
方法中要通过一个while
循环来解决这个问题:`
class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin): def __init__(self, value=1): if value < 0: raise ValueError("Semaphore initial value must be >= 0") self._value = value self._waiters = collections.deque() self._wakeup_scheduled = False def _wake_up_next(self): while self._waiters: # 按照放置顺序依次弹出容器 waiter = self._waiters.popleft() if not waiter.done(): # 设置容器状态,使对应的协程可以继续执行 waiter.set_result(None) # 设置标记 self._wakeup_scheduled = True return def locked(self): return self._value == 0 async def acquire(self): # 如果`self._wakeup_scheduled`为True或者value小于0 while self._wakeup_scheduled or self._value <= 0: # 创建容器并等待执行完成 fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut self._wakeup_scheduled = False except exceptions.CancelledError: # 如果被取消了,也要唤醒下一个协程 self._wake_up_next() raise self._value -= 1 return True def release(self): # 释放资源占用,唤醒下一个协程。 self._value += 1 self._wake_up_next()
针对asyncio.Semaphore
进行修改可以实现很多功能,比如基于信号量可以实现一个简单的协程池,这个协程池可以限制创建协程的量,当协程池满的时候就无法继续创建协程,只有协程中的协程执行完毕后才能继续创建(当然无法控制在协程中创建新的协程),代码如下:
import asyncio import time from typing import Coroutine class Pool(object): def __init__(self, max_concurrency: int): self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency) async def create_task(self, coro: Coroutine) -> asyncio.Task: await self._semaphore.acquire() task: asyncio.Task = asyncio.create_task(coro) task.add_done_callback(lambda t: self._semaphore.release()) return task async def demo(cnt: int) -> None: print(f"{int(time.time())} create {cnt} task...") await asyncio.sleep(cnt) async def main() -> None: pool: Pool = Pool(3) for i in range(10): await pool.create_task(demo(i)) asyncio.run(main()) # >>> 1677517996 create 0 task... # >>> 1677517996 create 1 task... # >>> 1677517996 create 2 task... # >>> 1677517996 create 3 task... # >>> 1677517997 create 4 task... # >>> 1677517998 create 5 task... # >>> 1677517999 create 6 task... # >>> 1677518001 create 7 task... # >>> 1677518003 create 8 task... # >>> 1677518005 create 9 task...
The above is the detailed content of What are the commonly used functions of synchronization primitives in Python Asyncio library?. For more information, please follow other related articles on the PHP Chinese website!