ホームページ  >  記事  >  バックエンド開発  >  Python Asyncio ライブラリの同期プリミティブの一般的に使用される関数は何ですか?

Python Asyncio ライブラリの同期プリミティブの一般的に使用される関数は何ですか?

WBOY
WBOY転載
2023-05-09 10:35:041312ブラウズ

はじめに

Asyncio の同期プリミティブを使用すると、リソース競合コードの作成が簡素化され、リソース競合によって引き起こされるバグの発生を回避できます。ただし、コルーチンの特性により、ほとんどのビジネス コードではリソースの競合を考慮する必要がないため、Asyncio 同期プリミティブの使用頻度は低くなりますが、Asyncio# # を使用したい場合は、 #フレームワークを作成するときは、同期プリミティブの使用方法を学ぶ必要があります。

0. 基本

同期プリミティブは、特定の条件下で特定のリソースを競合するのに適しています。コード内のほとんどのリソースはコード ブロックに属し、

Pythonコード ブロック管理のベスト プラクティスは、with 構文を使用することです。with 構文は実際に __enter__ メソッドと __exit__ メソッドを呼び出します。次のコードのようなものです。

class Demo(object):
    def __enter__(self):
        return 
    def __exit__(self, exc_type, exc_val, exc_tb):
        return 
with Demo():
    pass

コード内の

Demo クラスは、__enter__ メソッドと __exit__ メソッドを実装します。その後、このメソッドを呼び出すことができます。 with 構文による。 __enter__ メソッドはコード ブロックの実行に入るロジックであり、__enxi__ メソッドはコード ブロックを終了するために使用されます (異常終了)ロジック。これら 2 つのメソッドは、同期プリミティブのリソースの競合と解放に準拠していますが、2 つのメソッド __enter____exit__await 呼び出しをサポートしていません。この問題を解決するために、Python では async with 構文を導入しました。

async with 構文は with 構文に似ており、__aenter____aexit__ を使用してクラスを作成するだけで済みます。このクラスは、次のような asyncio with 構文をサポートします。

import asyncio
class Demo(object):
    async def __aenter__(self):
        return
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return
async def main():
    async with Demo():
        pass
asyncio.run(main())

このうち、クラス内の

__aenter__ メソッドは、コードを入力するときに実行されるメソッドです。ブロック、 __aexit__ は、コード ブロックを終了するときに実行されるメソッドです。

async with 構文の恩恵により、asyncio の同期プリミティブがより使いやすくなるため、asyncio でのリソースの競合が発生します。同期プリミティブは、_ContextManagerMixin クラス:

class _ContextManagerMixin:
    async def __aenter__(self):
        await self.acquire()
        # We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None
    async def __aexit__(self, exc_type, exc, tb):
        self.release()

を継承し、

__aenter__ および __aenter__acquire メソッドと release メソッドを実装します。 __aexit__ メソッド呼び出しと同時に、同期プリミティブを使用するときは、リソース占有の解放忘れを防ぐために async with

構文を使用するようにします。

1.Lock

コルーチンの特性上、コルーチンのコードを書く際には基本的にロックを考慮することはできませんが、場合によってはロックを使用する必要があり、同時実行時のデータセキュリティが維持されます。

import asyncio
share_data = {}
async def sub(i):
    # 赋上相同的key和value
    share_data[i] = i
    await asyncio.sleep(0)
    print(i, share_data[i] == i)
async def sub_add(i):
    # 赋上的value值是原来的+1
    share_data[i] = i + 1
    await asyncio.sleep(0)
    print(i, share_data[i] == i + 1)
async def main():
    # 创建并发任务
    task_list = []
    for i in range(10):
        task_list.append(sub(i))
        task_list.append(sub_add(i))
    # 并发执行
    await asyncio.gather(*task_list)
if __name__ == "__main__":
    asyncio.run(main())
この例では、プログラムは sub 関数と sub_add 関数を同時に実行します。 ##asyncio.Task ドライバー。これは、そのようなシナリオが発生することを意味します。 sub(1) 関数の実行を担当する asyncio.Taskawait asyncio.sleep(( 0) を実行すると、積極的に制御を放棄して制御を返します。イベント ループ。イベント ループの次のディスパッチを待機します。ただし、イベント ループは空ではなく、すぐに次の asyncio.Task の実行を手配します。このとき、sub_add(1)# の share_data[i] ## 関数が最初に実行されます = i 1await asyncio.sleep(0) が実行されると同様にイベント ループに制御が移ります。この時点で、イベント ループから最初に sub(1) 関数を実行した asyncio.Task に制御が移ります。 ## 関数のロジックは続行されますが、share_data[i] のデータは share_data[i] = i 1 によって変更されているため、 の最終実行は終了します。 printshare_data[i] のデータは、本来意図されたデータではなく、ダーティ データになっています。 この問題を解決するには、次のように asyncio.Lock を使用してリソースの競合を解決します。
import asyncio
share_data = {}
# 存放对应资源的锁
lock_dict = {}
async def sub(i):
    async with lock_dict[i]:  # <-- 通过async with语句来控制锁的粒度
        share_data[i] = i
        await asyncio.sleep(0)
        print(i, share_data[i] == i)
async def sub_add(i):
    async with lock_dict[i]:
        share_data[i] = i + 1
        await asyncio.sleep(0)
        print(i, share_data[i] == i + 1)
async def main():
    task_list = []
    for i in range(10):
        lock_dict[i] = asyncio.Lock()
        task_list.append(sub(i))
        task_list.append(sub_add(i))
    await asyncio.gather(*task_list)
if __name__ == "__main__":
    asyncio.run(main())

例からわかるように

asyncio.Lock の使用方法はマルチスレッドの Lock

と同様で、

async with という構文でロックの取得と解放を行い、原理も非常にシンプルです。 #1. コルーチンがロックを取得した後の実行期間中、他のコルーチンは実行が完了してロックが解放されるまで待機する必要があることを確認します。

  • 2. コルーチンがロックを保持している場合、他のコルーチンは、ロックを保持しているコルーチンがロックを解放するまで待機する必要があります。

  • 2.すべてのコルーチンがロックを取得した順序で取得できることを確認します。

  • これは、現在ロックを保持しているコルーチンとロックを取得する次のコルーチンの間の関係を維持するためにデータ構造が必要であり、複数の取得を維持するためにキューも必要であることを意味します。 . ロック コルーチンのウェイクアップ シーケンス。

  • asyncio.Lock
は、他の

asyncio

関数と同じように使用されます。コルーチン間のロック ステータスを同期するには、

asyncio.Future を使用します。# を使用します。 ##deque コルーチン間のウェイクアップ シーケンスを維持する ソース コードは次のとおりです: <pre class="brush:py;">class Lockl(_ContextManagerMixin, mixins._LoopBoundMixin): def __init__(self): self._waiters = None self._locked = False def locked(self): return self._locked async def acquire(self): if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))): # 目前没有其他协程持有锁,当前协程可以运行 self._locked = True return True if self._waiters is None: self._waiters = collections.deque() # 创建属于自己的容器,并推送到`_waiters`这个双端队列中 fut = self._get_loop().create_future() self._waiters.append(fut) try: try: await fut finally: # 如果执行完毕,需要把自己移除,防止被`wake_up_first`调用 self._waiters.remove(fut) except exceptions.CancelledError: # 如果是等待的过程中被取消了,需要唤醒下一个调用`acquire` if not self._locked: self._wake_up_first() raise # 持有锁 self._locked = True return True def release(self): if self._locked: # 释放锁 self._locked = False self._wake_up_first() else: raise RuntimeError(&amp;#39;Lock is not acquired.&amp;#39;) def _wake_up_first(self): if not self._waiters: return # 获取还处于锁状态协程对应的容器 try: # 获取下一个等待获取锁的waiter fut = next(iter(self._waiters)) except StopIteration: return # 设置容器为True,这样对应协程就可以继续运行了。 if not fut.done(): fut.set_result(True)</pre> ソース コードから、ロックは主に取得と解放の機能を提供していることがわかります。ロックを取得するには状況を区別する必要があります: <ul class=" list-paddingleft-2"> <li><p>1:当有协程想要获取锁时会先判断锁是否被持有,如果当前锁没有被持有就直接返回,使协程能够正常运行。</p></li> <li><p>2:如果协程获取锁时,锁发现自己已经被其他协程持有则创建一个属于当前协程的<code>asyncio.Future,用来同步状态,并添加到deque中。

而对于释放锁就比较简单,只要获取deque中的第一个asyncio.Future,并通过fut.set_result(True)进行标记,使asyncio.Futurepeding状态变为done状态,这样一来,持有该asyncio.Future的协程就能继续运行,从而持有锁。

不过需要注意源码中acquire方法中对CancelledError异常进行捕获,再唤醒下一个锁,这是为了解决acquire方法执行异常导致锁一直被卡住的场景,通常情况下这能解决大部分的问题,但是如果遇到错误的封装时,我们需要亲自处理异常,并执行锁的唤醒。比如在通过继承asyncio.Lock编写一个超时锁时,最简单的实现代码如下:

import asyncio
class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout, *, loop=None):
        self.timeout = timeout
        super().__init__(loop=loop)
    async def acquire(self) -> bool:
        return await asyncio.wait_for(super().acquire(), self.timeout)

这份代码非常简单,他只需要在__init__方法传入timeout参数,并在acuiqre方法中通过wait_for来实现锁超时即可,现在假设wait_for方法是一个无法传递协程cancel的方法,且编写的acquire没有进行捕获异常再释放锁的操作,当异常发生的时候会导致锁一直被卡住。 为了解决这个问题,只需要对TimeoutLockacquire方法添加异常捕获,并在捕获到异常时释放锁即可,代码如下:

class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout, *, loop=None):
        self.timeout = timeout
        super().__init__(loop=loop)
    async def acquire(self) -> bool:
        try:
            return await asyncio.wait_for(super().acquire(), self.timeout)
        except Exception:
            self._wake_up_first()
            raise

2.Event

asyncio.Event也是一个简单的同步原语,但它跟asyncio.Lock不一样,asyncio.Lock是确保每个资源只能被一个协程操作,而asyncio.Event是确保某个资源何时可以被协程操作,可以认为asyncio.Lock锁的是资源,asyncio.Event锁的是协程,所以asyncio.Event并不需要acquire来锁资源,release释放资源,所以也用不到async with语法。

asyncio.Event的简单使用示例如下:

import asyncio
async def sub(event: asyncio.Event) -> None:
    await event.wait()
    print("I&#39;m Done")
async def main() -> None:
    event = asyncio.Event()
    for _ in range(10):
        asyncio.create_task(sub(event))
    await asyncio.sleep(1)
    event.set()
asyncio.run(main())

在这个例子中会先创建10个asyncio.Task来执行sub函数,但是所有sub函数都会在event.wait处等待,直到main函数中调用event.set后,所有的sub函数的event.wait会放行,使sub函数能继续执行。

可以看到asyncio.Event功能比较简单,它的源码实现也很简单,源码如下:

class Event(mixins._LoopBoundMixin):
    def __init__(self):
        self._waiters = collections.deque()
        self._value = False
    def is_set(self):
        return self._value
    def set(self):
        if not self._value:
            # 确保每次只能set一次
            self._value = True
            # 设置每个协程存放的容器为True,这样对应的协程就可以运行了
            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(True)
    def clear(self):
        # 清理上一次的set
        self._value = False
    async def wait(self):
        if self._value:
            # 如果设置了,就不需要等待了
            return True
        # 否则需要创建一个容器,并需要等待容器完成
        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
            return True
        finally:
            self._waiters.remove(fut)

通过源码可以看到wait方法主要是创建了一个asyncio.Future,并把它加入到deque队列后就一直等待着,而set方法被调用时会遍历整个deque队列,并把处于peding状态的asyncio.Future设置为done,这时其他在调用event.wait方法的协程就会得到放行。

通过源码也可以看出,asyncio.Event并没有继承于_ContextManagerMixin,这是因为它锁的是协程,而不是资源。

asyncio.Event的使用频率比asyncio.Lock多许多,不过通常都会让asyncio.Event和其他数据结构进行封装再使用,比如实现一个服务器的优雅关闭功能,这个功能会确保服务器在等待n秒后或者所有连接都关闭后才关闭服务器,这个功能就可以使用setasyncio.Event结合,如下:

import asyncio
class SetEvent(asyncio.Event):
    def __init__(self, *, loop=None):
        self._set = set()
        super().__init__(loop=loop)
    def add(self, value):
        self._set.add(value)
        self.clear()
    def remove(self, value):
        self._set.remove(value)
        if not self._set:
            self.set()

这个SetEvent结合了setSetEvent的功能,当set有数据的时候,会通过clear方法使SetEvent变为等待状态,而set没数据的时候,会通过set方法使SetEvent变为无需等待的状态,所有调用wait的协程都可以放行,通过这种结合,SetEvent拥有了等待资源为空的功能。 接下来就可以用于服务器的优雅退出功能:

async def mock_conn_io() -> None:
    await asyncio.sleep(1)
def conn_handle(set_event: SetEvent):
    task: asyncio.Task = asyncio.create_task(mock_conn_io())
    set_event.add(task)
    task.add_done_callback(lambda t: set_event.remove(t))
async def main():
    set_event: SetEvent = SetEvent()
    for _ in range(10):
        conn_handle(set_event)
	# 假设这里收到了退出信号
    await asyncio.wait(set_event.wait(), timeout=9)
asyncio.run(main())

在这个演示功能中,mock_conn_io用于模拟服务器的连接正在处理中,而conn_handle用于创建服务器连接,main则是先创建10个连接,并模拟在收到退出信号后等待资源为空或者超时才退出服务。

这只是简单的演示,实际上的优雅关闭功能要考虑的东西不仅仅是这些。

4.Condition

condition只做简单介绍

asyncio.Condition是同步原语中使用最少的一种,因为他使用情况很奇怪,而且大部分场景可以被其他写法代替,比如下面这个例子:

import asyncio
async def task(condition, work_list):
    await asyncio.sleep(1)
    work_list.append(33)
    print(&#39;Task sending notification...&#39;)
    async with condition:
        condition.notify()
async def main():
    condition = asyncio.Condition()
    work_list = list()
    print(&#39;Main waiting for data...&#39;)
    async with condition:
        _ = asyncio.create_task(task(condition, work_list))
        await condition.wait()
    print(f&#39;Got data: {work_list}&#39;)
asyncio.run(main())
# &gt;&gt;&gt; Main waiting for data...
# &gt;&gt;&gt; Task sending notification...
# &gt;&gt;&gt; Got data: [33]

在这个例子中可以看到,notifywait方法只能在async with condition中可以使用,如果没有在async with condition中使用则会报错,同时这个示例代码有点复杂,没办法一看就知道执行逻辑是什么,其实这个逻辑可以转变成一个更简单的写法:

import asyncio
async def task(work_list):
    await asyncio.sleep(1)
    work_list.append(33)
    print(&#39;Task sending notification...&#39;)
    return
async def main():
    work_list = list()
    print(&#39;Main waiting for data...&#39;)
    _task = asyncio.create_task(task(work_list))
    await _task
    print(f&#39;Got data: {work_list}&#39;)
asyncio.run(main())
# &gt;&gt;&gt; Main waiting for data...
# &gt;&gt;&gt; Task sending notification...
# &gt;&gt;&gt; Got data: [33]

通过这个代码可以看到这个写法更简单一点,而且更有逻辑性,而condition的写法却更有点Go协程写法/或者回调函数写法的感觉。 所以建议在认为自己的代码可能会用到asyncio.Conditon时需要先考虑到是否需要asyncio.Codition?是否有别的方案代替,如果没有才考虑去使用asyncio.Conditonk。

5.Semaphore

asyncio.Semaphore--信号量是同步原语中被使用最频繁的,大多数都是用在限流场景中,比如用在爬虫中和客户端网关中限制请求频率。

asyncio.Semaphore可以认为是一个延缓触发的asyncio.Lockasyncio.Semaphore内部会维护一个计数器,无论何时进行获取或释放,它都会递增或者递减(但不会超过边界值),当计数器归零时,就会进入到锁的逻辑,但是这个锁逻辑会在计数器大于0的时候释放j,它的用法如下:`

import asyncio
async def main():
    semaphore = asyncio.Semaphore(10):
    async with semaphore:
        pass
asyncio.run(main())

示例中代码通过async with来指明一个代码块(代码用pass代替),这个代码块是被asyncio.Semaphore管理的,每次协程在进入代码块时,asyncio.Semaphore的内部计数器就会递减一,而离开代码块则asyncio.Semaphore的内部计数器会递增一。

当有一个协程进入代码块时asyncio.Semaphore发现计数器已经为0了,则会使当前协程进入等待状态,直到某个协程离开这个代码块时,计数器会递增一,并唤醒等待的协程,使其能够进入代码块中继续执行。

asyncio.Semaphore的源码如下,需要注意的是由于asyncio.Semaphore是一个延缓的asyncio.Lock,所以当调用一次release后可能会导致被唤醒的协程和刚进入代码块的协程起冲突,所以在acquire方法中要通过一个while循环来解决这个问题:`

class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        self._wakeup_scheduled = False
    def _wake_up_next(self):
        while self._waiters:
            # 按照放置顺序依次弹出容器 
            waiter = self._waiters.popleft()
            if not waiter.done():
                # 设置容器状态,使对应的协程可以继续执行
                waiter.set_result(None)
                # 设置标记 
                self._wakeup_scheduled = True
                return
    def locked(self):
        return self._value == 0
    async def acquire(self):
        # 如果`self._wakeup_scheduled`为True或者value小于0
        while self._wakeup_scheduled or self._value <= 0:
            # 创建容器并等待执行完成
            fut = self._get_loop().create_future()
            self._waiters.append(fut)
            try:
                await fut
                self._wakeup_scheduled = False
            except exceptions.CancelledError:
                # 如果被取消了,也要唤醒下一个协程
                self._wake_up_next()
                raise
        self._value -= 1
        return True
    def release(self):
        # 释放资源占用,唤醒下一个协程。
        self._value += 1
        self._wake_up_next()

针对asyncio.Semaphore进行修改可以实现很多功能,比如基于信号量可以实现一个简单的协程池,这个协程池可以限制创建协程的量,当协程池满的时候就无法继续创建协程,只有协程中的协程执行完毕后才能继续创建(当然无法控制在协程中创建新的协程),代码如下:

import asyncio
import time
from typing import Coroutine
class Pool(object):
    def __init__(self, max_concurrency: int):
        self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
    async def create_task(self, coro: Coroutine) -> asyncio.Task:
        await  self._semaphore.acquire()
        task: asyncio.Task = asyncio.create_task(coro)
        task.add_done_callback(lambda t: self._semaphore.release())
        return task
async def demo(cnt: int) -> None:
    print(f"{int(time.time())} create {cnt} task...")
    await  asyncio.sleep(cnt)
async def main() -> None:
    pool: Pool = Pool(3)
    for i in range(10):
        await pool.create_task(demo(i))
asyncio.run(main())
# >>> 1677517996 create 0 task...
# >>> 1677517996 create 1 task...
# >>> 1677517996 create 2 task...
# >>> 1677517996 create 3 task...
# >>> 1677517997 create 4 task...
# >>> 1677517998 create 5 task...
# >>> 1677517999 create 6 task...
# >>> 1677518001 create 7 task...
# >>> 1677518003 create 8 task...
# >>> 1677518005 create 9 task...

以上がPython Asyncio ライブラリの同期プリミティブの一般的に使用される関数は何ですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。