Heim >Backend-Entwicklung >Python-Tutorial >So verwenden Sie asynchrone Python-Methoden

So verwenden Sie asynchrone Python-Methoden

WBOY
WBOYnach vorne
2023-05-10 23:28:041612Durchsuche

Warum asynchrone Programmierung?

Um die Motivation für die asynchrone Programmierung zu verstehen, müssen wir zunächst verstehen, was die Ausführungsgeschwindigkeit unseres Codes einschränkt. Im Idealfall möchten wir, dass unser Code mit Lichtgeschwindigkeit läuft und unseren Code sofort und ohne Verzögerung überspringt. Allerdings läuft der Code aufgrund von zwei Faktoren tatsächlich viel langsamer:

  • CPU-Zeit (Zeit, die der Prozessor für die Ausführung von Anweisungen benötigt)

  • IO-Zeit (Zeit, die für das Warten auf Netzwerkanforderungen oder Speicher-Lese-/Schreibvorgänge aufgewendet wird)

Wenn unser Code auf E/A wartet, ist die CPU grundsätzlich im Leerlauf und wartet auf eine Antwort von einem externen Gerät. Normalerweise erkennt der Kernel dies und schaltet die Ausführung sofort auf andere Threads im System um. Wenn wir also eine Reihe von E/A-intensiven Aufgaben beschleunigen möchten, können wir für jede Aufgabe einen Thread erstellen. Wenn einer der Threads stoppt und auf E/A wartet, wechselt der Kernel zu einem anderen Thread, um die Verarbeitung fortzusetzen.

Das funktioniert in der Praxis gut, hat aber zwei Nachteile:

  • Threads haben Overhead (insbesondere in Python)

  • Wir haben keine Kontrolle darüber, wann der Kernel zwischen Threads wechselt

Zum Beispiel, wenn Wenn wir 10.000 Aufgaben ausführen möchten, müssen wir entweder 10.000 Threads erstellen, was viel RAM verbraucht, oder wir müssen eine kleinere Anzahl von Arbeitsthreads erstellen und die Aufgaben mit weniger Parallelität ausführen. Darüber hinaus verbraucht das anfängliche Erzeugen dieser Threads CPU-Zeit.

Da der Kernel jederzeit zwischen Threads wechseln kann, können in unserem Code jederzeit Rennen auftreten.

Einführung in die Asynchronität

Im herkömmlichen synchronen Thread-basierten Code muss der Kernel erkennen, wann ein Thread IO-gebunden ist, und sich dafür entscheiden, nach Belieben zwischen Threads zu wechseln. Bei der Python-Asynchronität verwendet der Programmierer das Schlüsselwort await, um die Codezeile zu bestätigen, die die E/A-Bindung deklariert und bestätigt, dass die Berechtigung zum Ausführen anderer Aufgaben erteilt wurde. Betrachten Sie beispielsweise den folgenden Code, der eine Webanfrage ausführt: await确认声明 IO 绑定的代码行,并确认授予执行其他任务的权限。例如,考虑以下执行Web请求的代码:

async def request_google():
    reader, writer = await asyncio.open_connection('google.com', 80)
    writer.write(b'GET / HTTP/2\n\n')
    await writer.drain()
    response = await reader.read()
    return response.decode()

在这里,在这里,我们看到该代码在两个地方await。因此,在等待我们的字节被发送到服务器(writer.drain())时,在等待服务器用一些字节(reader.read())回复时,我们知道其他代码可能会执行,全局变量可能会更改。然而,从函数开始到第一次等待,我们可以确保代码逐行运行,而不会切换到运行程序中的其他代码。这就是异步的美妙之处。

asyncio是一个标准库,可以让我们用这些异步函数做一些有趣的事情。例如,如果我们想同时向Google执行两个请求,我们可以:

async def request_google_twice():
    response_1, response_2 = await asyncio.gather(request_google(), request_google())
    return response_1, response_2

当我们调用request_google_twice()时,神奇的asyncio.gather会启动一个函数调用,但是当我们调用时await writer.drain(),它会开始执行第二个函数调用,这样两个请求就会并行发生。然后,它等待第一个或第二个请求的writer.drain()调用完成并继续执行该函数。

最后,有一个重要的细节被遗漏了:asyncio.run。要从常规的 [同步] Python 函数实际调用异步函数,我们将调用包装在asyncio.run(...)

async def async_main():
    r1, r2 = await request_google_twice()
    print('Response one:', r1)
    print('Response two:', r2)
    return 12

return_val = asyncio.run(async_main())

请注意,如果我们只调用async_main()而不调用await ...或者 asyncio.run(...),则不会发生任何事情。这只是由异步工作方式的性质所限制的。

那么,异步究竟是如何工作的,这些神奇的asyncio.runasyncio.gather函数有什么作用呢?阅读下文以了解详情。

异步是如何工作的

要了解async的魔力,我们首先需要了解一个更简单的 Python 构造:生成器

生成器

生成器是 Python 函数,它逐个返回一系列值(可迭代)。例如:

def get_numbers():
    print("|| get_numbers begin")
    print("|| get_numbers Giving 1...")
    yield 1
    print("|| get_numbers Giving 2...")
    yield 2
    print("|| get_numbers Giving 3...")
    yield 3
    print("|| get_numbers end")

print("| for begin")
for number in get_numbers():
    print(f"| Got {number}.")
print("| for end")
| for begin
|| get_numbers begin
|| get_numbers Giving 1...
| Got 1.
|| get_numbers Giving 2...
| Got 2.
|| get_numbers Giving 3...
| Got 3.
|| get_numbers end
| for end

因此,我们看到,对于for循环的每个迭代,我们在生成器中只执行一次。我们可以使用Python的next()

In [3]: generator = get_numbers()                                                                                                                                                            

In [4]: next(generator)                                                                                                                                                                      
|| get_numbers begin
|| get_numbers Giving 1...
Out[4]: 1

In [5]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 2...
Out[5]: 2

In [6]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 3...
Out[6]: 3

In [7]: next(generator)                                                                                                                                                                      
|| get_numbers end
---------------------------------------
StopIteration       Traceback (most recent call last)
<ipython-input-154-323ce5d717bb> in <module>
----> 1 next(generator)

StopIteration:

Hier sehen wir diesen Code await an zwei Stellen. Während wir also darauf warten, dass unsere Bytes an den Server gesendet werden (writer.drain()), während wir darauf warten, dass der Server mit einigen Bytes antwortet (reader.read()). ) wissen wir, dass möglicherweise anderer Code ausgeführt wird und sich globale Variablen ändern können. Vom Start der Funktion bis zum ersten Warten können wir jedoch sicherstellen, dass der Code Zeile für Zeile ausgeführt wird, ohne zu anderem Code im laufenden Programm zu wechseln. Das ist das Schöne an Async.

asyncio ist eine Standardbibliothek, die es uns ermöglicht, einige interessante Dinge mit diesen asynchronen Funktionen zu tun. Wenn wir beispielsweise zwei Anfragen gleichzeitig an Google stellen möchten, können wir: 🎜

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        time.sleep(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

Wenn wir request_google_twice() aufrufen, die Magie asyncio.gather startet einen Funktionsaufruf, aber wenn wir await write.drain() aufrufen, beginnt es mit der Ausführung des zweiten Funktionsaufrufs, sodass die beiden Anforderungen in ausgeführt werden parallel. Anschließend wartet es auf den Abschluss des ersten oder zweiten angeforderten writer.drain()-Aufrufs und fährt mit der Ausführung der Funktion fort. 🎜

Schließlich wird ein wichtiges Detail ausgelassen: asyncio.run. Um tatsächlich eine asynchrone Funktion aus einer regulären [synchronen] Python-Funktion aufzurufen, schließen wir den Aufruf in asyncio.run(...) ein: 🎜

[1.0] Sending update 1/3.
[1.0] Sending update 2/3.
[1.0] Sending update 3/3.

Bitte beachten Sie, dass dies nicht der Fall ist, wenn wir nur async_main() aufrufen, ohne await ... oder asyncio.run(...) aufzurufen irgendetwas passiert. Dies wird nur durch die Art der asynchronen Funktionsweise begrenzt. 🎜

Wie funktioniert Asynchronität und welche Funktionen haben diese magischen Funktionen asyncio.run und asyncio.gather? Wollstoff? Lesen Sie weiter unten, um mehr zu erfahren. 🎜

So funktioniert Async 🎜

Um die Magie von async zu verstehen, müssen wir zuerst Sie müssen ein einfacheres Python-Konstrukt verstehen: Generator 🎜

Generator 🎜

Generator ist eine Python-Funktion, die zurückgibt eine Folge von Werten nacheinander (iterierbar). Zum Beispiel: 🎜

threads = [
    threading.Thread(target=send_updates, args=(10, 1.0)),
    threading.Thread(target=send_updates, args=(5, 2.0)),
    threading.Thread(target=send_updates, args=(4, 3.0))
]
for i in threads:
    i.start()
for i in threads:
    i.join()
class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration

Wir sehen also, dass wir für jede Iteration der for-Schleife diese nur einmal im Generator ausführen. Wir können diese Iteration expliziter mit der Python-Funktion next() durchführen: 🎜

In [3]: generator = get_numbers()                                                                                                                                                            

In [4]: next(generator)                                                                                                                                                                      
|| get_numbers begin
|| get_numbers Giving 1...
Out[4]: 1

In [5]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 2...
Out[5]: 2

In [6]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 3...
Out[6]: 3

In [7]: next(generator)                                                                                                                                                                      
|| get_numbers end
---------------------------------------
StopIteration       Traceback (most recent call last)
<ipython-input-154-323ce5d717bb> in <module>
----> 1 next(generator)

StopIteration:

这与异步函数的行为非常相似。正如异步函数从函数开始直到第一次等待时连续执行代码一样,我们第一次调用next()时,生成器将从函数顶部执行到第一个yield 语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但返回一些不同的东西来使用生成器创建类似异步的函数。

使用生成器进行异步

让我们使用生成器来创建我们自己的小型异步框架。

但是,为简单起见,让我们将实际 IO 替换为睡眠(即。time.sleep)。让我们考虑一个需要定期发送更新的应用程序:

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        time.sleep(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

因此,如果我们调用send_updates(3, 1.0),它将输出这三条消息,每条消息间隔 1 秒:

[1.0] Sending update 1/3.
[1.0] Sending update 2/3.
[1.0] Sending update 3/3.

现在,假设我们要同时运行几个不同的时间间隔。例如,send_updates(10, 1.0)send_updates(5, 2.0)send_updates(4, 3.0)。我们可以使用线程来做到这一点,如下所示:

threads = [
    threading.Thread(target=send_updates, args=(10, 1.0)),
    threading.Thread(target=send_updates, args=(5, 2.0)),
    threading.Thread(target=send_updates, args=(4, 3.0))
]
for i in threads:
    i.start()
for i in threads:
    i.join()

这可行,在大约 12 秒内完成,但使用具有前面提到的缺点的线程。让我们使用生成器构建相同的东西。

在演示生成器的示例中,我们返回了整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述要等待的IO的对象。在我们的例子中,我们的“IO”只是一个计时器,它将等待一段时间。因此,让我们创建一个计时器对象,用于此目的:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration

现在,让我们从我们的函数中产生这个而不是调用time.sleep

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        yield AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

现在,每次我们调用send_updates(...)时调用next(...),我们都会得到一个AsyncTimer对象,告诉我们直到我们应该等待什么时候:

generator = send_updates(3, 1.5)
timer = next(generator)  # [1.5] Sending update 1/3.
print(timer.done_time - time.time())  # 1.498...

由于我们的代码现在实际上并没有调用time.sleep,我们现在可以同时执行另一个send_updates调用。

所以,为了把这一切放在一起,我们需要退后一步,意识到一些事情:

  • 生成器就像部分执行的函数,等待一些 IO(计时器)。

  • 每个部分执行的函数都有一些 IO(计时器),它在继续执行之前等待。

  • 因此,我们程序的当前状态是每个部分执行的函数(生成器)和该函数正在等待的 IO(计时器)对的对列表

  • 现在,要运行我们的程序,我们只需要等到某个 IO 准备就绪(即我们的一个计时器已过期),然后再向前一步执行相应的函数,得到一个阻塞该函数的新 IO。

实现此逻辑为我们提供了以下信息:

# Initialize each generator with a timer of 0 so it immediately executes
generator_timer_pairs = [
    (send_updates(10, 1.0), AsyncTimer(0)),
    (send_updates(5, 2.0), AsyncTimer(0)),
    (send_updates(4, 3.0), AsyncTimer(0))
]

while generator_timer_pairs:
    pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
    generator, min_timer = pair

    # Wait until this timer is ready
    time.sleep(max(0, min_timer.done_time - time.time()))
    del generator_timer_pairs[generator_timer_pairs.index(pair)]

    try:  # Execute one more step of this function
        new_timer = next(generator)
        generator_timer_pairs.append((generator, new_timer))
    except StopIteration:  # When the function is complete
        pass

有了这个,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它会引发StopIteration,并且当我们不再有部分执行的函数(生成器)时,我们的函数就完成了

现在,我们把它包装在一个函数中,我们得到了类似于asyncio.run的东西。结合asyncio.gather运行:

def async_run_all(*generators):
    generator_timer_pairs = [
        (generator, AsyncTimer(0))
        for generator in generators
    ]
    while generator_timer_pairs:
        pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
        generator, min_timer = pair

        time.sleep(max(0, min_timer.done_time - time.time()))
        del generator_timer_pairs[generator_timer_pairs.index(pair)]

        try:
            new_timer = next(generator)
            generator_timer_pairs.append((generator, new_timer))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)

使用 async/await 进行异步

实现我们的caveman版本的asyncio的最后一步是支持Python 3.5中引入的async/await语法。await的行为类似于yield,只是它不是直接返回提供的值,而是返回next((...).__await__())async函数返回“协程”,其行为类似于生成器,但需要使用.send(None)而不是next()(请注意,正如生成器在最初调用时不返回任何内容一样,异步函数在逐步执行之前不会执行任何操作,这解释了我们前面提到的)。

因此,鉴于这些信息,我们只需进行一些调整即可将我们的示例转换为async/await。以下是最终结果:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration
    def __await__(self):
        yield self

async def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        await AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

def _wait_until_io_ready(ios):
    min_timer = min(ios, key=lambda x: x.done_time)
    time.sleep(max(0, min_timer.done_time - time.time()))
    return ios.index(min_timer)

def async_run_all(*coroutines):
    coroutine_io_pairs = [
        (coroutine, AsyncTimer(0))
        for coroutine in coroutines
    ]
    while coroutine_io_pairs:
        ios = [io for cor, io in coroutine_io_pairs]
        ready_index = _wait_until_io_ready(ios)
        coroutine, _ = coroutine_io_pairs.pop(ready_index)

        try:
            new_io = coroutine.send(None)
            coroutine_io_pairs.append((coroutine, new_io))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)

我们有了它,我们的迷你异步示例完成了,使用async/await. 现在,您可能已经注意到我将 timer 重命名为 io 并将查找最小计时器的逻辑提取到一个名为_wait_until_io_ready. 这是有意将这个示例与最后一个主题联系起来:真实 IO。

Hier haben wir unser kleines asynchrones Beispiel mit async/await。现在,你可能已经注意到我将timer重命名为io,并将用于查找最小计时器的逻辑提取到一个名为_wait_until_io_ready的函数中。这是为了将本示例与最后一个主题:真正的IO,连接起来。

真正的 IO(而不仅仅是定时器)

所以,所有这些例子都很棒,但是它们与真正的 asyncio 有什么关系,我们希望在真正 IO 上等待 TCP 套接字和文件读/写?嗯,美丽就在那个_wait_until_io_ready功能中。为了让真正的 IO 正常工作,我们所要做的就是创建一些AsyncReadFile类似于AsyncTimer包含文件描述符的新对象。然后,AsyncReadFile我们正在等待的对象集对应于一组文件描述符。最后,我们可以使用函数 (syscall) select()等待这些文件描述符之一准备好。由于 TCP/UDP 套接字是使用文件描述符实现的,因此这也涵盖了网络请求。

所以,所有这些例子都很好,但它们与真正的异步IO有什么关系呢?我们希望等待实际的IO,比如TCP套接字和文件读/写?好吧,其优点在于_wait_until_io_ready函数。要使真正的IO工作,我们需要做的就是创建一些新的AsyncReadFile,类似于AsyncTimer,它包含一个文件描述符。然后,我们正在等待的一组AsyncReadFile对象对应于一组文件描述符。最后,我们可以使用函数(syscallselect()Warten Sie, bis einer dieser Dateideskriptoren bereit ist, fertiggestellt. Da TCP/UDP-Sockets mithilfe von Dateideskriptoren implementiert werden, deckt dies auch Netzwerkanfragen ab.

Das obige ist der detaillierte Inhalt vonSo verwenden Sie asynchrone Python-Methoden. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen