コルーチン関数で同期関数を直接呼び出すとイベント ループがブロックされ、そのため、プログラム全体。まず例を見てみましょう:
次は、非同期 Web フレームワーク FastAPI を使用して記述された例です。FastAPI は比較的高速ですが、誤った操作を行うと非常に遅くなります。
import time from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): time.sleep(10) return {"message": "Hello World"} @app.get("/health") async def health(): return {"status": "ok"}
上記では 2 つのインターフェイスを作成しました。root
インターフェイス関数には 10 秒かかると仮定します。この 10 秒の間に、health
インターフェイスがアクセスされます。何が起こるでしょうか?
root
インターフェイス (左) にアクセスし、すぐに health
インターフェイス (右) にアクセスし、health
インターフェイスにアクセスします。 root
インターフェイスが戻るまでブロックされますが、health
インターフェイスは正常に応答します。
time.sleep
は、イベント ループ全体をブロックする「同期」関数です。
どうすれば解決できますか?前述の処理方法について考えてみましょう。関数がメイン スレッドをブロックしている場合は、別のスレッドを開いてブロックしている関数を単独で実行させます。したがって、同じ原則がここにも適用され、スレッドを開いて、ファイルの読み取りなどのブロック操作を個別に実行します。
loop.run_in_executor
メソッドは、同期関数を非同期ノンブロッキング モードに変換して処理します。具体的には、 loop.run_in_executor()
同期関数を a thread または process として作成し、その中で関数を実行することで、イベント ループのブロックを回避できます。
正式な例: スレッドまたはプロセス プールでコードを実行します。
次に、loop.run_in_executor
を使用して、上記の例を次のように書き換えます:
import asyncio import time from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): loop = asyncio.get_event_loop() def do_blocking_work(): time.sleep(10) print("Done blocking work!!") await loop.run_in_executor(None, do_blocking_work) return {"message": "Hello World"} @app.get("/health") async def health(): return {"status": "ok"}
結果は次のようになります:
root
インターフェイスがブロックされていても、health
は相互に影響を与えることなく通常どおりアクセスできます。
同期関数内で非同期関数を呼び出すコルーチンは「イベント ループ」内でのみ実行でき、同時に実行できるコルーチンは 1 つだけです。 したがって、同期関数内で非同期関数を呼び出す本質は、コルーチンをイベント ループに「スロー」し、コルーチンが実行を終了して結果を取得するのを待つことです。 次の関数でこの効果を実現できます:注: これはすべてデモンストレーション用です。実際に FastAPI を使用して開発する場合は、
async def root
をdef root ## に直接置き換えることができます。 # つまり、同期インターフェイス関数に置き換えます。FastAPI は、この同期インターフェイス関数を処理するためのスレッドを内部で自動的に作成します。一般に、FastAPI は、メイン スレッド (またはメイン スレッド内のイベント ループ) のブロックを回避するために、内部的にスレッドに依存して同期関数を処理します。
asyncio.run
asyncio.run_coroutine_threadsafe
loop.run_until_complete
create_task
import asyncio async def do_work(): return 1 def main(): result = asyncio.run(do_work()) print(result) # 1 if __name__ == "__main__": main()
Just
run が完了し、戻り値を受け入れます。 ただし、
asyncio.run
スレッドにはイベント ループ
asyncio.run を使用しないでください。次の例外がスローされます: RuntimeError: asyncio.run() は実行中のイベント ループから呼び出すことはできません
新しいイベント ループを開くときに使用されます。したがって、
asyncio.run
asyncio.run_coroutine_threadsafe
指定されたイベント ループにコルーチンを送信します。 (スレッドセーフ)
concurrent.futures.Futureを返し、他の OS スレッドからの結果を待ちます。。
つまり、
コルーチンを他のスレッドのイベント ループにスローして実行します
ここでの「イベント ループ」は、現在のスレッドのイベント ループではなく、他のスレッドのイベント ループである必要があることに注意してください。 返された結果は、future オブジェクトです。コルーチンの実行結果を取得する必要がある場合は、
future.result()を使用して取得できます。future オブジェクトの詳細については、 、https://docs.python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future
を参照してください。下方给了一个例子,一共有两个线程:thread_with_loop
和 another_thread
,分别用于启动事件循环和调用 run_coroutine_threadsafe
import asyncio import threading import time loop = None def get_loop(): global loop if loop is None: loop = asyncio.new_event_loop() return loop def another_thread(): async def coro_func(): return 1 loop = get_loop() # 将协程提交到另一个线程的事件循环中执行 future = asyncio.run_coroutine_threadsafe(coro_func(), loop) # 等待协程执行结果 print(future.result()) # 停止事件循环 loop.call_soon_threadsafe(loop.stop) def thread_with_loop(): loop = get_loop() # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用 loop.run_forever() loop.close() # 启动一个线程,线程内部启动了一个事件循环 threading.Thread(target=thread_with_loop).start() time.sleep(1) # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行 t = threading.Thread(target=another_thread) t.start() t.join()
文档: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete
运行直到 future (
Future
的实例 ) 被完成。
这个方法和
asyncio.run
类似。
具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。
run_until_complete
属于 loop
对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:
RuntimeError: This event loop is already running
例子:
loop = asyncio.new_event_loop() loop.run_until_complete(do_async_work())
文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks
再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。
其实一共就是满足两个条件:
任务;
事件循环。
我们使用 async def func
定义的函数叫做协程函数,func()
这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。
所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await
这些语句时没有涉及到任务这个概念呢?
这是因为 await
语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务。
所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。
那将协程封装成的任务的方法有哪些呢?
asyncio.create_task
asyncio.ensure_future
loop.create_task
看着有好几个的,没关系,我们只关心 loop.create_task
,因为其他方法最终都是调用 loop.create_task
。
使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。
async def do_work(): return 222 task = loop.create_task(do_work())
do_work
会被异步执行,那么 do_work 的结果怎么获取呢,task.result()
可以吗?
分情况:
如果是在一个协程函数内使用 await task.result()
,这是可以的;
如果是在普通函数内则不行。你不可能立即获得协程函数的返回值,因为协程函数还没有被执行呢。
asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。
我这里给个基于 concurrent.futures.Future
获取结果的例子,如下:
import asyncio from asyncio import Task from concurrent.futures import Future from fastapi import FastAPI app = FastAPI() loop = asyncio.get_event_loop() async def do_work1(): return 222 @app.get("/") def root(): # 新建一个 future 对象,用于接受结果值 future = Future() # 提交任务至事件循环 task = loop.create_task(do_work1()) # 回调函数 def done_callback(task: Task): # 设置结果 future.set_result(task.result()) # 为这个任务添加回调函数 task.add_done_callback(done_callback) # future.result 会被阻塞,直到有结果返回为止 return future.result() # 222
以上がPython で同期関数と非同期関数を混合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。