ホームページ  >  記事  >  バックエンド開発  >  Python で同期関数と非同期関数を混合する方法

Python で同期関数と非同期関数を混合する方法

WBOY
WBOY転載
2023-05-12 09:58:212289ブラウズ

    コルーチン関数で同期関数を呼び出す

    コルーチン関数で同期関数を直接呼び出すとイベント ループがブロックされ、そのため、プログラム全体。まず例を見てみましょう:

    次は、非同期 Web フレームワーク FastAPI を使用して記述された例です。FastAPI は比較的高速ですが、誤った操作を行うと非常に遅くなります。

    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        time.sleep(10)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}

    上記では 2 つのインターフェイスを作成しました。root インターフェイス関数には 10 秒かかると仮定します。この 10 秒の間に、health インターフェイスがアクセスされます。何が起こるでしょうか?

    Python で同期関数と非同期関数を混合する方法

    root インターフェイス (左) にアクセスし、すぐに health インターフェイス (右) にアクセスし、health インターフェイスにアクセスします。 root インターフェイスが戻るまでブロックされますが、health インターフェイスは正常に応答します。

    time.sleep は、イベント ループ全体をブロックする「同期」関数です。

    どうすれば解決できますか?前述の処理方法について考えてみましょう。関数がメイン スレッドをブロックしている場合は、別のスレッドを開いてブロックしている関数を単独で実行させます。したがって、同じ原則がここにも適用され、スレッドを開いて、ファイルの読み取りなどのブロック操作を個別に実行します。

    loop.run_in_executor メソッドは、同期関数を非同期ノンブロッキング モードに変換して処理します。具体的には、 loop.run_in_executor() 同期関数を a thread または process として作成し、その中で関数を実行することで、イベント ループのブロックを回避できます。

    正式な例: スレッドまたはプロセス プールでコードを実行します。

    次に、loop.run_in_executor を使用して、上記の例を次のように書き換えます:

    import asyncio
    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        loop = asyncio.get_event_loop()
    
        def do_blocking_work():
            time.sleep(10)
            print("Done blocking work!!")
    
        await loop.run_in_executor(None, do_blocking_work)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}

    結果は次のようになります:

    Python で同期関数と非同期関数を混合する方法

    root インターフェイスがブロックされていても、health は相互に影響を与えることなく通常どおりアクセスできます。

    注: これはすべてデモンストレーション用です。実際に FastAPI を使用して開発する場合は、async def rootdef root ## に直接置き換えることができます。 # つまり、同期インターフェイス関数に置き換えます。FastAPI は、この同期インターフェイス関数を処理するためのスレッドを内部で自動的に作成します。一般に、FastAPI は、メイン スレッド (またはメイン スレッド内のイベント ループ) のブロックを回避するために、内部的にスレッドに依存して同期関数を処理します。

    同期関数内で非同期関数を呼び出す

    コルーチンは「イベント ループ」内でのみ実行でき、同時に実行できるコルーチンは 1 つだけです。

    したがって、同期関数内で非同期関数を呼び出す本質は、コルーチンをイベント ループに「スロー」し、コルーチンが実行を終了して結果を取得するのを待つことです。

    次の関数でこの効果を実現できます:

    • asyncio.run

    • asyncio.run_coroutine_threadsafe

    • loop.run_until_complete

    • create_task

      ##次に、これらの方法を 1 つずつ説明し、例を示します。
    asyncio.run

    このメソッドは最も簡単に使用できます。まずその使用方法を見てから、どのシナリオが直接使用できないかについて説明します

    asyncio.run

    import asyncio
    
    async def do_work():
        return 1
    
    def main():
        result = asyncio.run(do_work())
        print(result)  # 1
    
    if __name__ == "__main__":
        main()
    Justrun

    が完了し、戻り値を受け入れます。

    ただし、asyncio.run

    は呼び出されるたびに新しいイベント ループを開き、イベント ループが終了すると自動的に閉じることに注意する必要があります。

    スレッドにはイベント ループ

    が 1 つだけあるため、現在のスレッドに既存のイベント ループがすでに存在する場合は、

    asyncio.run を使用しないでください。次の例外がスローされます: RuntimeError: asyncio.run() は実行中のイベント ループから呼び出すことはできません

    したがって、

    asyncio.run
    新しいイベント ループを開くときに使用されます。

    asyncio.run_coroutine_threadsafe

    ドキュメント: https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe

    指定されたイベント ループにコルーチンを送信します。 (スレッドセーフ)

    concurrent.futures.Future
    を返し、他の OS スレッドからの結果を待ちます。


    つまり、コルーチンを他のスレッドのイベント ループにスローして実行します

    ここでの「イベント ループ」は、現在のスレッドのイベント ループではなく、他のスレッドのイベント ループである必要があることに注意してください。 返された結果は、future オブジェクトです。コルーチンの実行結果を取得する必要がある場合は、

    future.result()

    を使用して取得できます。future オブジェクトの詳細については、 、https://docs.python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future

    を参照してください。

    下方给了一个例子,一共有两个线程:thread_with_loopanother_thread,分别用于启动事件循环和调用 run_coroutine_threadsafe

    import asyncio
    import threading
    import time
    
    loop = None
    
    
    def get_loop():
        global loop
        if loop is None:
            loop = asyncio.new_event_loop()
        return loop
    
    
    def another_thread():
        async def coro_func():
            return 1
    
        loop = get_loop()
        # 将协程提交到另一个线程的事件循环中执行
        future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
        # 等待协程执行结果
        print(future.result())
        # 停止事件循环
        loop.call_soon_threadsafe(loop.stop)
    
    
    def thread_with_loop():
        loop = get_loop()
        # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用
        loop.run_forever()
        loop.close()
    
    
    # 启动一个线程,线程内部启动了一个事件循环
    threading.Thread(target=thread_with_loop).start()
    time.sleep(1)
    # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行
    t = threading.Thread(target=another_thread)
    t.start()
    t.join()

    loop.run_until_complete

    文档: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete

    运行直到 future ( Future 的实例 ) 被完成。

    这个方法和 asyncio.run 类似。

    具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。

    run_until_complete 属于 loop 对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:

    RuntimeError: This event loop is already running

    例子:

    loop = asyncio.new_event_loop()
    loop.run_until_complete(do_async_work())

    create_task

    文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks

    再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。

    其实一共就是满足两个条件:

    • 任务;

    • 事件循环。

    我们使用 async def func 定义的函数叫做协程函数func() 这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。

    所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await 这些语句时没有涉及到任务这个概念呢?

    这是因为 await 语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务

    所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。

    那将协程封装成的任务的方法有哪些呢?

    • asyncio.create_task

    • asyncio.ensure_future

    • loop.create_task

    看着有好几个的,没关系,我们只关心 loop.create_task,因为其他方法最终都是调用 loop.create_task

    使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。

    async def do_work():
        return 222
    
    task = loop.create_task(do_work())

    do_work 会被异步执行,那么 do_work 的结果怎么获取呢,task.result() 可以吗?

    分情况:

    • 如果是在一个协程函数内使用 await task.result(),这是可以的;

    • 如果是在普通函数内则不行。你不可能立即获得协程函数的返回值,因为协程函数还没有被执行呢。

    asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。

    我这里给个基于 concurrent.futures.Future 获取结果的例子,如下:

    import asyncio
    from asyncio import Task
    from concurrent.futures import Future
    
    from fastapi import FastAPI
    
    app = FastAPI()
    loop = asyncio.get_event_loop()
    
    
    async def do_work1():
        return 222
    
    
    @app.get("/")
    def root():
        # 新建一个 future 对象,用于接受结果值
        future = Future()
    
        # 提交任务至事件循环
        task = loop.create_task(do_work1())
    
        # 回调函数
        def done_callback(task: Task):
            # 设置结果
            future.set_result(task.result())
    
        # 为这个任务添加回调函数
        task.add_done_callback(done_callback)
    
        # future.result 会被阻塞,直到有结果返回为止
        return future.result()  # 222

    以上がPython で同期関数と非同期関数を混合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。