
What are the commonly used functions of synchronization primitives in Python Asyncio library?

WBOY (reposted), 2023-05-09 10:35:04

Preface

Asyncio's synchronization primitives make it easier to write code that competes for resources and help avoid bugs caused by resource contention. However, because of how coroutines work, most business code does not need to worry about resource contention, so Asyncio's synchronization primitives are used relatively rarely. If you want to use Asyncio to write a framework, though, you need to learn how to use them.

0. Basics

Synchronization primitives are used when coroutines compete for a resource under certain conditions. Most resources in code are tied to a code block, and in Python the best practice for managing a code block is the with statement. The with statement actually calls the __enter__ and __exit__ methods, as in the following code:

class Demo(object):
    def __enter__(self):
        return 
    def __exit__(self, exc_type, exc_val, exc_tb):
        return 
with Demo():
    pass

Once the Demo class implements the __enter__ and __exit__ methods, it can be used with the with statement: __enter__ holds the logic for entering the code block, and __exit__ holds the logic for leaving it (including leaving because of an exception). These two methods map naturally onto acquiring and releasing a resource in a synchronization primitive, but neither __enter__ nor __exit__ can use await. To solve this problem, Python introduced the async with statement.
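A small sketch of the exit path, using a hypothetical Tracker class: __exit__ runs whether the block finishes normally or raises, and returning a falsy value lets the exception propagate to the caller.

```python
class Tracker:
    """Hypothetical context manager that records when each hook runs."""

    def __init__(self, log):
        self.log = log

    def __enter__(self):
        self.log.append("enter")
        return self  # bound to the name after `as`, if any

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Called on normal exit and on exceptions alike.
        self.log.append(f"exit:{exc_type.__name__ if exc_type else None}")
        return False  # falsy: do not swallow the exception


log = []
with Tracker(log):
    log.append("body")

try:
    with Tracker(log):
        raise ValueError("boom")
except ValueError:
    log.append("caught")

print(log)
# ['enter', 'body', 'exit:None', 'enter', 'exit:ValueError', 'caught']
```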

The async with statement is similar to with: we only need to write a class with __aenter__ and __aexit__ methods, and that class then supports async with, as follows:

import asyncio
class Demo(object):
    async def __aenter__(self):
        return
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return
async def main():
    async with Demo():
        pass
asyncio.run(main())

Here, __aenter__ is executed when entering the code block, and __aexit__ is executed when exiting it.

With async with available, asyncio's synchronization primitives become more convenient to use, so the asyncio synchronization primitives that guard resources all inherit from the _ContextManagerMixin class:

class _ContextManagerMixin:
    async def __aenter__(self):
        await self.acquire()
        # We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None
    async def __aexit__(self, exc_type, exc, tb):
        self.release()

and implement the acquire and release methods that __aenter__ and __aexit__ call. When using synchronization primitives, prefer the async with syntax so that you cannot forget to release the resource.
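As a sketch of what async with does for a lock, the hypothetical demo coroutine below uses asyncio.Lock both ways: an explicit acquire() with release() in a finally clause, and async with, whose __aexit__ releases the lock even when the body raises.

```python
import asyncio


async def demo() -> bool:
    lock = asyncio.Lock()

    # Explicit form: acquire, then release in `finally` so that an
    # exception in the critical section cannot leave the lock held.
    await lock.acquire()
    try:
        pass  # critical section
    finally:
        lock.release()

    # Equivalent `async with` form: __aenter__ awaits acquire() and
    # __aexit__ calls release(), even when the body raises.
    try:
        async with lock:
            raise RuntimeError("failure inside the critical section")
    except RuntimeError:
        pass

    return lock.locked()


still_locked = asyncio.run(demo())
print(still_locked)  # False: the lock was released on the way out
```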

1. Lock

Because of how coroutines work, locks can usually be ignored when writing coroutine code, but in some cases we still need a lock to keep data safe under concurrency, as in the following example:

import asyncio
share_data = {}
async def sub(i):
    # Assign the same key and value
    share_data[i] = i
    await asyncio.sleep(0)
    print(i, share_data[i] == i)
async def sub_add(i):
    # The assigned value is the key + 1
    share_data[i] = i + 1
    await asyncio.sleep(0)
    print(i, share_data[i] == i + 1)
async def main():
    # Create the concurrent tasks
    task_list = []
    for i in range(10):
        task_list.append(sub(i))
        task_list.append(sub_add(i))
    # Run them concurrently
    await asyncio.gather(*task_list)
if __name__ == "__main__":
    asyncio.run(main())

In this example, the program runs sub and sub_add concurrently, each driven by its own asyncio.Task, which means the following can happen. The asyncio.Task running sub(1) reaches await asyncio.sleep(0), voluntarily gives up control, hands it back to the event loop, and waits to be scheduled again. The event loop does not sit idle; it immediately runs the next asyncio.Task, so sub_add(1) executes share_data[i] = i + 1 and then also hands control back to the event loop at await asyncio.sleep(0). Control then returns to the asyncio.Task running sub(1), and the rest of sub(1) continues. But because share_data[i] has already been overwritten by share_data[i] = i + 1, by the time print runs, share_data[i] holds dirty data rather than the value sub(1) intended. To solve this problem, we can use asyncio.Lock to resolve the resource conflict, as follows:

import asyncio
share_data = {}
# One lock per resource (per key)
lock_dict = {}
async def sub(i):
    async with lock_dict[i]:  # <-- async with controls the scope of the lock
        share_data[i] = i
        await asyncio.sleep(0)
        print(i, share_data[i] == i)
async def sub_add(i):
    async with lock_dict[i]:
        share_data[i] = i + 1
        await asyncio.sleep(0)
        print(i, share_data[i] == i + 1)
async def main():
    task_list = []
    for i in range(10):
        lock_dict[i] = asyncio.Lock()
        task_list.append(sub(i))
        task_list.append(sub_add(i))
    await asyncio.gather(*task_list)
if __name__ == "__main__":
    asyncio.run(main())

As the example shows, asyncio.Lock is used much like the Lock used in multithreaded code: the lock is acquired and released with async with. Its principle is also simple; it mainly does two things:

1. While one coroutine holds the lock, every other coroutine that wants it must wait until the holder releases it.
2. Waiting coroutines acquire the lock in the order in which they asked for it.

This means a data structure is needed to track whether the lock is currently held and which coroutine gets it next, and a queue is needed to maintain the wake-up order of the coroutines waiting for the lock.
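The FIFO property in the second point can be observed directly with asyncio.Lock. In this sketch (worker and demo are hypothetical names), the lock is held while five workers queue up, and the order in which they obtain the lock is recorded.

```python
import asyncio


async def worker(lock: asyncio.Lock, n: int, order: list) -> None:
    async with lock:
        order.append(n)         # record the order in which the lock was obtained
        await asyncio.sleep(0)  # yield while still holding the lock


async def demo() -> list:
    lock = asyncio.Lock()
    order: list = []
    await lock.acquire()  # hold the lock so every worker has to queue up
    tasks = [asyncio.create_task(worker(lock, n, order)) for n in range(5)]
    await asyncio.sleep(0)  # let each worker start and join the wait queue
    lock.release()          # wakes the first waiter; the rest follow in turn
    await asyncio.gather(*tasks)
    return order


order = asyncio.run(demo())
print(order)  # [0, 1, 2, 3, 4]
```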

Like other parts of asyncio, asyncio.Lock uses asyncio.Future to synchronize the lock state between coroutines and a deque to maintain the wake-up order of waiting coroutines. The source code is as follows:

# Imports this excerpt relies on (private asyncio internals, Python 3.10+)
import collections
from asyncio import exceptions, mixins
from asyncio.locks import _ContextManagerMixin

class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
    def __init__(self):
        self._waiters = None
        self._locked = False
    def locked(self):
        return self._locked
    async def acquire(self):
        if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))):
            # No other coroutine holds the lock, so the current one can proceed
            self._locked = True
            return True
        if self._waiters is None:
            self._waiters = collections.deque()
        # Create this coroutine's own future and push it onto the `_waiters` deque
        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            try:
                await fut
            finally:
                # Once done, remove our future so `_wake_up_first` will not pick it
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            # If cancelled while waiting, wake up the next coroutine waiting in `acquire`
            if not self._locked:
                self._wake_up_first()
            raise
        # Now holding the lock
        self._locked = True
        return True
    def release(self):
        if self._locked:
            # Release the lock
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')
    def _wake_up_first(self):
        if not self._waiters:
            return
        # Find the future of a coroutine still waiting for the lock
        try:
            # The next waiter in line
            fut = next(iter(self._waiters))
        except StopIteration:
            return
        # Mark the future done (result True) so its coroutine can continue
        if not fut.done():
            fut.set_result(True)

From the source code, we can see that the lock mainly provides two operations, acquiring and releasing. Acquiring the lock has two cases:

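  • 1: When a coroutine wants the lock, acquire first checks whether the lock is held. If it is not, acquire returns immediately and the coroutine keeps running.

  • 2: If the lock is already held by another coroutine, acquire creates an asyncio.Future belonging to the current coroutine to synchronize state, appends it to the deque, and waits on it.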

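Releasing the lock is simpler: take the first asyncio.Future in the deque and mark it with fut.set_result(True), which moves it from the pending state to the done state. The coroutine waiting on that asyncio.Future can then continue running and take the lock.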
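To make the acquire/release description concrete, here is a minimal sketch of the same idea: a hypothetical MiniLock class (not part of asyncio) that keeps one Future per waiting coroutine in a deque and marks the first one done on release. Unlike asyncio.Lock, it deliberately skips cancellation handling, so treat it as an illustration only.

```python
import asyncio
import collections


class MiniLock:
    """Hypothetical, simplified lock: no cancellation handling, unlike asyncio.Lock."""

    def __init__(self) -> None:
        self._locked = False
        self._waiters: collections.deque = collections.deque()

    async def acquire(self) -> None:
        if not self._locked and not self._waiters:
            self._locked = True  # fast path: nobody holds or waits for the lock
            return
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)  # queue up in FIFO order
        try:
            await fut  # stays pending until release() marks it done
        finally:
            self._waiters.remove(fut)
        self._locked = True

    def release(self) -> None:
        if not self._locked:
            raise RuntimeError("Lock is not acquired.")
        self._locked = False
        if self._waiters and not self._waiters[0].done():
            self._waiters[0].set_result(True)  # pending -> done wakes the first waiter


async def demo() -> list:
    lock = MiniLock()
    events: list = []

    async def worker(n: int) -> None:
        await lock.acquire()
        try:
            events.append(("in", n))
            await asyncio.sleep(0)  # yield while holding the lock
            events.append(("out", n))
        finally:
            lock.release()

    await asyncio.gather(*(worker(n) for n in range(3)))
    return events


events = asyncio.run(demo())
print(events)  # each "in" is followed by its own "out": no interleaving
```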

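Note, however, that acquire in the source catches CancelledError and then wakes up the next waiter. This prevents a failed acquire from leaving the lock stuck forever. Usually this covers most problems, but with a faulty wrapper we need to handle the exception ourselves and wake up the next waiter. For example, the simplest implementation of a timeout lock written by subclassing asyncio.Lock looks like this: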

import asyncio
class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout):
        self.timeout = timeout
        # asyncio.Lock no longer accepts a `loop` argument (removed in Python 3.10)
        super().__init__()
    async def acquire(self) -> bool:
        return await asyncio.wait_for(super().acquire(), self.timeout)

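This code is very simple: it only takes a timeout argument in __init__ and uses wait_for in acquire to implement the timeout. Now suppose wait_for were a function that cannot propagate coroutine cancellation, and our acquire did not catch the exception and wake up the next waiter: when an exception occurred, the lock could stay stuck. To fix this, we only need to add exception handling to TimeoutLock's acquire and wake up the next waiting coroutine when an exception is caught, as follows: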

class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout):
        self.timeout = timeout
        super().__init__()
    async def acquire(self) -> bool:
        try:
            return await asyncio.wait_for(super().acquire(), self.timeout)
        except Exception:
            # Make sure the next waiter is not left stuck
            self._wake_up_first()
            raise
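A usage sketch of the exception-safe TimeoutLock: the hypothetical demo coroutine holds the lock, shows that a second acquire times out with asyncio.TimeoutError, and then checks that the lock can still be acquired normally. The sketch omits the loop argument, since asyncio.Lock no longer accepts it as of Python 3.10, and it relies on the private _wake_up_first helper, which may change between Python versions.

```python
import asyncio


class TimeoutLock(asyncio.Lock):
    """Exception-safe timeout lock, without the `loop` argument."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        super().__init__()

    async def acquire(self) -> bool:
        try:
            return await asyncio.wait_for(super().acquire(), self.timeout)
        except Exception:
            self._wake_up_first()  # private asyncio.Lock helper
            raise


async def demo() -> tuple:
    lock = TimeoutLock(timeout=0.1)
    await lock.acquire()  # hold the lock so the next attempt times out
    try:
        await lock.acquire()
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    lock.release()

    async with lock:  # the lock is still usable afterwards
        reacquired = lock.locked()
    return timed_out, reacquired, lock.locked()


result = asyncio.run(demo())
print(result)  # (True, True, False)
```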

2. Event

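asyncio.Event is also a simple synchronization primitive, but it differs from asyncio.Lock: asyncio.Lock ensures that a resource is operated on by only one coroutine at a time, while asyncio.Event decides when coroutines may operate on a resource. You can think of asyncio.Lock as locking the resource and asyncio.Event as locking the coroutines. Therefore asyncio.Event has no acquire to lock a resource or release to free it, and so it does not use async with either.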

A simple usage example of asyncio.Event:

import asyncio
async def sub(event: asyncio.Event) -> None:
    await event.wait()
    print("I'm Done")
async def main() -> None:
    event = asyncio.Event()
    for _ in range(10):
        asyncio.create_task(sub(event))
    await asyncio.sleep(1)
    event.set()
asyncio.run(main())

In this example, 10 asyncio.Task objects are created first to run sub, but every sub waits at event.wait until main calls event.set. At that point event.wait returns in all of them, and each sub continues.
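The wait/set behaviour can be checked in a small sketch (demo and waiter are hypothetical names): set() releases every coroutine currently blocked in wait() at once, and after clear() a new wait() blocks again, which the sketch shows with a short asyncio.wait_for timeout.

```python
import asyncio


async def demo() -> list:
    event = asyncio.Event()
    log: list = []

    async def waiter(n: int) -> None:
        await event.wait()  # blocks until event.set() is called
        log.append(n)

    tasks = [asyncio.create_task(waiter(n)) for n in range(3)]
    await asyncio.sleep(0)  # let the waiters start and block
    log.append(("is_set", event.is_set()))
    event.set()  # releases every current waiter at once
    await asyncio.gather(*tasks)

    event.clear()  # back to the unset state
    try:
        # A new wait() blocks again until someone calls set().
        await asyncio.wait_for(event.wait(), timeout=0.05)
        log.append("woke")
    except asyncio.TimeoutError:
        log.append("blocked after clear")
    return log


log = asyncio.run(demo())
print(log)  # [('is_set', False), 0, 1, 2, 'blocked after clear']
```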

As you can see, asyncio.Event is quite simple, and so is its source code:

# Imports this excerpt relies on (private asyncio internals, Python 3.10+)
import collections
from asyncio import mixins

class Event(mixins._LoopBoundMixin):
    def __init__(self):
        self._waiters = collections.deque()
        self._value = False
    def is_set(self):
        return self._value
    def set(self):
        if not self._value:
            # Only act the first time; setting an already-set event does nothing
            self._value = True
            # Mark every waiting coroutine's future as done so it can run
            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(True)
    def clear(self):
        # Undo the previous set()
        self._value = False
    async def wait(self):
        if self._value:
            # Already set, no need to wait
            return True
        # Otherwise create a future and wait until it is done
        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
            return True
        finally:
            self._waiters.remove(fut)

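The source shows that wait creates an asyncio.Future, appends it to the deque and waits on it, while set walks the whole deque and marks every asyncio.Future that is still pending as done; at that point all coroutines blocked in event.wait are released.

The source also shows that asyncio.Event does not inherit from _ContextManagerMixin, because it locks coroutines rather than resources.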

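asyncio.Event is used much more often than asyncio.Lock, but it is usually wrapped together with other data structures. Take graceful shutdown of a server: the server should shut down only after all connections are closed or after waiting n seconds. This can be implemented by combining a set with asyncio.Event, as follows: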

import asyncio
class SetEvent(asyncio.Event):
    def __init__(self):
        self._set = set()
        # asyncio.Event no longer accepts a `loop` argument (removed in Python 3.10)
        super().__init__()
    def add(self, value):
        self._set.add(value)
        self.clear()
    def remove(self, value):
        self._set.remove(value)
        if not self._set:
            self.set()

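SetEvent combines a set with asyncio.Event: when the set contains items, clear puts SetEvent into the waiting state; when the last item is removed, set puts it into the non-waiting state and every coroutine calling wait is released. With this combination, SetEvent can wait until the resource set becomes empty. It can then be used for graceful server shutdown: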

async def mock_conn_io() -> None:
    # Simulate a connection that is still being processed
    await asyncio.sleep(1)
def conn_handle(set_event: SetEvent):
    task: asyncio.Task = asyncio.create_task(mock_conn_io())
    set_event.add(task)
    task.add_done_callback(lambda t: set_event.remove(t))
async def main():
    set_event: SetEvent = SetEvent()
    for _ in range(10):
        conn_handle(set_event)
    # Assume an exit signal is received here: wait until every
    # connection is done, or give up after 9 seconds
    try:
        await asyncio.wait_for(set_event.wait(), timeout=9)
    except asyncio.TimeoutError:
        pass
asyncio.run(main())

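In this demo, mock_conn_io simulates a server connection that is still being processed, conn_handle creates a server connection, and main first creates 10 connections and then, after an (assumed) exit signal, waits until all connections have finished or the timeout expires before exiting.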
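Putting SetEvent and the shutdown logic together, here is a self-contained variant that reports which path it took (shutdown, conn_seconds and grace are hypothetical names, and asyncio.sleep stands in for connection I/O). The SetEvent copy omits the loop argument, which asyncio.Event no longer accepts as of Python 3.10, and the wait uses asyncio.wait_for, which raises asyncio.TimeoutError when the grace period runs out.

```python
import asyncio


class SetEvent(asyncio.Event):
    """Event that is set exactly when its internal set is empty (after a removal)."""

    def __init__(self) -> None:
        self._set: set = set()
        super().__init__()

    def add(self, value) -> None:
        self._set.add(value)
        self.clear()  # non-empty set: wait() should block

    def remove(self, value) -> None:
        self._set.remove(value)
        if not self._set:
            self.set()  # empty set: release everyone blocked in wait()


async def mock_conn_io(seconds: float) -> None:
    await asyncio.sleep(seconds)  # stands in for a connection doing I/O


async def shutdown(conn_seconds: float, grace: float) -> str:
    set_event = SetEvent()
    for _ in range(3):
        task = asyncio.create_task(mock_conn_io(conn_seconds))
        set_event.add(task)
        task.add_done_callback(set_event.remove)  # the callback receives the task
    try:
        await asyncio.wait_for(set_event.wait(), timeout=grace)
        return "drained"
    except asyncio.TimeoutError:
        return "timed out"


drained = asyncio.run(shutdown(conn_seconds=0.05, grace=1))
timed_out = asyncio.run(shutdown(conn_seconds=1, grace=0.05))
print(drained, timed_out)  # drained timed out
```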

This is only a simple demonstration; a real graceful shutdown has to consider much more than this.

4. Condition

Condition is only briefly introduced here.

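asyncio.Condition is the least used synchronization primitive, because its usage is unusual and most scenarios can be handled in other ways, as in the following example: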

import asyncio
async def task(condition, work_list):
    await asyncio.sleep(1)
    work_list.append(33)
    print('Task sending notification...')
    async with condition:
        condition.notify()
async def main():
    condition = asyncio.Condition()
    work_list = list()
    print('Main waiting for data...')
    async with condition:
        _ = asyncio.create_task(task(condition, work_list))
        await condition.wait()
    print(f'Got data: {work_list}')
asyncio.run(main())
# >>> Main waiting for data...
# >>> Task sending notification...
# >>> Got data: [33]

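In this example, notify and wait can only be used inside async with condition; calling them without holding the condition's lock raises a RuntimeError. The example is also somewhat convoluted, and its execution order is not obvious at a glance. In fact, the same logic can be written more simply: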

import asyncio
async def task(work_list):
    await asyncio.sleep(1)
    work_list.append(33)
    print('Task sending notification...')
    return
async def main():
    work_list = list()
    print('Main waiting for data...')
    _task = asyncio.create_task(task(work_list))
    await _task
    print(f'Got data: {work_list}')
asyncio.run(main())
# >>> Main waiting for data...
# >>> Task sending notification...
# >>> Got data: [33]

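This version is simpler and easier to follow, whereas the condition version feels more like Go-style goroutine code or callback-style code. So when you think your code might need asyncio.Condition, first ask whether you really need it and whether another approach could replace it; only use asyncio.Condition if there is none.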
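Where asyncio.Condition does earn its place is when coroutines must wait for a condition on shared state that changes in several steps. A minimal sketch (consumer, producer and demo are hypothetical names): the consumer uses Condition.wait_for with a predicate, which re-checks the predicate each time the coroutine is woken, and the producer calls notify_all after each change.

```python
import asyncio


async def consumer(cond: asyncio.Condition, items: list, got: list) -> None:
    async with cond:
        # wait_for() releases the lock while waiting and re-checks the
        # predicate after every wake-up, so early wake-ups are harmless.
        await cond.wait_for(lambda: len(items) >= 3)
        got.extend(items)


async def producer(cond: asyncio.Condition, items: list) -> None:
    for n in range(3):
        await asyncio.sleep(0.01)
        async with cond:
            items.append(n)
            cond.notify_all()  # wake waiters so they can re-check the predicate


async def demo() -> list:
    cond = asyncio.Condition()
    items: list = []
    got: list = []
    await asyncio.gather(consumer(cond, items, got), producer(cond, items))
    return got


got = asyncio.run(demo())
print(got)  # [0, 1, 2]
```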

5. Semaphore

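asyncio.Semaphore (a semaphore) is the most frequently used synchronization primitive, mostly for throttling, for example to limit the number of concurrent requests in a crawler or a client gateway.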

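asyncio.Semaphore can be thought of as a deferred asyncio.Lock. It maintains an internal counter that is decremented on every acquire and incremented on every release, and the counter never drops below zero. When the counter reaches zero, acquire falls into the locking logic, and that lock is lifted as soon as the counter is greater than 0 again. It is used as follows: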

import asyncio
async def main():
    semaphore = asyncio.Semaphore(10)
    async with semaphore:
        pass
asyncio.run(main())

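The example uses async with to mark a code block (represented by pass) that is managed by asyncio.Semaphore. Each time a coroutine enters the block, the semaphore's internal counter is decremented by one, and each time a coroutine leaves it, the counter is incremented by one.

If a coroutine tries to enter the block when the counter is already 0, it is put into a waiting state; when some coroutine leaves the block, the counter is incremented and a waiting coroutine is woken up so that it can enter the block and continue.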
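To see the counter behaviour described above, the sketch below (the demo coroutine is hypothetical) acquires a Semaphore(2) twice and checks locked() before and after a release. It also shows asyncio.BoundedSemaphore, which raises ValueError when release() would push the counter above its initial value; a plain Semaphore allows that.

```python
import asyncio


async def demo() -> tuple:
    sem = asyncio.Semaphore(2)
    await sem.acquire()  # counter 2 -> 1
    await sem.acquire()  # counter 1 -> 0
    full = sem.locked()  # True: the next acquire() would wait
    sem.release()        # counter 0 -> 1
    after_release = sem.locked()  # False: one slot is free again

    # BoundedSemaphore refuses to be released above its initial value.
    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()
        over_release = "allowed"
    except ValueError:
        over_release = "ValueError"
    return full, after_release, over_release


result = asyncio.run(demo())
print(result)  # (True, False, 'ValueError')
```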

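The source code of asyncio.Semaphore is shown below. Note that because asyncio.Semaphore is a deferred asyncio.Lock, a single release may let the coroutine being woken up race with a coroutine that has just arrived at the code block, so acquire uses a while loop to handle this: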

# Imports this excerpt relies on (private asyncio internals, Python 3.10+)
import collections
from asyncio import exceptions, mixins
from asyncio.locks import _ContextManagerMixin

class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        self._wakeup_scheduled = False
    def _wake_up_next(self):
        while self._waiters:
            # Pop futures in the order they were queued
            waiter = self._waiters.popleft()
            if not waiter.done():
                # Mark the future done so its coroutine can continue
                waiter.set_result(None)
                # Record that a wake-up is pending
                self._wakeup_scheduled = True
                return
    def locked(self):
        return self._value == 0
    async def acquire(self):
        # Wait while another wake-up is still pending or the counter is <= 0
        while self._wakeup_scheduled or self._value <= 0:
            # Create a future and wait for it to be completed
            fut = self._get_loop().create_future()
            self._waiters.append(fut)
            try:
                await fut
                self._wakeup_scheduled = False
            except exceptions.CancelledError:
                # If cancelled, still wake up the next coroutine
                self._wake_up_next()
                raise
        self._value -= 1
        return True
    def release(self):
        # Free one slot and wake up the next waiting coroutine
        self._value += 1
        self._wake_up_next()

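Many features can be built by adapting asyncio.Semaphore. For example, a semaphore can back a simple coroutine pool that limits how many coroutines are created: when the pool is full, no more coroutines can be created until one of the coroutines in the pool finishes (of course, this cannot control new coroutines created inside those coroutines). The code is as follows: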

import asyncio
import time
from typing import Coroutine
class Pool(object):
    def __init__(self, max_concurrency: int):
        self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
    async def create_task(self, coro: Coroutine) -> asyncio.Task:
        await self._semaphore.acquire()
        task: asyncio.Task = asyncio.create_task(coro)
        task.add_done_callback(lambda t: self._semaphore.release())
        return task
async def demo(cnt: int) -> None:
    print(f"{int(time.time())} create {cnt} task...")
    await asyncio.sleep(cnt)
async def main() -> None:
    pool: Pool = Pool(3)
    for i in range(10):
        await pool.create_task(demo(i))
asyncio.run(main())
# >>> 1677517996 create 0 task...
# >>> 1677517996 create 1 task...
# >>> 1677517996 create 2 task...
# >>> 1677517996 create 3 task...
# >>> 1677517997 create 4 task...
# >>> 1677517998 create 5 task...
# >>> 1677517999 create 6 task...
# >>> 1677518001 create 7 task...
# >>> 1677518003 create 8 task...
# >>> 1677518005 create 9 task...
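The Pool above throttles task creation. An alternative that is often enough is to create all tasks up front and let a semaphore gate only the work inside each task. The sketch below (limited_fetch, run_all and stats are hypothetical names, and asyncio.sleep stands in for real I/O) tracks how many bodies run at once to check the limit.

```python
import asyncio


async def limited_fetch(sem: asyncio.Semaphore, n: int, stats: dict) -> int:
    async with sem:  # at most `limit` bodies run at the same time
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stands in for a network request
        stats["active"] -= 1
        return n * n


async def run_all(limit: int, count: int) -> tuple:
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    # All tasks are created up front; the semaphore only gates the body.
    results = await asyncio.gather(*(limited_fetch(sem, n, stats) for n in range(count)))
    return results, stats["peak"]


results, peak = asyncio.run(run_all(limit=3, count=10))
print(peak)  # 3
```

The trade-off: Pool makes the caller wait, which keeps the number of live tasks small, while the gather version creates every task immediately, which is simpler but costs memory when the number of jobs is large.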


Statement:
This article is reproduced from yisu.com. In case of infringement, please contact admin@php.cn to have it deleted.