Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Cara menggunakan kaedah tak segerak Python

Cara menggunakan kaedah tak segerak Python

WBOY
WBOYke hadapan
2023-05-10 23:28:041495semak imbas

Mengapa pengaturcaraan tak segerak?

Untuk memahami motivasi pengaturcaraan tak segerak, pertama sekali kita perlu memahami apa yang mengehadkan kelajuan kod kita. Sebaik-baiknya, kami mahu kod kami berjalan pada kelajuan cahaya, melangkau kod kami serta-merta tanpa sebarang kelewatan. Walau bagaimanapun, kod sebenarnya berjalan lebih perlahan disebabkan oleh dua faktor:

  • Masa CPU (masa yang diperlukan pemproses untuk melaksanakan arahan)

  • Masa IO (masa menunggu permintaan rangkaian atau storan membaca/menulis)

Semasa kod kami menunggu IO, CPU pada dasarnya melahu, menunggu beberapa peranti luaran bertindak balas. Biasanya, kernel mengesan ini dan segera menukar pelaksanaan kepada utas lain dalam sistem. Jadi, jika kita ingin mempercepatkan satu set tugasan intensif IO, kita boleh membuat urutan untuk setiap tugas. Apabila salah satu utas berhenti, menunggu IO, kernel akan beralih ke utas lain untuk meneruskan pemprosesan.

Ini berfungsi dengan baik dalam amalan, tetapi mempunyai dua kelemahan:

  • Benang mempunyai overhead (terutamanya dalam Python)

  • Kami tidak mempunyai kawalan ke atas apabila kernel memilih untuk bertukar antara benang

Sebagai contoh, jika kita ingin melaksanakan 10,000 tugas, kita sama ada perlu mencipta 10,000 utas, yang akan mengambil banyak RAM, sama ada kita perlu mencipta bilangan utas pekerja yang lebih sedikit dan melaksanakan tugas dengan kurang serentak. Selain itu, pada mulanya pemijahan benang ini memakan masa CPU.

Memandangkan kernel boleh memilih untuk bertukar antara benang pada bila-bila masa, perlumbaan boleh berlaku pada bila-bila masa dalam kod kami.

Memperkenalkan asynchrony

Dalam kod berasaskan benang segerak tradisional, kernel mesti mengesan apabila thread terikat IO dan memilih untuk menukar antara thread sesuka hati. Dengan Python async, pengaturcara menggunakan kata kunci await untuk mengesahkan baris kod yang mengisytiharkan pengikatan IO dan mengesahkan bahawa kebenaran diberikan untuk melaksanakan tugas lain. Sebagai contoh, pertimbangkan kod berikut yang melaksanakan permintaan web:

async def request_google():
    reader, writer = await asyncio.open_connection('google.com', 80)
    writer.write(b'GET / HTTP/2\n\n')
    await writer.drain()
    response = await reader.read()
    return response.decode()

Di sini, di sini kita melihat kod ini di dua tempat await. Jadi sementara menunggu bait kami dihantar ke pelayan (writer.drain()), dan sementara menunggu pelayan membalas dengan beberapa bait (reader.read()), kami tahu bahawa kod lain mungkin dilaksanakan dan pembolehubah global mungkin berubah. Walau bagaimanapun, dari permulaan fungsi hingga penantian pertama, kami boleh memastikan bahawa kod berjalan baris demi baris tanpa beralih kepada kod lain dalam program yang sedang berjalan. Inilah keindahan async.

asyncio ialah perpustakaan standard yang membolehkan kami melakukan beberapa perkara menarik dengan fungsi tak segerak ini. Sebagai contoh, jika kita ingin melaksanakan dua permintaan kepada Google pada masa yang sama, kita boleh:

async def request_google_twice():
    response_1, response_2 = await asyncio.gather(request_google(), request_google())
    return response_1, response_2

Apabila kita memanggil request_google_twice(), sihir asyncio.gather memulakan panggilan fungsi, tetapi apabila kita memanggil await writer.drain() , ia akan mula melaksanakan panggilan fungsi kedua supaya kedua-dua permintaan berlaku selari. Ia kemudian menunggu panggilan writer.drain() yang pertama atau kedua yang diminta untuk diselesaikan dan terus melaksanakan fungsi tersebut.

Akhir sekali, terdapat butiran penting yang ditinggalkan: asyncio.run. Untuk benar-benar memanggil fungsi tak segerak daripada fungsi Python [synchronous] biasa, kami membungkus panggilan dengan asyncio.run(...):

async def async_main():
    r1, r2 = await request_google_twice()
    print('Response one:', r1)
    print('Response two:', r2)
    return 12

return_val = asyncio.run(async_main())

Ambil perhatian bahawa jika kami hanya memanggil async_main() tanpa memanggil sama ada await ... atau asyncio.run(...) , tiada apa yang berlaku. Ini hanya dihadkan oleh sifat cara tak segerak berfungsi.

