코루틴 함수에서 동기화 함수를 직접 호출하면 이벤트 루프가 차단되어 전체 프로그램 성능에 영향을 미칩니다. 먼저 예제를 살펴보겠습니다.
다음은 비동기 웹 프레임워크 FastAPI를 사용하여 작성된 예제입니다. FastAPI는 비교적 빠르지만 잘못된 작업은 매우 느려집니다.
import time from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): time.sleep(10) return {"message": "Hello World"} @app.get("/health") async def health(): return {"status": "ok"}
위에서 두 개의 인터페이스를 작성했습니다. root
인터페이스 함수가 10초 내에 health
인터페이스에 액세스하면 어떤 일이 일어날지 생각해 보세요. root
接口函数耗时 10 秒,在这 10 秒内访问 health
接口,想一想会发生什么?
访问 root
接口(左),立即访问 health
接口(右),health
接口被阻塞,直至 root
接口返回后,health
接口才成功响应。
time.sleep
就是一个「同步」函数,它会阻塞整个事件循环。
如何解决呢?想一想以前的处理方法,如果一个函数会阻塞主线程,那么就再开一个线程让这个阻塞函数单独运行。所以,这里也是同理,开一个线程单独去运行那些阻塞式操作,比如读取文件等。
loop.run_in_executor
方法将同步函数转换为异步非阻塞方式进行处理。具体来说,loop.run_in_executor()
可以将同步函数创建为一个线程或进程,并在其中执行该函数,从而避免阻塞事件循环。
官方例子:在线程或者进程池中执行代码。
那么,我们使用 loop.run_in_executor
改写上面例子,如下:
import asyncio import time from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): loop = asyncio.get_event_loop() def do_blocking_work(): time.sleep(10) print("Done blocking work!!") await loop.run_in_executor(None, do_blocking_work) return {"message": "Hello World"} @app.get("/health") async def health(): return {"status": "ok"}
效果如下:
root
接口被阻塞期间,health
依然正常访问互不影响。
注意: 这里都是为了演示,实际在使用 FastAPI 开发时,你可以直接将
async def root
更换成def root
,也就是将其换成同步接口函数,FastAPI 内部会自动创建线程处理这个同步接口函数。总的来说,FastAPI 内部也是依靠线程去处理同步函数从而避免阻塞主线程(或主线程中的事件循环)。
协程只能在「事件循环」内被执行,且同一时刻只能有一个协程被执行。
所以,在同步函数中调用异步函数,其本质就是将协程「扔进」事件循环中,等待该协程执行完获取结果即可。
以下这些函数,都可以实现这个效果:
asyncio.run
asyncio.run_coroutine_threadsafe
loop.run_until_complete
create_task
接下来,我们将一一讲解这些方法并举例说明。
这个方法使用起来最简单,先看下如何使用,然后紧跟着讲一下哪些场景不能直接使用 asyncio.run
import asyncio async def do_work(): return 1 def main(): result = asyncio.run(do_work()) print(result) # 1 if __name__ == "__main__": main()
直接 run
就完事了,然后接受返回值即可。
但是需要,注意的是 asyncio.run
每次调用都会新开一个事件循环,当结束时自动关闭该事件循环。
一个线程内只存在一个事件循环,所以如果当前线程已经有存在的事件循环了,就不应该使用 asyncio.run
了,否则就会抛出如下异常:
RuntimeError: asyncio.run() cannot be called from a running event loop
因此,asyncio.run
用作新开一个事件循环时使用。
文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe
向指定事件循环提交一个协程。(线程安全)
返回一个concurrent.futures.Future
以等待来自其他 OS 线程的结果。
换句话说,就是将协程丢给其他线程中的事件循环去运行。
值得注意的是这里的「事件循环」应该是其他线程中的事件循环,非当前线程的事件循环。
其返回的结果是一个 future 对象,如果你需要获取协程的执行结果可以使用 future.result()
health
인터페이스에 즉시 액세스(오른쪽), health
인터페이스는 root
가 될 때까지 차단됩니다. 인터페이스가 를 반환하면 health
인터페이스가 성공적으로 응답합니다. 🎜🎜time.sleep
은 전체 이벤트 루프를 차단하는 "동기화" 함수입니다. 🎜🎜어떻게 해결하나요? 이전 처리 방법을 생각해 보세요. 함수가 기본 스레드를 차단하는 경우 다른 스레드를 열어 차단 기능이 단독으로 실행되도록 하세요. 따라서 여기에도 동일한 원칙이 적용됩니다. 파일 읽기 등의 차단 작업을 별도로 실행하려면 스레드를 엽니다. 🎜🎜loop.run_in_executor
메서드는 처리를 위해 동기 함수를 비동기 비차단 메서드로 변환합니다. 특히 loop.run_in_executor()
는 동기화 함수를 스레드 또는 프로세스로 생성하고 그 안에 있는 함수를 실행하여 이벤트 루프 차단을 방지할 수 있습니다. . 🎜🎜공식 예: 스레드 또는 프로세스 풀에서 코드를 실행합니다. 🎜🎜그런 다음
loop.run_in_executor
를 사용하여 위의 예를 다음과 같이 다시 작성합니다. 🎜import asyncio import threading import time loop = None def get_loop(): global loop if loop is None: loop = asyncio.new_event_loop() return loop def another_thread(): async def coro_func(): return 1 loop = get_loop() # 将协程提交到另一个线程的事件循环中执行 future = asyncio.run_coroutine_threadsafe(coro_func(), loop) # 等待协程执行结果 print(future.result()) # 停止事件循环 loop.call_soon_threadsafe(loop.stop) def thread_with_loop(): loop = get_loop() # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用 loop.run_forever() loop.close() # 启动一个线程,线程内部启动了一个事件循环 threading.Thread(target=thread_with_loop).start() time.sleep(1) # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行 t = threading.Thread(target=another_thread) t.start() t.join()🎜효과는 다음과 같습니다. 🎜🎜🎜🎜
root
인터페이스가 차단되는 동안 health 일반 액세스는 여전히 서로 영향을 미치지 않습니다. 🎜🎜참고: 이는 모두 데모용입니다. FastAPI를 사용한 실제 개발에서는🎜동기 함수에서 비동기 함수 호출🎜🎜코루틴은 "이벤트 루프" 내에서만 실행될 수 있으며 동시에 하나의 코루틴만 실행할 수 있습니다. 🎜🎜따라서 동기 함수에서 비동기 함수를 호출하는 핵심은 코루틴을 이벤트 루프에 "던지고" 코루틴이 실행을 완료하여 결과를 얻을 때까지 기다리는 것입니다. 🎜🎜다음 함수를 사용하면 이 효과를 얻을 수 있습니다. 🎜async def root
를def root
로 직접 바꿀 수 있습니다. 즉, FastAPI는 이 동기 인터페이스 기능을 처리하기 위해 내부적으로 스레드를 자동으로 생성합니다. 일반적으로 FastAPI는 메인 스레드(또는 메인 스레드의 이벤트 루프) 차단을 방지하기 위해 내부적으로 스레드를 사용하여 동기화 기능을 처리합니다. 🎜
asyncio.run
🎜asyncio .run_coroutine_threadsafe
🎜loop.run_until_complete
🎜create_task
🎜asyncio.run
🎜을 직접 사용할 수 없는 시나리오에 대해 이야기해 보겠습니다. loop = asyncio.new_event_loop() loop.run_until_complete(do_async_work())🎜직접
run
이 완료된 후, 반환 값을 받아들입니다. 🎜🎜하지만 asyncio.run
은 호출될 때마다 새 이벤트 루프를 열고, 종료되면 자동으로 이벤트 루프를 닫는다는 점에 유의해야 합니다. 🎜🎜스레드에는 이벤트 루프가 하나만 있습니다. 따라서 현재 스레드에 이미 기존 이벤트 루프가 있는 경우 asyncio.run
을 사용하면 안 됩니다. 그렇지 않으면 오류가 발생합니다. 다음 예외가 발생합니다: 🎜🎜RuntimeError: asyncio.run()은 실행 중인 이벤트 루프에서 호출할 수 없습니다🎜🎜따라서 새 이벤트를 열 때
asyncio.run
이 사용됩니다. 루프 사용. 🎜🎜지정된 이벤트로 loop 코루틴을 제출합니다. (스레드로부터 안전함)🎜즉, 코루틴을 다른 스레드의 이벤트 루프에 던져서 실행하세요. 🎜🎜여기서 "이벤트 루프"는 현재 스레드의 이벤트 루프가 아니라 다른 스레드의 이벤트 루프여야 한다는 점에 주목할 가치가 있습니다. 🎜🎜반환된 결과는 future 객체입니다. 코루틴의 실행 결과를 얻으려면
다른 OS 스레드의 결과를 기다리려면concurrent.futures.Future
를 반환하세요. 🎜
future.result()
를 사용하여 future 객체를 얻을 수 있습니다. //docs .python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future🎜下方给了一个例子,一共有两个线程:thread_with_loop
和 another_thread
,分别用于启动事件循环和调用 run_coroutine_threadsafe
import asyncio import threading import time loop = None def get_loop(): global loop if loop is None: loop = asyncio.new_event_loop() return loop def another_thread(): async def coro_func(): return 1 loop = get_loop() # 将协程提交到另一个线程的事件循环中执行 future = asyncio.run_coroutine_threadsafe(coro_func(), loop) # 等待协程执行结果 print(future.result()) # 停止事件循环 loop.call_soon_threadsafe(loop.stop) def thread_with_loop(): loop = get_loop() # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用 loop.run_forever() loop.close() # 启动一个线程,线程内部启动了一个事件循环 threading.Thread(target=thread_with_loop).start() time.sleep(1) # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行 t = threading.Thread(target=another_thread) t.start() t.join()
文档: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete
运行直到 future (
Future
的实例 ) 被完成。
这个方法和
asyncio.run
类似。
具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。
run_until_complete
属于 loop
对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:
RuntimeError: This event loop is already running
例子:
loop = asyncio.new_event_loop() loop.run_until_complete(do_async_work())
文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks
再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。
其实一共就是满足两个条件:
任务;
事件循环。
我们使用 async def func
定义的函数叫做协程函数,func()
这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。
所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await
这些语句时没有涉及到任务这个概念呢?
这是因为 await
语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务。
所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。
那将协程封装成的任务的方法有哪些呢?
asyncio.create_task
asyncio.ensure_future
loop.create_task
看着有好几个的,没关系,我们只关心 loop.create_task
,因为其他方法最终都是调用 loop.create_task
。
使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。
async def do_work(): return 222 task = loop.create_task(do_work())
do_work
会被异步执行,那么 do_work 的结果怎么获取呢,task.result()
可以吗?
分情况:
如果是在一个协程函数内使用 await task.result()
,这是可以的;
如果是在普通函数内则不行。你不可能立即获得协程函数的返回值,因为协程函数还没有被执行呢。
asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。
我这里给个基于 concurrent.futures.Future
获取结果的例子,如下:
import asyncio from asyncio import Task from concurrent.futures import Future from fastapi import FastAPI app = FastAPI() loop = asyncio.get_event_loop() async def do_work1(): return 222 @app.get("/") def root(): # 新建一个 future 对象,用于接受结果值 future = Future() # 提交任务至事件循环 task = loop.create_task(do_work1()) # 回调函数 def done_callback(task: Task): # 设置结果 future.set_result(task.result()) # 为这个任务添加回调函数 task.add_done_callback(done_callback) # future.result 会被阻塞,直到有结果返回为止 return future.result() # 222
위 내용은 Python에서 동기 및 비동기 함수를 혼합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!