要了解非同步程式設計的動機,我們首先必須了解是什麼限制了我們的程式碼運行速度。理想情況下,我們希望我們的程式碼以光速運行,立即跳過我們的程式碼,沒有任何延遲。然而,由於兩個因素,實際上程式碼運行速度要慢得多:
CPU時間(處理器執行指令的時間)
IO時間(等待網路請求或儲存讀取/寫入的時間)
當我們的程式碼在等待IO 時,CPU 基本上是空閒的,等待某個外部裝置回應。通常,核心會偵測到這一點並立即切換到執行系統中的其他執行緒。因此,如果我們想加快處理一組 IO 密集型任務,我們可以為每個任務建立一個執行緒。當其中一個線程停止,等待 IO 時,核心將切換到另一個線程繼續處理。
這在實踐中效果很好,但有兩個缺點:
#線程有開銷(尤其是在Python 中)
我們無法控制核心何時選擇在線程之間切換
例如,如果我們想要執行10,000 個任務,我們要么必須創建10,000 個線程,這將佔用大量RAM,要么我們需要創建較少數量的工作線程並以較少的並發性執行任務。此外,最初產生這些執行緒會佔用 CPU 時間。
由於核心可以隨時選擇在執行緒之間切換,因此我們程式碼中的任何時候都可能出現相互競爭。
在傳統的基於同步執行緒的程式碼中,核心必須偵測執行緒何時是IO綁定的,並選擇在執行緒之間隨意切換。使用 Python 非同步,程式設計師使用關鍵字await
確認宣告 IO 綁定的程式碼行,並確認授予執行其他任務的權限。例如,考慮以下執行Web請求的程式碼:
async def request_google(): reader, writer = await asyncio.open_connection('google.com', 80) writer.write(b'GET / HTTP/2\n\n') await writer.drain() response = await reader.read() return response.decode()
在這裡,在這裡,我們看到該程式碼在兩個地方await
。因此,在等待我們的位元組被傳送到伺服器(writer.drain()
)時,在等待伺服器用一些位元組(reader.read()
)回覆時,我們知道其他程式碼可能會執行,全域變數可能會更改。然而,從函數開始到第一次等待,我們可以確保程式碼逐行運行,而不會切換到運行程式中的其他程式碼。這就是異步的美妙之處。
asyncio
是一個標準函式庫,可以讓我們用這些非同步函數做一些有趣的事情。例如,如果我們想同時向Google執行兩個請求,我們可以:
async def request_google_twice(): response_1, response_2 = await asyncio.gather(request_google(), request_google()) return response_1, response_2
當我們呼叫request_google_twice()
時,神奇的asyncio.gather
#會啟動一個函數調用,但是當我們調用時await writer.drain()
,它會開始執行第二個函數調用,這樣兩個請求就會並行發生。然後,它等待第一個或第二個請求的writer.drain()
呼叫完成並繼續執行該函數。
最後,有一個重要的細節被遺漏了:asyncio.run
。要從常規的[同步] Python 函數實際呼叫非同步函數,我們將呼叫包裝在asyncio.run(...)
:
async def async_main(): r1, r2 = await request_google_twice() print('Response one:', r1) print('Response two:', r2) return 12 return_val = asyncio.run(async_main())
請注意,如果我們只呼叫async_main()
而不呼叫await ...
或asyncio.run(...)
,則不會發生任何事情。這只是由非同步工作方式的性質所限制的。
那麼,非同步究竟是如何運作的,這些神奇的asyncio.run
和asyncio.gather
函數有什麼作用呢?閱讀下文以了解詳情。
要了解async
的魔力,我們首先需要了解一個更簡單的Python 建構:生成器
生成器是Python 函數,它逐一傳回一系列值(可迭代)。例如:
def get_numbers(): print("|| get_numbers begin") print("|| get_numbers Giving 1...") yield 1 print("|| get_numbers Giving 2...") yield 2 print("|| get_numbers Giving 3...") yield 3 print("|| get_numbers end") print("| for begin") for number in get_numbers(): print(f"| Got {number}.") print("| for end")
| for begin || get_numbers begin || get_numbers Giving 1... | Got 1. || get_numbers Giving 2... | Got 2. || get_numbers Giving 3... | Got 3. || get_numbers end | for end
因此,我們看到,對於for迴圈的每個迭代,我們在生成器中只執行一次。我們可以使用Python的next()
函數更明確地執行此迭代:
In [3]: generator = get_numbers() In [4]: next(generator) || get_numbers begin || get_numbers Giving 1... Out[4]: 1 In [5]: next(generator) || get_numbers Giving 2... Out[5]: 2 In [6]: next(generator) || get_numbers Giving 3... Out[6]: 3 In [7]: next(generator) || get_numbers end --------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-154-323ce5d717bb> in <module> ----> 1 next(generator) StopIteration:
这与异步函数的行为非常相似。正如异步函数从函数开始直到第一次等待时连续执行代码一样,我们第一次调用next()
时,生成器将从函数顶部执行到第一个yield
语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但返回一些不同的东西来使用生成器创建类似异步的函数。
让我们使用生成器来创建我们自己的小型异步框架。
但是,为简单起见,让我们将实际 IO 替换为睡眠(即。time.sleep
)。让我们考虑一个需要定期发送更新的应用程序:
def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): time.sleep(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
因此,如果我们调用send_updates(3, 1.0)
,它将输出这三条消息,每条消息间隔 1 秒:
[1.0] Sending update 1/3. [1.0] Sending update 2/3. [1.0] Sending update 3/3.
现在,假设我们要同时运行几个不同的时间间隔。例如,send_updates(10, 1.0)
,send_updates(5, 2.0)
和send_updates(4, 3.0)
。我们可以使用线程来做到这一点,如下所示:
threads = [ threading.Thread(target=send_updates, args=(10, 1.0)), threading.Thread(target=send_updates, args=(5, 2.0)), threading.Thread(target=send_updates, args=(4, 3.0)) ] for i in threads: i.start() for i in threads: i.join()
这可行,在大约 12 秒内完成,但使用具有前面提到的缺点的线程。让我们使用生成器构建相同的东西。
在演示生成器的示例中,我们返回了整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述要等待的IO的对象。在我们的例子中,我们的“IO”只是一个计时器,它将等待一段时间。因此,让我们创建一个计时器对象,用于此目的:
class AsyncTimer: def __init__(self, duration: float): self.done_time = time.time() + duration
现在,让我们从我们的函数中产生这个而不是调用time.sleep
:
def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): yield AsyncTimer(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
现在,每次我们调用send_updates(...)
时调用next(...)
,我们都会得到一个AsyncTimer
对象,告诉我们直到我们应该等待什么时候:
generator = send_updates(3, 1.5) timer = next(generator) # [1.5] Sending update 1/3. print(timer.done_time - time.time()) # 1.498...
由于我们的代码现在实际上并没有调用time.sleep
,我们现在可以同时执行另一个send_updates
调用。
所以,为了把这一切放在一起,我们需要退后一步,意识到一些事情:
生成器就像部分执行的函数,等待一些 IO(计时器)。
每个部分执行的函数都有一些 IO(计时器),它在继续执行之前等待。
因此,我们程序的当前状态是每个部分执行的函数(生成器)和该函数正在等待的 IO(计时器)对的对列表
现在,要运行我们的程序,我们只需要等到某个 IO 准备就绪(即我们的一个计时器已过期),然后再向前一步执行相应的函数,得到一个阻塞该函数的新 IO。
实现此逻辑为我们提供了以下信息:
# Initialize each generator with a timer of 0 so it immediately executes generator_timer_pairs = [ (send_updates(10, 1.0), AsyncTimer(0)), (send_updates(5, 2.0), AsyncTimer(0)), (send_updates(4, 3.0), AsyncTimer(0)) ] while generator_timer_pairs: pair = min(generator_timer_pairs, key=lambda x: x[1].done_time) generator, min_timer = pair # Wait until this timer is ready time.sleep(max(0, min_timer.done_time - time.time())) del generator_timer_pairs[generator_timer_pairs.index(pair)] try: # Execute one more step of this function new_timer = next(generator) generator_timer_pairs.append((generator, new_timer)) except StopIteration: # When the function is complete pass
有了这个,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它会引发StopIteration
,并且当我们不再有部分执行的函数(生成器)时,我们的函数就完成了
现在,我们把它包装在一个函数中,我们得到了类似于asyncio.run
的东西。结合asyncio.gather
运行:
def async_run_all(*generators): generator_timer_pairs = [ (generator, AsyncTimer(0)) for generator in generators ] while generator_timer_pairs: pair = min(generator_timer_pairs, key=lambda x: x[1].done_time) generator, min_timer = pair time.sleep(max(0, min_timer.done_time - time.time())) del generator_timer_pairs[generator_timer_pairs.index(pair)] try: new_timer = next(generator) generator_timer_pairs.append((generator, new_timer)) except StopIteration: pass async_run_all( send_updates(10, 1.0), send_updates(5, 2.0), send_updates(4, 3.0) )
实现我们的caveman版本的asyncio
的最后一步是支持Python 3.5中引入的async/await
语法。await
的行为类似于yield
,只是它不是直接返回提供的值,而是返回next((...).__await__())
。async
函数返回“协程”,其行为类似于生成器,但需要使用.send(None)
而不是next()
(请注意,正如生成器在最初调用时不返回任何内容一样,异步函数在逐步执行之前不会执行任何操作,这解释了我们前面提到的)。
因此,鉴于这些信息,我们只需进行一些调整即可将我们的示例转换为async/await
。以下是最终结果:
class AsyncTimer: def __init__(self, duration: float): self.done_time = time.time() + duration def __await__(self): yield self async def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): await AsyncTimer(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count)) def _wait_until_io_ready(ios): min_timer = min(ios, key=lambda x: x.done_time) time.sleep(max(0, min_timer.done_time - time.time())) return ios.index(min_timer) def async_run_all(*coroutines): coroutine_io_pairs = [ (coroutine, AsyncTimer(0)) for coroutine in coroutines ] while coroutine_io_pairs: ios = [io for cor, io in coroutine_io_pairs] ready_index = _wait_until_io_ready(ios) coroutine, _ = coroutine_io_pairs.pop(ready_index) try: new_io = coroutine.send(None) coroutine_io_pairs.append((coroutine, new_io)) except StopIteration: pass async_run_all( send_updates(10, 1.0), send_updates(5, 2.0), send_updates(4, 3.0) )
我们有了它,我们的迷你异步示例完成了,使用async/await
. 现在,您可能已经注意到我将 timer 重命名为 io 并将查找最小计时器的逻辑提取到一个名为_wait_until_io_ready
. 这是有意将这个示例与最后一个主题联系起来:真实 IO。
在這裡,我們完成了我們的小型非同步範例,使用了async/await
。現在,你可能已經注意到我將timer
重命名為io,並將用於尋找最小計時器的邏輯提取到一個名為_wait_until_io_ready
的函數中。這是為了將本範例與最後一個主題:真正的IO,連接起來。
所以,所有這些例子都很棒,但是它們與真正的asyncio 有什麼關係,我們希望在真正IO 上等待TCP套接字和檔案讀/寫?嗯,美麗就在那個_wait_until_io_ready
功能中。為了讓真正的 IO 正常運作,我們要做的就是建立一些AsyncReadFile
類似於AsyncTimer
包含檔案描述子的新物件。然後,AsyncReadFile
我們正在等待的物件集對應到一組檔案描述符。最後,我們可以使用函數 (syscall) select()等待這些檔案描述子之一來準備好。由於 TCP/UDP 套接字是使用文件描述符實現的,因此這也涵蓋了網路請求。
所以,所有這些例子都很好,但它們與真正的非同步IO有什麼關係呢?我們希望等待實際的IO,例如TCP套接字和文件讀/寫?好吧,其優點在於_wait_until_io_ready
函數。要讓真正的IO運作,我們需要做的就是建立一些新的AsyncReadFile
,類似於AsyncTimer
,它包含一個檔案描述子。然後,我們正在等待的一組AsyncReadFile
物件對應於一組檔案描述子。最後,我們可以使用函數(syscall
)select()
等待這些檔案描述子之一準備好。由於TCP/UDP套接字是使用檔案描述符實現的,因此這也涵蓋了網路請求。
以上是Python非同步方法怎麼使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!