Jadi, bagaimana sebenarnya async berfungsi, dan apakah fungsi asyncio.run dan asyncio.gather ajaib ini? Baca di bawah untuk mengetahui lebih lanjut.

Cara async berfungsi

Untuk memahami keajaiban async, kita perlu terlebih dahulu memahami binaan Python yang lebih mudah: Penjana

Penjana

Penjana ialah Fungsi Python yang mengembalikan urutan nilai satu demi satu (boleh diulang). Contohnya:

def get_numbers():
    print("|| get_numbers begin")
    print("|| get_numbers Giving 1...")
    yield 1
    print("|| get_numbers Giving 2...")
    yield 2
    print("|| get_numbers Giving 3...")
    yield 3
    print("|| get_numbers end")

print("| for begin")
for number in get_numbers():
    print(f"| Got {number}.")
print("| for end")
| for begin
|| get_numbers begin
|| get_numbers Giving 1...
| Got 1.
|| get_numbers Giving 2...
| Got 2.
|| get_numbers Giving 3...
| Got 3.
|| get_numbers end
| for end

Jadi, kita lihat bahawa untuk setiap lelaran gelung for, kita laksanakannya sekali sahaja dalam penjana. Kita boleh melakukan lelaran ini dengan lebih jelas menggunakan fungsi next() Python:

In [3]: generator = get_numbers()                                                                                                                                                            

In [4]: next(generator)                                                                                                                                                                      
|| get_numbers begin
|| get_numbers Giving 1...
Out[4]: 1

In [5]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 2...
Out[5]: 2

In [6]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 3...
Out[6]: 3

In [7]: next(generator)                                                                                                                                                                      
|| get_numbers end
---------------------------------------
StopIteration       Traceback (most recent call last)
<ipython-input-154-323ce5d717bb> in <module>
----> 1 next(generator)

StopIteration:

这与异步函数的行为非常相似。正如异步函数从函数开始直到第一次等待时连续执行代码一样,我们第一次调用next()时,生成器将从函数顶部执行到第一个yield 语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但返回一些不同的东西来使用生成器创建类似异步的函数。

使用生成器进行异步

让我们使用生成器来创建我们自己的小型异步框架。

但是,为简单起见,让我们将实际 IO 替换为睡眠(即。time.sleep)。让我们考虑一个需要定期发送更新的应用程序:

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        time.sleep(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

因此,如果我们调用send_updates(3, 1.0),它将输出这三条消息,每条消息间隔 1 秒:

[1.0] Sending update 1/3.
[1.0] Sending update 2/3.
[1.0] Sending update 3/3.

现在,假设我们要同时运行几个不同的时间间隔。例如,send_updates(10, 1.0)send_updates(5, 2.0)send_updates(4, 3.0)。我们可以使用线程来做到这一点,如下所示:

threads = [
    threading.Thread(target=send_updates, args=(10, 1.0)),
    threading.Thread(target=send_updates, args=(5, 2.0)),
    threading.Thread(target=send_updates, args=(4, 3.0))
]
for i in threads:
    i.start()
for i in threads:
    i.join()

这可行,在大约 12 秒内完成,但使用具有前面提到的缺点的线程。让我们使用生成器构建相同的东西。

在演示生成器的示例中,我们返回了整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述要等待的IO的对象。在我们的例子中,我们的“IO”只是一个计时器,它将等待一段时间。因此,让我们创建一个计时器对象,用于此目的:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration

现在,让我们从我们的函数中产生这个而不是调用time.sleep

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        yield AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

现在,每次我们调用send_updates(...)时调用next(...),我们都会得到一个AsyncTimer对象,告诉我们直到我们应该等待什么时候:

generator = send_updates(3, 1.5)
timer = next(generator)  # [1.5] Sending update 1/3.
print(timer.done_time - time.time())  # 1.498...

由于我们的代码现在实际上并没有调用time.sleep,我们现在可以同时执行另一个send_updates调用。

所以,为了把这一切放在一起,我们需要退后一步,意识到一些事情:

  • 生成器就像部分执行的函数,等待一些 IO(计时器)。

  • 每个部分执行的函数都有一些 IO(计时器),它在继续执行之前等待。

  • 因此,我们程序的当前状态是每个部分执行的函数(生成器)和该函数正在等待的 IO(计时器)对的对列表

  • 现在,要运行我们的程序,我们只需要等到某个 IO 准备就绪(即我们的一个计时器已过期),然后再向前一步执行相应的函数,得到一个阻塞该函数的新 IO。

实现此逻辑为我们提供了以下信息:

# Initialize each generator with a timer of 0 so it immediately executes
generator_timer_pairs = [
    (send_updates(10, 1.0), AsyncTimer(0)),
    (send_updates(5, 2.0), AsyncTimer(0)),
    (send_updates(4, 3.0), AsyncTimer(0))
]

while generator_timer_pairs:
    pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
    generator, min_timer = pair

    # Wait until this timer is ready
    time.sleep(max(0, min_timer.done_time - time.time()))
    del generator_timer_pairs[generator_timer_pairs.index(pair)]

    try:  # Execute one more step of this function
        new_timer = next(generator)
        generator_timer_pairs.append((generator, new_timer))
    except StopIteration:  # When the function is complete
        pass

有了这个,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它会引发StopIteration,并且当我们不再有部分执行的函数(生成器)时,我们的函数就完成了

现在,我们把它包装在一个函数中,我们得到了类似于asyncio.run的东西。结合asyncio.gather运行:

def async_run_all(*generators):
    generator_timer_pairs = [
        (generator, AsyncTimer(0))
        for generator in generators
    ]
    while generator_timer_pairs:
        pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
        generator, min_timer = pair

        time.sleep(max(0, min_timer.done_time - time.time()))
        del generator_timer_pairs[generator_timer_pairs.index(pair)]

        try:
            new_timer = next(generator)
            generator_timer_pairs.append((generator, new_timer))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)

