首頁  >  文章  >  後端開發  >  Python混合怎麼使用同步和非同步函數

Python混合怎麼使用同步和非同步函數

WBOY
WBOY轉載
2023-05-12 09:58:212289瀏覽

    在協程函數中呼叫同步函數

    在協程函數中直接呼叫同步函數會阻塞事件循環,進而影響整個程式的效能。我們先來看一個例子:

    以下是使用非同步 Web 框架 FastAPI 寫的一個例子,FastAPI 是比較快,但不正確的操作將會變得很慢。

    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        time.sleep(10)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}

    上面我們寫了兩個接口,假設root 接口函數耗時10 秒,在這10 秒內訪問health 接口,想一想會發生什麼?

    Python混合怎麼使用同步和非同步函數

    存取root 介面(左),立即存取health 介面(右),health 接口被阻塞,直到root 介面回傳後,health 介面才成功回應。

    time.sleep 就是一個「同步」函數,它會阻塞整個事件循環。

    如何解決呢?想想以前的處理方法,如果一個函數會阻塞主線程,那就再開一個線程讓這個阻塞函數單獨運行。所以,這裡也是同理,開一個執行緒單獨去運行那些阻塞式操作,像是讀取檔等。

    loop.run_in_executor 方法將同步函數轉換為非同步非阻塞方式進行處理。具體來說,loop.run_in_executor() 可以將同步函數建立為一個執行緒進程,並在其中執行該函數,從而避免阻塞事件循環。

    官方範例:在執行緒或進程池中執行程式碼。

    那麼,我們使用loop.run_in_executor 改寫上面例子,如下:

    import asyncio
    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        loop = asyncio.get_event_loop()
    
        def do_blocking_work():
            time.sleep(10)
            print("Done blocking work!!")
    
        await loop.run_in_executor(None, do_blocking_work)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}

    效果如下:

    Python混合怎麼使用同步和非同步函數

    root 介面被封鎖期間,health 依然正常存取互不影響。

    注意: 這裡都是為了示範,實際在使用FastAPI 開發時,你可以直接將async def root 更換成def root ,也就是將其換成同步介面函數,FastAPI 內部會自動建立執行緒處理這個同步介面函數。總的來說,FastAPI 內部也是依賴執行緒去處理同步函數來避免阻塞主執行緒(或主執行緒中的事件循環)。

    在同步函數中呼叫非同步函數

    協程只能在「事件循環」內執行,且同一時刻只能有一個協程被執行。

    所以,在同步函數中呼叫非同步函數,其本質就是將協程「丟進」事件循環中,等待該協程執行完獲取結果即可。

    以下這些函數,都可以實現這個效果:

    • asyncio.run

    • asyncio.run_coroutine_threadsafe

    • #loop.run_until_complete

    • create_task

    #create_task

    接下來,我們將一一講解這些方法並舉例說明。

    asyncio.run

    這個方法使用起來最簡單,先看下如何使用,然後緊跟著講一下哪些場景不能直接使用

    asyncio.run

    import asyncio
    
    async def do_work():
        return 1
    
    def main():
        result = asyncio.run(do_work())
        print(result)  # 1
    
    if __name__ == "__main__":
        main()

    直接

    run 就完事了,然後接受回傳值即可。 但需要,注意的是 asyncio.run

    每次呼叫都會新開一個事件循環,當結束時自動關閉該事件循環。

    一個執行緒內只存在一個事件迴圈
    ,所以如果目前執行緒已經有存在的事件循環了,就不應該使用

    asyncio.run 了,否則就會拋出如下異常:

    RuntimeError: asyncio.run() cannot be called from a running event loop

    因此,
    asyncio.run

    用作新開一個事件循環時使用。
    asyncio.run_coroutine_threadsafe

    文件: https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe

    向指定事件循環提交一個協程。 (線程安全)傳回一個 

    concurrent.futures.Future

     以等待來自其他 OS 執行緒的結果。

    換句話說,就是將協程丟給其他執行緒中的事件循環去執行

    。 ######值得注意的是這裡的「事件循環」應該是其他執行緒中的事件循環,非目前執行緒的事件循環。 ######其回傳的結果是一個future 對象,如果你需要獲取協程的執行結果可以使用###future.result()### 獲取,關於future 對象的更多介紹,請參閱https: //docs.python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future###

    下方给了一个例子,一共有两个线程:thread_with_loopanother_thread,分别用于启动事件循环和调用 run_coroutine_threadsafe

    import asyncio
    import threading
    import time
    
    loop = None
    
    
    def get_loop():
        global loop
        if loop is None:
            loop = asyncio.new_event_loop()
        return loop
    
    
    def another_thread():
        async def coro_func():
            return 1
    
        loop = get_loop()
        # 将协程提交到另一个线程的事件循环中执行
        future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
        # 等待协程执行结果
        print(future.result())
        # 停止事件循环
        loop.call_soon_threadsafe(loop.stop)
    
    
    def thread_with_loop():
        loop = get_loop()
        # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用
        loop.run_forever()
        loop.close()
    
    
    # 启动一个线程,线程内部启动了一个事件循环
    threading.Thread(target=thread_with_loop).start()
    time.sleep(1)
    # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行
    t = threading.Thread(target=another_thread)
    t.start()
    t.join()

    loop.run_until_complete

    文档: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete

    运行直到 future ( Future 的实例 ) 被完成。

    这个方法和 asyncio.run 类似。

    具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。

    run_until_complete 属于 loop 对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:

    RuntimeError: This event loop is already running

    例子:

    loop = asyncio.new_event_loop()
    loop.run_until_complete(do_async_work())

    create_task

    文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks

    再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。

    其实一共就是满足两个条件:

    • 任务;

    • 事件循环。

    我们使用 async def func 定义的函数叫做协程函数func() 这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。

    所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await 这些语句时没有涉及到任务这个概念呢?

    这是因为 await 语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务

    所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。

    那将协程封装成的任务的方法有哪些呢?

    • asyncio.create_task

    • asyncio.ensure_future

    • loop.create_task

    看着有好几个的,没关系,我们只关心 loop.create_task,因为其他方法最终都是调用 loop.create_task

    使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。

    async def do_work():
        return 222
    
    task = loop.create_task(do_work())

    do_work 会被异步执行,那么 do_work 的结果怎么获取呢,task.result() 可以吗?

    分情况:

    • 如果是在一个协程函数内使用 await task.result(),这是可以的;

    • 如果是在普通函数内则不行。你不可能立即获得协程函数的返回值,因为协程函数还没有被执行呢。

    asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。

    我这里给个基于 concurrent.futures.Future 获取结果的例子,如下:

    import asyncio
    from asyncio import Task
    from concurrent.futures import Future
    
    from fastapi import FastAPI
    
    app = FastAPI()
    loop = asyncio.get_event_loop()
    
    
    async def do_work1():
        return 222
    
    
    @app.get("/")
    def root():
        # 新建一个 future 对象,用于接受结果值
        future = Future()
    
        # 提交任务至事件循环
        task = loop.create_task(do_work1())
    
        # 回调函数
        def done_callback(task: Task):
            # 设置结果
            future.set_result(task.result())
    
        # 为这个任务添加回调函数
        task.add_done_callback(done_callback)
    
        # future.result 会被阻塞,直到有结果返回为止
        return future.result()  # 222

    以上是Python混合怎麼使用同步和非同步函數的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除