ホームページ >バックエンド開発 >Python チュートリアル >Python 非同期メソッドの使用方法

Python 非同期メソッドの使用方法

WBOY
WBOY転載
2023-05-10 23:28:041612ブラウズ

なぜ非同期プログラミングなのか?

非同期プログラミングの動機を理解するには、まずコードの実行速度を制限するものを理解する必要があります。理想的には、コードが光の速さで実行され、遅延なく即座にコードがスキップされるようにしたいと考えています。ただし、実際のコードの実行速度は、次の 2 つの要因によりはるかに遅くなります。

  • CPU 時間 (プロセッサが命令を実行するのにかかる時間)

  • ## IO 時間 (ネットワーク リクエストまたはストレージの読み取り/書き込みを待機する時間)

コードが IO を待機しているとき、CPU は基本的にアイドル状態になり、外部デバイスからの応答を待ちます。通常、カーネルはこれを検出し、ただちにシステム内の他のスレッドに実行を切り替えます。したがって、IO 集中型の一連のタスクを高速化したい場合は、タスクごとにスレッドを作成できます。いずれかのスレッドが停止して IO を待機すると、カーネルは別のスレッドに切り替えて処理を続行します。

これは実際にはうまく機能しますが、次の 2 つの欠点があります:

  • スレッドにはオーバーヘッドがあります (特に Python)

  • カーネルがいつスレッドを切り替えるかを制御することはできません

たとえば、10,000 個のタスクを実行したい場合は、10,000 個のスレッドを作成する必要があり、それには多くの時間がかかります。 RAM を使用する場合は、作成するワーカー スレッドの数を減らし、同時実行性を低くしてタスクを実行する必要があります。さらに、これらのスレッドを最初に生成すると CPU 時間が消費されます。

カーネルはいつでもスレッド間の切り替えを選択できるため、コード内でいつでも競合が発生する可能性があります。

非同期の導入

従来の同期スレッドベースのコードでは、カーネルはスレッドが IO バインドされていることを検出し、スレッド間の任意の切り替えを選択する必要があります。 Python 非同期では、プログラマはキーワード

await を使用して IO バインディングを宣言するコード行を確認し、他のタスクを実行する権限が付与されていることを確認します。たとえば、Web リクエストを実行する次のコードを考えてみましょう。

async def request_google():
    reader, writer = await asyncio.open_connection('google.com', 80)
    writer.write(b'GET / HTTP/2\n\n')
    await writer.drain()
    response = await reader.read()
    return response.decode()
ここでは、このコード

await が 2 か所にあります。したがって、バイトがサーバーに送信されるのを待機している間 (writer.drain())、サーバーがいくつかのバイトで応答するのを待機している間 (reader.read()) , 他のコードが実行され、グローバル変数が変更される可能性があることがわかっています。ただし、関数の開始から最初の待機まで、実行中のプログラム内の他のコードに切り替えることなく、コードが 1 行ずつ実行されることを確認できます。これが非同期の利点です。

asyncio は、これらの非同期関数を使用して興味深いことを実行できる標準ライブラリです。たとえば、Google への 2 つのリクエストを同時に実行したい場合は、次のようになります。

async def request_google_twice():
    response_1, response_2 = await asyncio.gather(request_google(), request_google())
    return response_1, response_2

request_google_twice() を呼び出すと、魔法の asyncio.gather 1 つの関数呼び出しが開始されますが、await Writer.drain() を呼び出すと、2 番目の関数呼び出しの実行が開始され、両方のリクエストが並行して発生します。次に、最初または 2 番目に要求された writer.drain() 呼び出しが完了するまで待機し、関数の実行を続けます。

最後に、

asyncio.run という重要な詳細が省略されています。実際に通常の [同期] Python 関数から非同期関数を呼び出すには、呼び出しを asyncio.run(...) でラップします。

async def async_main():
    r1, r2 = await request_google_twice()
    print('Response one:', r1)
    print('Response two:', r2)
    return 12

return_val = asyncio.run(async_main())
## を呼び出すだけの場合は注意してください。 #async_main()

await ... または asyncio.run(...) を呼び出さないと、何も起こりません。これは、非同期の仕組みの性質によって制限されるだけです。 それでは、非同期はどのように機能するのでしょうか。また、これらの魔法の

asyncio.run

関数と asyncio.gather 関数の機能は何でしょうか?詳細については、以下をお読みください。 非同期の仕組み

async

の魅力を理解するには、まずより単純な Python 構造を理解する必要があります。ジェネレータージェネレーター

ジェネレーターは、一連の値を 1 つずつ返す (反復可能) Python 関数です。例:

def get_numbers():
    print("|| get_numbers begin")
    print("|| get_numbers Giving 1...")
    yield 1
    print("|| get_numbers Giving 2...")
    yield 2
    print("|| get_numbers Giving 3...")
    yield 3
    print("|| get_numbers end")

print("| for begin")
for number in get_numbers():
    print(f"| Got {number}.")
print("| for end")
| for begin
|| get_numbers begin
|| get_numbers Giving 1...
| Got 1.
|| get_numbers Giving 2...
| Got 2.
|| get_numbers Giving 3...
| Got 3.
|| get_numbers end
| for end

つまり、for ループの反復ごとに、ジェネレーターで 1 回だけ実行されることがわかります。 Python の

next()

関数を使用すると、この繰り返しをより明示的に実行できます。

In [3]: generator = get_numbers()                                                                                                                                                            

In [4]: next(generator)                                                                                                                                                                      
|| get_numbers begin
|| get_numbers Giving 1...
Out[4]: 1

In [5]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 2...
Out[5]: 2

In [6]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 3...
Out[6]: 3

In [7]: next(generator)                                                                                                                                                                      
|| get_numbers end
---------------------------------------
StopIteration       Traceback (most recent call last)
<ipython-input-154-323ce5d717bb> in <module>
----> 1 next(generator)

StopIteration:

这与异步函数的行为非常相似。正如异步函数从函数开始直到第一次等待时连续执行代码一样,我们第一次调用next()时,生成器将从函数顶部执行到第一个yield 语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但返回一些不同的东西来使用生成器创建类似异步的函数。

使用生成器进行异步

让我们使用生成器来创建我们自己的小型异步框架。

但是,为简单起见,让我们将实际 IO 替换为睡眠(即。time.sleep)。让我们考虑一个需要定期发送更新的应用程序:

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        time.sleep(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

因此,如果我们调用send_updates(3, 1.0),它将输出这三条消息,每条消息间隔 1 秒:

[1.0] Sending update 1/3.
[1.0] Sending update 2/3.
[1.0] Sending update 3/3.

现在,假设我们要同时运行几个不同的时间间隔。例如,send_updates(10, 1.0)send_updates(5, 2.0)send_updates(4, 3.0)。我们可以使用线程来做到这一点,如下所示:

threads = [
    threading.Thread(target=send_updates, args=(10, 1.0)),
    threading.Thread(target=send_updates, args=(5, 2.0)),
    threading.Thread(target=send_updates, args=(4, 3.0))
]
for i in threads:
    i.start()
for i in threads:
    i.join()

这可行,在大约 12 秒内完成,但使用具有前面提到的缺点的线程。让我们使用生成器构建相同的东西。

在演示生成器的示例中,我们返回了整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述要等待的IO的对象。在我们的例子中,我们的“IO”只是一个计时器,它将等待一段时间。因此,让我们创建一个计时器对象,用于此目的:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration

现在,让我们从我们的函数中产生这个而不是调用time.sleep

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        yield AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

现在,每次我们调用send_updates(...)时调用next(...),我们都会得到一个AsyncTimer对象,告诉我们直到我们应该等待什么时候:

generator = send_updates(3, 1.5)
timer = next(generator)  # [1.5] Sending update 1/3.
print(timer.done_time - time.time())  # 1.498...

由于我们的代码现在实际上并没有调用time.sleep,我们现在可以同时执行另一个send_updates调用。

所以,为了把这一切放在一起,我们需要退后一步,意识到一些事情:

  • 生成器就像部分执行的函数,等待一些 IO(计时器)。

  • 每个部分执行的函数都有一些 IO(计时器),它在继续执行之前等待。

  • 因此,我们程序的当前状态是每个部分执行的函数(生成器)和该函数正在等待的 IO(计时器)对的对列表

  • 现在,要运行我们的程序,我们只需要等到某个 IO 准备就绪(即我们的一个计时器已过期),然后再向前一步执行相应的函数,得到一个阻塞该函数的新 IO。

实现此逻辑为我们提供了以下信息:

# Initialize each generator with a timer of 0 so it immediately executes
generator_timer_pairs = [
    (send_updates(10, 1.0), AsyncTimer(0)),
    (send_updates(5, 2.0), AsyncTimer(0)),
    (send_updates(4, 3.0), AsyncTimer(0))
]

while generator_timer_pairs:
    pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
    generator, min_timer = pair

    # Wait until this timer is ready
    time.sleep(max(0, min_timer.done_time - time.time()))
    del generator_timer_pairs[generator_timer_pairs.index(pair)]

    try:  # Execute one more step of this function
        new_timer = next(generator)
        generator_timer_pairs.append((generator, new_timer))
    except StopIteration:  # When the function is complete
        pass

有了这个,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它会引发StopIteration,并且当我们不再有部分执行的函数(生成器)时,我们的函数就完成了

现在,我们把它包装在一个函数中,我们得到了类似于asyncio.run的东西。结合asyncio.gather运行:

def async_run_all(*generators):
    generator_timer_pairs = [
        (generator, AsyncTimer(0))
        for generator in generators
    ]
    while generator_timer_pairs:
        pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
        generator, min_timer = pair

        time.sleep(max(0, min_timer.done_time - time.time()))
        del generator_timer_pairs[generator_timer_pairs.index(pair)]

        try:
            new_timer = next(generator)
            generator_timer_pairs.append((generator, new_timer))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)

使用 async/await 进行异步

实现我们的caveman版本的asyncio的最后一步是支持Python 3.5中引入的async/await语法。await的行为类似于yield,只是它不是直接返回提供的值,而是返回next((...).__await__())async函数返回“协程”,其行为类似于生成器,但需要使用.send(None)而不是next()(请注意,正如生成器在最初调用时不返回任何内容一样,异步函数在逐步执行之前不会执行任何操作,这解释了我们前面提到的)。

因此,鉴于这些信息,我们只需进行一些调整即可将我们的示例转换为async/await。以下是最终结果:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration
    def __await__(self):
        yield self

async def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        await AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

def _wait_until_io_ready(ios):
    min_timer = min(ios, key=lambda x: x.done_time)
    time.sleep(max(0, min_timer.done_time - time.time()))
    return ios.index(min_timer)

def async_run_all(*coroutines):
    coroutine_io_pairs = [
        (coroutine, AsyncTimer(0))
        for coroutine in coroutines
    ]
    while coroutine_io_pairs:
        ios = [io for cor, io in coroutine_io_pairs]
        ready_index = _wait_until_io_ready(ios)
        coroutine, _ = coroutine_io_pairs.pop(ready_index)

        try:
            new_io = coroutine.send(None)
            coroutine_io_pairs.append((coroutine, new_io))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)

我们有了它,我们的迷你异步示例完成了,使用async/await. 现在,您可能已经注意到我将 timer 重命名为 io 并将查找最小计时器的逻辑提取到一个名为_wait_until_io_ready. 这是有意将这个示例与最后一个主题联系起来:真实 IO。

これで、async/await を使用した小さな非同期の例が完成しました。ここで、timer の名前を io に変更し、最小タイマーを見つけるためのロジックを _wait_until_io_ready という関数に抽出したことに気づいたかもしれません。これは、この例を最後のトピック Real IO に結び付けるためです。

実際の IO (タイマーだけではありません)

これらの例はすべて素晴らしいですが、実際の非同期とどのように関連しているのでしょうか。実際の IO ソケットとファイルの読み取りで TCP を待機したいと考えています。書き込み?さて、美しさはその _wait_until_io_ready 関数にあります。実際の IO を動作させるには、ファイル記述子を含む AsyncTimer のような新しいオブジェクト AsyncReadFile を作成するだけです。次に、AsyncReadFile を待っているオブジェクトのセットは、ファイル記述子のセットに対応します。最後に、関数 (syscall) select() を使用して、これらのファイル記述子のいずれかが準備できるまで待機します。 TCP/UDP ソケットはファイル記述子を使用して実装されるため、これはネットワーク要求もカバーします。

では、これらの例はすべて素晴らしいものですが、実際の非同期 IO とどのような関係があるのでしょうか? TCP ソケットやファイルの読み取り/書き込みなど、実際の IO を待ちますか?そうですね、利点は _wait_until_io_ready 関数にあります。実際の IO を機能させるには、AsyncTimer と同様の、 ファイル記述子 を含む新しい AsyncReadFile を作成するだけです。次に、待機している AsyncReadFile オブジェクトのセットは、ファイル記述子のセットに対応します。最後に、関数 (syscall)select() を使用して、これらのファイル記述子のいずれかが準備できるまで待機します。 TCP/UDP ソケットはファイル記述子を使用して実装されるため、これはネットワーク要求もカバーします。

以上がPython 非同期メソッドの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。