使用 async/await 进行异步

实现我们的caveman版本的asyncio的最后一步是支持Python 3.5中引入的async/await语法。await的行为类似于yield,只是它不是直接返回提供的值,而是返回next((...).__await__())async函数返回“协程”,其行为类似于生成器,但需要使用.send(None)而不是next()(请注意,正如生成器在最初调用时不返回任何内容一样,异步函数在逐步执行之前不会执行任何操作,这解释了我们前面提到的)。

因此,鉴于这些信息,我们只需进行一些调整即可将我们的示例转换为async/await。以下是最终结果:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration
    def __await__(self):
        yield self

async def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        await AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

def _wait_until_io_ready(ios):
    min_timer = min(ios, key=lambda x: x.done_time)
    time.sleep(max(0, min_timer.done_time - time.time()))
    return ios.index(min_timer)

def async_run_all(*coroutines):
    coroutine_io_pairs = [
        (coroutine, AsyncTimer(0))
        for coroutine in coroutines
    ]
    while coroutine_io_pairs:
        ios = [io for cor, io in coroutine_io_pairs]
        ready_index = _wait_until_io_ready(ios)
        coroutine, _ = coroutine_io_pairs.pop(ready_index)

        try:
            new_io = coroutine.send(None)
            coroutine_io_pairs.append((coroutine, new_io))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)

我们有了它,我们的迷你异步示例完成了,使用async/await. 现在,您可能已经注意到我将 timer 重命名为 io 并将查找最小计时器的逻辑提取到一个名为_wait_until_io_ready. 这是有意将这个示例与最后一个主题联系起来:真实 IO。

Di sini kami telah melengkapkan contoh async kecil kami, menggunakan async/await. Sekarang, anda mungkin perasan bahawa saya telah menamakan semula timer kepada io dan mengekstrak logik untuk mencari pemasa minimum ke dalam fungsi yang dipanggil _wait_until_io_ready. Ini adalah untuk menyambung contoh ini dengan topik terakhir: IO Sebenar.

IO sebenar (bukan hanya pemasa)

Jadi, semua contoh ini bagus, tetapi apa kaitannya dengan asyncio sebenar, kami mahu menunggu pada TCP pada Soket dan fail IO sebenar membaca/menulis? Nah, keindahannya terdapat pada ciri _wait_until_io_ready itu. Untuk membolehkan IO sebenar berfungsi, kita hanya perlu mencipta beberapa AsyncReadFile objek baharu yang serupa dengan AsyncTimer yang mengandungi deskriptor fail. Kemudian, AsyncReadFile set objek yang kita tunggu sepadan dengan set deskriptor fail. Akhir sekali, kita boleh menggunakan fungsi (syscall) select() untuk menunggu salah satu daripada deskriptor fail ini sedia. Oleh kerana soket TCP/UDP dilaksanakan menggunakan deskriptor fail, ini juga meliputi permintaan rangkaian.

Jadi, semua contoh ini bagus, tetapi apakah kaitannya dengan IO tak segerak sebenar? Adakah kita mahu menunggu IO sebenar, seperti soket TCP dan fail membaca/menulis? Nah, kelebihannya terletak pada fungsi _wait_until_io_ready. Untuk menjadikan IO sebenar berfungsi, apa yang perlu kita lakukan ialah mencipta AsyncReadFile baharu, serupa dengan AsyncTimer, yang mengandungi deskriptor fail . Kemudian, set AsyncReadFile objek yang kita tunggu sepadan dengan set deskriptor fail. Akhir sekali, kita boleh menggunakan fungsi (syscall)select() untuk menunggu salah satu daripada deskriptor fail ini sedia. Oleh kerana soket TCP/UDP dilaksanakan menggunakan deskriptor fail, ini meliputi permintaan rangkaian juga.

Atas ialah kandungan terperinci Cara menggunakan kaedah tak segerak Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam