Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Bagaimana untuk mencampurkan fungsi segerak dan tak segerak dalam Python

Bagaimana untuk mencampurkan fungsi segerak dan tak segerak dalam Python

WBOY
WBOYke hadapan
2023-05-12 09:58:212289semak imbas

    Fungsi penyegerakan panggilan dalam fungsi coroutine

    Panggilan terus fungsi penyegerakan dalam fungsi coroutine akan menyekat gelung acara, sekali gus menjejaskan prestasi keseluruhan program. Mari lihat contoh dahulu:

    Berikut ialah contoh yang ditulis menggunakan rangka kerja web tak segerak FastAPI agak pantas, tetapi operasi yang salah akan menjadi sangat perlahan.

    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        time.sleep(10)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}

    Kami menulis dua antara muka di atas Andaikan bahawa fungsi antara muka root mengambil masa 10 saat Jika anda mengakses antara muka health dalam masa 10 saat ini, fikirkan apa yang akan berlaku.

    Bagaimana untuk mencampurkan fungsi segerak dan tak segerak dalam Python

    mengakses antara muka root (kiri), serta-merta mengakses antara muka health (kanan) dan antara muka health disekat sehingga antara muka root kembali, health Antara muka bertindak balas dengan jayanya.

    time.sleep ialah fungsi "penyegerakan" yang menyekat keseluruhan gelung acara.

    Bagaimana untuk menyelesaikannya? Fikirkan tentang kaedah pemprosesan sebelumnya Jika fungsi menyekat utas utama, kemudian buka utas lain untuk membiarkan fungsi menyekat berjalan sendiri. Oleh itu, prinsip yang sama digunakan di sini Buka utas untuk menjalankan operasi menyekat secara berasingan, seperti membaca fail, dsb. Kaedah

    loop.run_in_executor menukar fungsi segerak kepada kaedah tidak menyekat tak segerak untuk pemprosesan. Khususnya, loop.run_in_executor() anda boleh mencipta fungsi penyegerakan sebagai benang atau proses dan laksanakan fungsi di dalamnya, dengan itu mengelakkan sekatan gelung acara.

    Contoh rasmi: Jalankan kod dalam urutan atau kumpulan proses.

    Kemudian, kami menggunakan loop.run_in_executor untuk menulis semula contoh di atas seperti berikut:

    import asyncio
    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        loop = asyncio.get_event_loop()
    
        def do_blocking_work():
            time.sleep(10)
            print("Done blocking work!!")
    
        await loop.run_in_executor(None, do_blocking_work)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}

    Kesannya adalah seperti berikut:

    Bagaimana untuk mencampurkan fungsi segerak dan tak segerak dalam Python

    root Walaupun antara muka disekat, health masih boleh mengakses secara normal tanpa menjejaskan satu sama lain.

    Nota: Ini hanya untuk demonstrasi Dalam pembangunan sebenar menggunakan FastAPI, anda boleh terus menggantikan async def root dengan def root, iaitu menggantikannya dengan fungsi Antaramuka penyegerakan. , FastAPI secara automatik akan mencipta benang secara dalaman untuk memproses fungsi antara muka segerak ini. Secara umum, FastAPI secara dalaman bergantung pada utas untuk mengendalikan fungsi penyegerakan untuk mengelak daripada menyekat utas utama (atau gelung peristiwa dalam utas utama).

    Panggil fungsi tak segerak dalam fungsi segerak

    Coroutine hanya boleh dilaksanakan dalam "gelung peristiwa", dan hanya satu coroutine boleh dilaksanakan pada masa yang sama.

    Jadi, intipati memanggil fungsi tak segerak dalam fungsi segerak adalah untuk "membuang" coroutine ke dalam gelung acara dan menunggu coroutine selesai melaksanakan untuk mendapatkan hasilnya.

    Fungsi berikut boleh mencapai kesan ini:

    • asyncio.run

    • asyncio.run_coroutine_threadsafe

    • loop.run_until_complete

    • create_task

    Seterusnya, kami akan menerangkan kaedah ini satu persatu dan memberi contoh. menggambarkan.

    asyncio.run

    Kaedah ini adalah yang paling mudah untuk digunakan asyncio.runSecara langsung

    Itu sahaja, terima sahaja nilai pulangan.

    runTetapi sila ambil perhatian bahawa

    akan membuka gelung acara baharu setiap kali ia dipanggil dan menutup gelung acara secara automatik apabila ia tamat.

    asyncio.run

    Hanya terdapat satu gelung peristiwa

    dalam urutan, jadi jika utas semasa sudah mempunyai gelung peristiwa sedia ada, tidak boleh digunakan, jika tidak, pengecualian berikut akan dilemparkan: asyncio.run

    RuntimeError: asyncio.run() tidak boleh dipanggil daripada gelung acara berjalan

    Oleh itu,
    digunakan semasa membuka gelung acara baharu.

    asyncio.runasyncio.run_coroutine_threadsafe

    Dokumentasi: https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe

    Serahkan coroutine ke gelung acara yang ditentukan. (Selamat untuk benang)
    Mengembalikan

    untuk menunggu hasil daripada urutan OS lain.
    concurrent.futures.Future

    Dalam erti kata lain,
    baling coroutine ke gelung peristiwa dalam urutan lain untuk dijalankan

    . Perlu diambil perhatian bahawa "gelung peristiwa" di sini harus menjadi gelung peristiwa dalam urutan lain, bukan gelung peristiwa pada urutan semasa.

    Hasil yang dikembalikan ialah objek masa hadapan. Jika anda perlu mendapatkan hasil pelaksanaan coroutine, anda boleh menggunakan

    untuk mendapatkan maklumat lanjut tentang objek masa hadapan, lihat https://docs. python.org/ zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future

    下方给了一个例子,一共有两个线程:thread_with_loopanother_thread,分别用于启动事件循环和调用 run_coroutine_threadsafe

    import asyncio
    import threading
    import time
    
    loop = None
    
    
    def get_loop():
        global loop
        if loop is None:
            loop = asyncio.new_event_loop()
        return loop
    
    
    def another_thread():
        async def coro_func():
            return 1
    
        loop = get_loop()
        # 将协程提交到另一个线程的事件循环中执行
        future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
        # 等待协程执行结果
        print(future.result())
        # 停止事件循环
        loop.call_soon_threadsafe(loop.stop)
    
    
    def thread_with_loop():
        loop = get_loop()
        # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用
        loop.run_forever()
        loop.close()
    
    
    # 启动一个线程,线程内部启动了一个事件循环
    threading.Thread(target=thread_with_loop).start()
    time.sleep(1)
    # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行
    t = threading.Thread(target=another_thread)
    t.start()
    t.join()

    loop.run_until_complete

    文档: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete

    运行直到 future ( Future 的实例 ) 被完成。

    这个方法和 asyncio.run 类似。

    具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。

    run_until_complete 属于 loop 对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:

    RuntimeError: This event loop is already running

    例子:

    loop = asyncio.new_event_loop()
    loop.run_until_complete(do_async_work())

    create_task

    文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks

    再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。

    其实一共就是满足两个条件:

    • 任务;

    • 事件循环。

    我们使用 async def func 定义的函数叫做协程函数func() 这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。

    所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await 这些语句时没有涉及到任务这个概念呢?

    这是因为 await 语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务

    所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。

    那将协程封装成的任务的方法有哪些呢?

    • asyncio.create_task

    • asyncio.ensure_future

    • loop.create_task

    看着有好几个的,没关系,我们只关心 loop.create_task,因为其他方法最终都是调用 loop.create_task

    使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。

    async def do_work():
        return 222
    
    task = loop.create_task(do_work())

    do_work 会被异步执行,那么 do_work 的结果怎么获取呢,task.result() 可以吗?

    分情况:

    • 如果是在一个协程函数内使用 await task.result(),这是可以的;

    • 如果是在普通函数内则不行。你不可能立即获得协程函数的返回值,因为协程函数还没有被执行呢。

    asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。

    我这里给个基于 concurrent.futures.Future 获取结果的例子,如下:

    import asyncio
    from asyncio import Task
    from concurrent.futures import Future
    
    from fastapi import FastAPI
    
    app = FastAPI()
    loop = asyncio.get_event_loop()
    
    
    async def do_work1():
        return 222
    
    
    @app.get("/")
    def root():
        # 新建一个 future 对象,用于接受结果值
        future = Future()
    
        # 提交任务至事件循环
        task = loop.create_task(do_work1())
    
        # 回调函数
        def done_callback(task: Task):
            # 设置结果
            future.set_result(task.result())
    
        # 为这个任务添加回调函数
        task.add_done_callback(done_callback)
    
        # future.result 会被阻塞,直到有结果返回为止
        return future.result()  # 222

    Atas ialah kandungan terperinci Bagaimana untuk mencampurkan fungsi segerak dan tak segerak dalam Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Kenyataan:
    Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam