Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Apakah fungsi primitif penyegerakan yang biasa digunakan dalam perpustakaan Python Asyncio?

Apakah fungsi primitif penyegerakan yang biasa digunakan dalam perpustakaan Python Asyncio?

WBOY
WBOYke hadapan
2023-05-09 10:35:041319semak imbas

Prakata

Asyncio Primitif penyegerakan boleh memudahkan kita menulis kod untuk persaingan sumber dan mengelakkan berlakunya pepijat yang disebabkan oleh persaingan sumber. Walau bagaimanapun, disebabkan oleh ciri-ciri coroutine, tidak perlu mempertimbangkan persaingan sumber dalam kebanyakan kod perniagaan Akibatnya, penyegerakan primitif Asyncio kurang kerap digunakan Walau bagaimanapun, jika anda ingin menulis rangka kerja berdasarkan Asyncio, anda perlu mempelajari penggunaan primitif.

0. Asas

Primitif penyegerakan sesuai untuk bersaing untuk sumber tertentu dalam keadaan tertentu Kebanyakan sumber dalam kod adalah milik blok kod, dan PythonAmalan terbaik untuk pengurusan blok kod adalah menggunakan sintaks with Sintaks with sebenarnya memanggil kaedah __enter__ dan __exit__ dalam kelas, seperti kod berikut:

class Demo(object):
    def __enter__(self):
        return 
    def __exit__(self, exc_type, exc_val, exc_tb):
        return 
with Demo():
    pass

Selepas <.> kelas dalam kod melaksanakan kaedah Demo dan __enter__, ia boleh dipanggil dengan sintaks __exit__ Kaedah with ialah logik untuk memasukkan pelaksanaan blok kod, dan kaedah __enter__ ialah. digunakan untuk keluar Logik untuk blok kod (termasuk keluar pengecualian). Kedua-dua kaedah ini konsisten dengan pertikaian dan pelepasan sumber dalam primitif penyegerakan, tetapi kedua-dua kaedah __enxi__ dan __enter__ tidak menyokong panggilan __exit__ Untuk menyelesaikan masalah ini, await memperkenalkan tatabahasa Python. async with

sintaks adalah serupa dengan sintaks async with Kita hanya perlu menulis kelas dengan kaedah with dan __aenter__, maka kelas ini akan menyokong sintaks __aexit__, seperti berikut: asyncio with

import asyncio
class Demo(object):
    async def __aenter__(self):
        return
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return
async def main():
    async with Demo():
        pass
asyncio.run(main())

Antaranya, kaedah

dalam kelas ialah kaedah yang dilaksanakan apabila memasuki blok kod, dan __aenter__ ialah kaedah yang dilaksanakan apabila keluar dari blok kod. __aexit__

Dengan berkat sintaks

, primitif penyegerakan async with akan lebih mudah digunakan, jadi primitif penyegerakan untuk persaingan sumber dalam asyncio akan diwarisi daripada kelas asyncio: _ContextManagerMixin

class _ContextManagerMixin:
    async def __aenter__(self):
        await self.acquire()
        # We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None
    async def __aexit__(self, exc_type, exc, tb):
        self.release()

dan melaksanakan kaedah

dan acquire untuk panggilan dengan kaedah release dan __aenter__ Pada masa yang sama, kami cuba menggunakan sintaks __aexit__ apabila menggunakan primitif penyegerakan untuk. mengelakkan lupa untuk melepaskan pekerjaan sumber. async with

1.Kunci

Disebabkan ciri coroutine, kunci pada asasnya tidak boleh dipertimbangkan semasa menulis kod coroutine, tetapi dalam beberapa kes kita masih perlu menggunakan kunci dan Gunakan kunci untuk mengekalkan keselamatan data semasa konkurensi, seperti yang ditunjukkan dalam contoh berikut:

import asyncio
share_data = {}
async def sub(i):
    # 赋上相同的key和value
    share_data[i] = i
    await asyncio.sleep(0)
    print(i, share_data[i] == i)
async def sub_add(i):
    # 赋上的value值是原来的+1
    share_data[i] = i + 1
    await asyncio.sleep(0)
    print(i, share_data[i] == i + 1)
async def main():
    # 创建并发任务
    task_list = []
    for i in range(10):
        task_list.append(sub(i))
        task_list.append(sub_add(i))
    # 并发执行
    await asyncio.gather(*task_list)
if __name__ == "__main__":
    asyncio.run(main())

Dalam contoh ini, program akan melaksanakan fungsi

dan sub secara serentak, dan ia didorong oleh sub_add yang berbeza Ini bermakna a senario seperti ini akan berlaku. Apabila asyncio.Task bertanggungjawab untuk melaksanakan fungsi sub(1) melaksanakan asyncio.Task selepas melaksanakan share_data[i]=i, ia secara aktif melepaskan kawalan dan mengembalikannya ke gelung acara, menunggu penghantaran seterusnya bagi gelung acara. Walau bagaimanapun, gelung acara tidak akan kosong, tetapi akan segera mengatur pelaksanaan await asyncio.sleep(0) seterusnya Pada masa ini, fungsi asyncio.Task sub_add(1) akan dilaksanakan terlebih dahulu dan kawalan akan diserahkan kepada. fungsi yang sama apabila share_data[i] = i + 1 dilaksanakan. Pada masa ini, kawalan akan dipindahkan daripada gelung peristiwa ke await asyncio.sleep(0) yang pada asalnya melaksanakan fungsi sub(1) Selepas mendapatkan kawalan l, logik fungsi asyncio.Task akan diteruskan, tetapi kerana data sub(1) mempunyai. telah diubah suai oleh share_data[i] Akibatnya, apabila share_data[i] = i + 1 akhirnya dilaksanakan, data print telah menjadi data kotor dan bukannya data asalnya. share_data[i]

Untuk menyelesaikan masalah ini, kita boleh menggunakan

untuk menyelesaikan konflik sumber, seperti berikut: asyncio.Lock

import asyncio
share_data = {}
# 存放对应资源的锁
lock_dict = {}
async def sub(i):
    async with lock_dict[i]:  # <-- 通过async with语句来控制锁的粒度
        share_data[i] = i
        await asyncio.sleep(0)
        print(i, share_data[i] == i)
async def sub_add(i):
    async with lock_dict[i]:
        share_data[i] = i + 1
        await asyncio.sleep(0)
        print(i, share_data[i] == i + 1)
async def main():
    task_list = []
    for i in range(10):
        lock_dict[i] = asyncio.Lock()
        task_list.append(sub(i))
        task_list.append(sub_add(i))
    await asyncio.gather(*task_list)
if __name__ == "__main__":
    asyncio.run(main())

Daripada contoh, anda dapat melihat bahawa penggunaan

adalah serupa kepada asyncio.Lock berbilang benang, sintaks Lock digunakan untuk memperoleh dan melepaskan kunci Prinsipnya juga sangat mudah: async with

  • 1. Pastikan coroutine memperoleh kunci Semasa pelaksanaan, coroutine lain perlu menunggu sehingga pelaksanaan selesai dan kunci dilepaskan apabila memperoleh kunci.

  • 2. Apabila coroutine memegang kunci, coroutine lain mesti menunggu sehingga coroutine yang memegang kunci melepaskan kunci.

  • 2. Pastikan semua coroutine boleh memperoleh kunci mengikut susunan yang diperolehi.

Ini bermakna struktur data diperlukan untuk mengekalkan hubungan antara coroutine yang sedang memegang kunci dan coroutine seterusnya yang memperoleh kunci, dan baris gilir juga diperlukan untuk mengekalkan berbilang pemerolehan . Urutan bangun coroutine kunci.

digunakan dengan cara yang sama seperti fungsi asyncio.Lock yang lain Gunakan asyncio untuk menyegerakkan status kunci antara coroutine dan gunakan asyncio.Future untuk mengekalkan urutan bangun antara coroutine kod sumber adalah seperti berikut: deque

class Lockl(_ContextManagerMixin, mixins._LoopBoundMixin):
    def __init__(self):
        self._waiters = None
        self._locked = False
    def locked(self):
        return self._locked
    async def acquire(self):
        if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))):
            # 目前没有其他协程持有锁,当前协程可以运行
            self._locked = True
            return True
        if self._waiters is None:
            self._waiters = collections.deque()
        # 创建属于自己的容器,并推送到`_waiters`这个双端队列中
        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            try:
                await fut
            finally:
                # 如果执行完毕,需要把自己移除,防止被`wake_up_first`调用
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            # 如果是等待的过程中被取消了,需要唤醒下一个调用`acquire`
            if not self._locked:
                self._wake_up_first()
            raise
        # 持有锁
        self._locked = True
        return True
    def release(self):
        if self._locked:
            # 释放锁
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError(&#39;Lock is not acquired.&#39;)
    def _wake_up_first(self):
        if not self._waiters:
            return
        # 获取还处于锁状态协程对应的容器
        try:
            # 获取下一个等待获取锁的waiter
            fut = next(iter(self._waiters))
        except StopIteration:
            return
        # 设置容器为True,这样对应协程就可以继续运行了。
        if not fut.done():
            fut.set_result(True)

Seperti yang anda boleh ketahui daripada kod sumber, kunci terutamanya menyediakan fungsi pemerolehan dan pelepasan Terdapat dua situasi yang perlu dibezakan untuk memperoleh kunci:

  • 1:当有协程想要获取锁时会先判断锁是否被持有,如果当前锁没有被持有就直接返回,使协程能够正常运行。

  • 2:如果协程获取锁时,锁发现自己已经被其他协程持有则创建一个属于当前协程的asyncio.Future,用来同步状态,并添加到deque中。

而对于释放锁就比较简单,只要获取deque中的第一个asyncio.Future,并通过fut.set_result(True)进行标记,使asyncio.Futurepeding状态变为done状态,这样一来,持有该asyncio.Future的协程就能继续运行,从而持有锁。

不过需要注意源码中acquire方法中对CancelledError异常进行捕获,再唤醒下一个锁,这是为了解决acquire方法执行异常导致锁一直被卡住的场景,通常情况下这能解决大部分的问题,但是如果遇到错误的封装时,我们需要亲自处理异常,并执行锁的唤醒。比如在通过继承asyncio.Lock编写一个超时锁时,最简单的实现代码如下:

import asyncio
class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout, *, loop=None):
        self.timeout = timeout
        super().__init__(loop=loop)
    async def acquire(self) -> bool:
        return await asyncio.wait_for(super().acquire(), self.timeout)

这份代码非常简单,他只需要在__init__方法传入timeout参数,并在acuiqre方法中通过wait_for来实现锁超时即可,现在假设wait_for方法是一个无法传递协程cancel的方法,且编写的acquire没有进行捕获异常再释放锁的操作,当异常发生的时候会导致锁一直被卡住。 为了解决这个问题,只需要对TimeoutLockacquire方法添加异常捕获,并在捕获到异常时释放锁即可,代码如下:

class TimeoutLock(asyncio.Lock):
    def __init__(self, timeout, *, loop=None):
        self.timeout = timeout
        super().__init__(loop=loop)
    async def acquire(self) -> bool:
        try:
            return await asyncio.wait_for(super().acquire(), self.timeout)
        except Exception:
            self._wake_up_first()
            raise

2.Event

asyncio.Event也是一个简单的同步原语,但它跟asyncio.Lock不一样,asyncio.Lock是确保每个资源只能被一个协程操作,而asyncio.Event是确保某个资源何时可以被协程操作,可以认为asyncio.Lock锁的是资源,asyncio.Event锁的是协程,所以asyncio.Event并不需要acquire来锁资源,release释放资源,所以也用不到async with语法。

asyncio.Event的简单使用示例如下:

import asyncio
async def sub(event: asyncio.Event) -> None:
    await event.wait()
    print("I&#39;m Done")
async def main() -> None:
    event = asyncio.Event()
    for _ in range(10):
        asyncio.create_task(sub(event))
    await asyncio.sleep(1)
    event.set()
asyncio.run(main())

在这个例子中会先创建10个asyncio.Task来执行sub函数,但是所有sub函数都会在event.wait处等待,直到main函数中调用event.set后,所有的sub函数的event.wait会放行,使sub函数能继续执行。

可以看到asyncio.Event功能比较简单,它的源码实现也很简单,源码如下:

class Event(mixins._LoopBoundMixin):
    def __init__(self):
        self._waiters = collections.deque()
        self._value = False
    def is_set(self):
        return self._value
    def set(self):
        if not self._value:
            # 确保每次只能set一次
            self._value = True
            # 设置每个协程存放的容器为True,这样对应的协程就可以运行了
            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(True)
    def clear(self):
        # 清理上一次的set
        self._value = False
    async def wait(self):
        if self._value:
            # 如果设置了,就不需要等待了
            return True
        # 否则需要创建一个容器,并需要等待容器完成
        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
            return True
        finally:
            self._waiters.remove(fut)

通过源码可以看到wait方法主要是创建了一个asyncio.Future,并把它加入到deque队列后就一直等待着,而set方法被调用时会遍历整个deque队列,并把处于peding状态的asyncio.Future设置为done,这时其他在调用event.wait方法的协程就会得到放行。

通过源码也可以看出,asyncio.Event并没有继承于_ContextManagerMixin,这是因为它锁的是协程,而不是资源。

asyncio.Event的使用频率比asyncio.Lock多许多,不过通常都会让asyncio.Event和其他数据结构进行封装再使用,比如实现一个服务器的优雅关闭功能,这个功能会确保服务器在等待n秒后或者所有连接都关闭后才关闭服务器,这个功能就可以使用setasyncio.Event结合,如下:

import asyncio
class SetEvent(asyncio.Event):
    def __init__(self, *, loop=None):
        self._set = set()
        super().__init__(loop=loop)
    def add(self, value):
        self._set.add(value)
        self.clear()
    def remove(self, value):
        self._set.remove(value)
        if not self._set:
            self.set()

这个SetEvent结合了setSetEvent的功能,当set有数据的时候,会通过clear方法使SetEvent变为等待状态,而set没数据的时候,会通过set方法使SetEvent变为无需等待的状态,所有调用wait的协程都可以放行,通过这种结合,SetEvent拥有了等待资源为空的功能。 接下来就可以用于服务器的优雅退出功能:

async def mock_conn_io() -> None:
    await asyncio.sleep(1)
def conn_handle(set_event: SetEvent):
    task: asyncio.Task = asyncio.create_task(mock_conn_io())
    set_event.add(task)
    task.add_done_callback(lambda t: set_event.remove(t))
async def main():
    set_event: SetEvent = SetEvent()
    for _ in range(10):
        conn_handle(set_event)
	# 假设这里收到了退出信号
    await asyncio.wait(set_event.wait(), timeout=9)
asyncio.run(main())

在这个演示功能中,mock_conn_io用于模拟服务器的连接正在处理中,而conn_handle用于创建服务器连接,main则是先创建10个连接,并模拟在收到退出信号后等待资源为空或者超时才退出服务。

这只是简单的演示,实际上的优雅关闭功能要考虑的东西不仅仅是这些。

4.Condition

condition只做简单介绍

asyncio.Condition是同步原语中使用最少的一种,因为他使用情况很奇怪,而且大部分场景可以被其他写法代替,比如下面这个例子:

import asyncio
async def task(condition, work_list):
    await asyncio.sleep(1)
    work_list.append(33)
    print(&#39;Task sending notification...&#39;)
    async with condition:
        condition.notify()
async def main():
    condition = asyncio.Condition()
    work_list = list()
    print(&#39;Main waiting for data...&#39;)
    async with condition:
        _ = asyncio.create_task(task(condition, work_list))
        await condition.wait()
    print(f&#39;Got data: {work_list}&#39;)
asyncio.run(main())
# &gt;&gt;&gt; Main waiting for data...
# &gt;&gt;&gt; Task sending notification...
# &gt;&gt;&gt; Got data: [33]

在这个例子中可以看到,notifywait方法只能在async with condition中可以使用,如果没有在async with condition中使用则会报错,同时这个示例代码有点复杂,没办法一看就知道执行逻辑是什么,其实这个逻辑可以转变成一个更简单的写法:

import asyncio
async def task(work_list):
    await asyncio.sleep(1)
    work_list.append(33)
    print(&#39;Task sending notification...&#39;)
    return
async def main():
    work_list = list()
    print(&#39;Main waiting for data...&#39;)
    _task = asyncio.create_task(task(work_list))
    await _task
    print(f&#39;Got data: {work_list}&#39;)
asyncio.run(main())
# &gt;&gt;&gt; Main waiting for data...
# &gt;&gt;&gt; Task sending notification...
# &gt;&gt;&gt; Got data: [33]

通过这个代码可以看到这个写法更简单一点,而且更有逻辑性,而condition的写法却更有点Go协程写法/或者回调函数写法的感觉。 所以建议在认为自己的代码可能会用到asyncio.Conditon时需要先考虑到是否需要asyncio.Codition?是否有别的方案代替,如果没有才考虑去使用asyncio.Conditonk。

5.Semaphore

asyncio.Semaphore--信号量是同步原语中被使用最频繁的,大多数都是用在限流场景中,比如用在爬虫中和客户端网关中限制请求频率。

asyncio.Semaphore可以认为是一个延缓触发的asyncio.Lockasyncio.Semaphore内部会维护一个计数器,无论何时进行获取或释放,它都会递增或者递减(但不会超过边界值),当计数器归零时,就会进入到锁的逻辑,但是这个锁逻辑会在计数器大于0的时候释放j,它的用法如下:`

import asyncio
async def main():
    semaphore = asyncio.Semaphore(10):
    async with semaphore:
        pass
asyncio.run(main())

示例中代码通过async with来指明一个代码块(代码用pass代替),这个代码块是被asyncio.Semaphore管理的,每次协程在进入代码块时,asyncio.Semaphore的内部计数器就会递减一,而离开代码块则asyncio.Semaphore的内部计数器会递增一。

当有一个协程进入代码块时asyncio.Semaphore发现计数器已经为0了,则会使当前协程进入等待状态,直到某个协程离开这个代码块时,计数器会递增一,并唤醒等待的协程,使其能够进入代码块中继续执行。

asyncio.Semaphore的源码如下,需要注意的是由于asyncio.Semaphore是一个延缓的asyncio.Lock,所以当调用一次release后可能会导致被唤醒的协程和刚进入代码块的协程起冲突,所以在acquire方法中要通过一个while循环来解决这个问题:`

class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        self._wakeup_scheduled = False
    def _wake_up_next(self):
        while self._waiters:
            # 按照放置顺序依次弹出容器 
            waiter = self._waiters.popleft()
            if not waiter.done():
                # 设置容器状态,使对应的协程可以继续执行
                waiter.set_result(None)
                # 设置标记 
                self._wakeup_scheduled = True
                return
    def locked(self):
        return self._value == 0
    async def acquire(self):
        # 如果`self._wakeup_scheduled`为True或者value小于0
        while self._wakeup_scheduled or self._value <= 0:
            # 创建容器并等待执行完成
            fut = self._get_loop().create_future()
            self._waiters.append(fut)
            try:
                await fut
                self._wakeup_scheduled = False
            except exceptions.CancelledError:
                # 如果被取消了,也要唤醒下一个协程
                self._wake_up_next()
                raise
        self._value -= 1
        return True
    def release(self):
        # 释放资源占用,唤醒下一个协程。
        self._value += 1
        self._wake_up_next()

针对asyncio.Semaphore进行修改可以实现很多功能,比如基于信号量可以实现一个简单的协程池,这个协程池可以限制创建协程的量,当协程池满的时候就无法继续创建协程,只有协程中的协程执行完毕后才能继续创建(当然无法控制在协程中创建新的协程),代码如下:

import asyncio
import time
from typing import Coroutine
class Pool(object):
    def __init__(self, max_concurrency: int):
        self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
    async def create_task(self, coro: Coroutine) -> asyncio.Task:
        await  self._semaphore.acquire()
        task: asyncio.Task = asyncio.create_task(coro)
        task.add_done_callback(lambda t: self._semaphore.release())
        return task
async def demo(cnt: int) -> None:
    print(f"{int(time.time())} create {cnt} task...")
    await  asyncio.sleep(cnt)
async def main() -> None:
    pool: Pool = Pool(3)
    for i in range(10):
        await pool.create_task(demo(i))
asyncio.run(main())
# >>> 1677517996 create 0 task...
# >>> 1677517996 create 1 task...
# >>> 1677517996 create 2 task...
# >>> 1677517996 create 3 task...
# >>> 1677517997 create 4 task...
# >>> 1677517998 create 5 task...
# >>> 1677517999 create 6 task...
# >>> 1677518001 create 7 task...
# >>> 1677518003 create 8 task...
# >>> 1677518005 create 9 task...

Atas ialah kandungan terperinci Apakah fungsi primitif penyegerakan yang biasa digunakan dalam perpustakaan Python Asyncio?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam