The article "Python Asyncio Scheduling Principles" introduced the two basic scheduling units of asyncio, Handle and TimerHandle. They can only be created through the loop.call_xx functions, so developers never deal with them directly. Together with loop.call_xx they form the basic machinery of the event loop, but each of them represents a single operation, and developers have to write their own code to chain such operations together. The article "The Role of Python's Waitable Objects in Asyncio" explained that asyncio.Task, the initiator of a coroutine chain, interacts with the event loop through loop.call_soon, links the waitable objects of the whole chain together, and schedules their execution. For loop.call_at and loop.call_later, however, developers still need an asyncio.Future to connect the result of a TimerHandle to asyncio.Task, as in the following code that sleeps for one second:

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    f = asyncio.Future()

    def _on_complete():
        # Called by the TimerHandle after one second; completes the Future.
        f.set_result(True)

    loop.call_later(1, _on_complete)
    return await f

if __name__ == "__main__":
    import time
    s_t = time.time()
    asyncio.run(main())
    print(time.time() - s_t)

In this code, asyncio.Future acts like a container: it accepts the various states and synchronizes its own state to the asyncio.Task that manages the current coroutine chain, so that asyncio.Task can manage other kinds of operations. All the utility functions in the asyncio.tasks module work on a similar principle. The parameters they accept are mostly waitable objects, and asyncio.Future is used as a container to synchronize the state of the waitable object to the caller. The state of asyncio.Task can also be synchronized to the waitable object by other means.

1. Sleep--asyncio.sleep

asyncio.sleep is a commonly used function that lets developers put a coroutine to sleep for a set time. Its implementation is very simple; the source code is as follows:

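@types.coroutine
def __sleep0():
    # A bare yield: the smallest way to hand control back to the event loop.
    yield

async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()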

The source shows that when the requested delay is less than or equal to 0, sleep only executes a yield and runs no other logic. When the delay is greater than 0, it creates a Future object and waits until loop.call_later completes that Future, and only then returns the result value.
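
The two paths can be observed from the outside with a small sketch. The helper name `sleep_paths` and the delay and result values are illustrative assumptions, not part of asyncio:

```python
import asyncio
import time


async def sleep_paths():
    # delay <= 0: sleep only yields once and hands back `result` right away.
    immediate = await asyncio.sleep(0, result="yielded")

    # delay > 0: a Future is created and later completed via loop.call_later.
    started = time.monotonic()
    delayed = await asyncio.sleep(0.05, result="timer fired")
    elapsed = time.monotonic() - started
    return immediate, delayed, elapsed


if __name__ == "__main__":
    print(asyncio.run(sleep_paths()))
```

Both calls return the value passed as `result`; only the second one waits on a timer.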

Note that with asyncio.sleep(0), the bare yield executed by sleep is detected by Task.__step, which then gives up control. This is the cheapest way for the current coroutine to hand control back to the event loop, so when writing functions that are CPU-intensive or long-running, asyncio.sleep(0) can be used to give up control voluntarily, as follows:

import asyncio

async def demo() -> None:
    for index in range(10000):
        if index % 100 == 0:
            await asyncio.sleep(0)
        ...  # assume the code here takes up too much CPU time

In this example, the loop gives up control once every 100 iterations to reduce the impact on other coroutines.
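
To make the effect visible, here is a minimal sketch in which a second coroutine gets a turn each time the busy loop yields. The names `busy`, `ticker`, and `run_demo`, and the iteration counts, are assumptions chosen for illustration:

```python
import asyncio


async def busy(log, total=300, every=100):
    # Stand-in for CPU-heavy work; `total` and `every` are illustrative values.
    for index in range(total):
        if index % every == 0:
            log.append(f"busy yields at {index}")
            await asyncio.sleep(0)  # hand control back to the event loop


async def ticker(log, ticks=3):
    # A second coroutine that only gets to run when `busy` yields.
    for n in range(ticks):
        log.append(f"tick {n}")
        await asyncio.sleep(0)


async def run_demo():
    log = []
    await asyncio.gather(busy(log), ticker(log))
    return log


if __name__ == "__main__":
    for line in asyncio.run(run_demo()):
        print(line)
```

Without the `await asyncio.sleep(0)` in `busy`, the ticker would not run until the whole loop had finished.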

2. Shield cancellation--asyncio.shield

asyncio.shield protects a waitable object from being cancelled. More precisely, it prevents a cancellation travelling along the coroutine chain from reaching the waitable object it manages. Calling the cancel method of the waitable object itself still cancels it, as in the following example:

import asyncio

async def sub(f):
    await asyncio.shield(f)

async def main():
    f1 = asyncio.Future()
    f2 = asyncio.Future()
    sub1 = asyncio.create_task(sub(f1))
    sub2 = asyncio.create_task(sub(f2))
    f1.cancel()    # cancel the waitable object directly
    sub2.cancel()  # cancel the task that awaits the shielded f2
    await asyncio.sleep(0)  # make sure the cancellations have completed
    print("f1 future run success:", f1.done())
    print("f2 future run success:", f2.done())
    print("sub1 future run result:", sub1.done())
    print("sub2 future run result:", sub2.done())

asyncio.run(main())

# >>> f1 future run success: True
# >>> f2 future run success: False
# >>> sub1 future run result: True
# >>> sub2 future run result: True

Here f1 and f2 are both created in main, each is wrapped by the sub function, and the wrappers are run asynchronously in the background through asyncio.create_task, which returns sub1 and sub2. These two Tasks reflect the execution state of the sub function. The code then cancels f1 and sub2 and prints whether f1, f2, sub1, and sub2 are done. The output shows that f1, sub1, and sub2 are all done (cancellation also counts as done), while f2 is still pending.
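
The same behavior can be shown with real work instead of bare Futures. In this sketch the names `inner_job`, `waiter`, and `shield_demo` are hypothetical; the point is that cancelling the awaiting task leaves the shielded task running:

```python
import asyncio


async def inner_job():
    await asyncio.sleep(0.05)
    return "inner finished"


async def waiter(inner_task):
    # Cancelling this coroutine only cancels the Future returned by shield.
    return await asyncio.shield(inner_task)


async def shield_demo():
    inner_task = asyncio.create_task(inner_job())
    outer_task = asyncio.create_task(waiter(inner_task))
    await asyncio.sleep(0)     # let both tasks start
    outer_task.cancel()        # cancellation arrives via the coroutine chain
    await asyncio.sleep(0)     # let the cancellation be delivered
    outer_cancelled = outer_task.cancelled()
    inner_still_running = not inner_task.done()
    result = await inner_task  # the shielded work still completes
    return outer_cancelled, inner_still_running, result


if __name__ == "__main__":
    print(asyncio.run(shield_demo()))
```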

As mentioned in "The Role of Python's Waitable Objects in Asyncio", a coroutine chain is led by an asyncio.Task, and every subsequent success or exception propagates along this chain. Cancellation is essentially an exception, so it propagates along the chain as well. To keep a running waitable object from receiving exceptions from the coroutine chain while still letting the chain learn its result, shield first runs the waitable object in a separate coroutine chain, then creates a container connected to the original chain. When the waitable object completes, the container is told the result and propagates it to the original chain. The corresponding source code is as follows:

def shield(arg):
    # If arg is a coroutine, it has to be wrapped into a future (a Task)
    inner = _ensure_future(arg)
    if inner.done():
        # Already finished, nothing left to protect
        return inner
    loop = futures._get_loop(inner)
    # Create a Future container
    outer = loop.create_future()

    def _inner_done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # The container was cancelled, but inner finished without being
                # cancelled: fetch the exception manually so inner can be reclaimed
                inner.exception()
            return

        if inner.cancelled():
            # inner was cancelled: propagate the cancellation to the
            # coroutine chain through the container
            outer.cancel()
        else:
            # inner finished and the container has not: propagate the result
            # or exception to the coroutine chain through the stand-in
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())

    def _outer_done_callback(outer):
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    # Register callbacks so each side is notified when the other
    # finishes or is cancelled
    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer

The source shows that when shield is called (assume the Task driving the call is named main.Task), it first uses the _ensure_future helper to obtain a Task (other.Task) that runs the waitable object asynchronously in the background when the argument is a coroutine. Because a separate Task drives the waitable object, no state of main.Task propagates to it. shield then creates a Future container and attaches done callbacks to both other.Task and the container so that each is notified when the other completes. Finally it returns the container to main.Task, so that main.Task learns the result of the waitable object indirectly, as shown in the figure below:

[Figure: main.Task, the Future container, and other.Task linked by done callbacks]

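However, the done callback of the Future container only removes the callback registered on other.Task, so the state of main.Task is never synchronized to other.Task (in the figure, the channel through which the Future would notify the waitable object aws is blocked), and the hosted waitable object is therefore unaffected. The done callback of other.Task, by contrast, synchronizes every state to the Future, which in turn affects main.Task.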
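
The one-way notification can be sketched with a simplified stand-in for shield. `one_way_link` and `link_demo` are hypothetical names, and the sketch deliberately leaves out details of the real implementation, such as retrieving an exception from inner after the container has been cancelled:

```python
import asyncio


def one_way_link(inner):
    """Hypothetical, simplified stand-in for shield's callback wiring."""
    loop = asyncio.get_running_loop()
    outer = loop.create_future()

    def inner_done(fut):
        # inner -> outer: every final state is copied to the container.
        if outer.done():
            return
        if fut.cancelled():
            outer.cancel()
        elif fut.exception() is not None:
            outer.set_exception(fut.exception())
        else:
            outer.set_result(fut.result())

    def outer_done(_):
        # outer -> inner: only unhook the callback, never cancel inner.
        if not inner.done():
            inner.remove_done_callback(inner_done)

    inner.add_done_callback(inner_done)
    outer.add_done_callback(outer_done)
    return outer


async def link_demo():
    loop = asyncio.get_running_loop()

    # Direction 1: a result set on inner reaches the container.
    first_inner = loop.create_future()
    first_outer = one_way_link(first_inner)
    first_inner.set_result("from inner")
    forwarded = await first_outer

    # Direction 2: cancelling the container does not touch inner.
    second_inner = loop.create_future()
    second_outer = one_way_link(second_inner)
    second_outer.cancel()
    await asyncio.sleep(0)  # let the done callback run
    return forwarded, second_inner.cancelled(), second_inner.done()


if __name__ == "__main__":
    print(asyncio.run(link_demo()))
```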


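3. Timeout--asyncio.wait_for

asyncio.wait_for hosts a waitable object until it completes. If the waitable object has not finished within the configured time, it is cancelled and asyncio.TimeoutError is raised. Its mechanism combines asyncio.shield and asyncio.sleep from above: it likewise creates a Future container for the waitable object, attaches to the container a timeout callback and a callback for the completion of the waitable object, and then waits for the container to finish. Before looking at asyncio.wait_for itself, consider the two helper functions it uses, _cancel_and_wait and _release_waiter, whose source code is as follows: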

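def _release_waiter(waiter, *args):
    if not waiter.done():
        waiter.set_result(None)

async def _cancel_and_wait(fut, loop):
    waiter = loop.create_future()
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        fut.cancel()
        # Wait on waiter rather than on fut, so that _cancel_and_wait
        # itself stays reliably cancellable
        await waiter
    finally:
        fut.remove_done_callback(cb)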




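async def wait_for(fut, timeout):
    loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # A timeout <= 0 means the caller wants the result immediately
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            # Already finished: return the result of the waitable object
            return fut.result()
        # Cancel the waitable object and wait for the cancellation to finish
        await _cancel_and_wait(fut, loop=loop)
        # If _cancel_and_wait cancelled it, result() raises CancelledError,
        # which is converted into a timeout error here
        try:
            return fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc

    # Create a Future that only becomes done on timeout or on completion
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        try:
            await waiter
        except exceptions.CancelledError:
            # The asyncio.Task was cancelled and the cancellation
            # propagated to waiter
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # Make sure the task is no longer running before returning
                await _cancel_and_wait(fut, loop=loop)
                raise
        # Either the timer expired or the waitable object finished
        if fut.done():
            # Finished: return the corresponding value
            return fut.result()
        else:
            # Timer expired: clean up and raise
            fut.remove_done_callback(cb)
            # Make sure the task is no longer running before returning
            await _cancel_and_wait(fut, loop=loop)
            # If _cancel_and_wait cancelled it, result() raises CancelledError,
            # which is converted into a timeout error here
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    finally:
        timeout_handle.cancel()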
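
From the caller's side, the timeout path of the source above looks roughly like this. The names `slow` and `wait_for_demo` and the durations are illustrative assumptions:

```python
import asyncio


async def slow():
    await asyncio.sleep(1)
    return "never reached"


async def wait_for_demo():
    task = asyncio.create_task(slow())
    try:
        await asyncio.wait_for(task, timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    # wait_for cancels the task and waits until the cancellation has finished.
    return timed_out, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(wait_for_demo()))
```

By the time TimeoutError reaches the caller, the hosted task is already cancelled, which is what the _cancel_and_wait helper is for.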







4. Waiting--asyncio.wait

import asyncio

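async def main():
    # Illustrative arguments: a one-second task waited on with a shorter timeout.
    return await asyncio.wait(
        {asyncio.create_task(asyncio.sleep(1))}, timeout=0.5
    )

if __name__ == "__main__":
    asyncio.run(main())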

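This code runs normally and does not raise a timeout error. Note, however, that newer Python versions (3.11 and later) no longer accept bare coroutine objects in asyncio.wait, so a coro has to be wrapped in a Task manually before it is passed in. The logic of wait is similar to that of wait_for; the source code is as follows: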

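async def _wait(fs, timeout, return_when, loop):
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        # Schedule a TimerHandle that completes waiter through
        # `_release_waiter` after timeout seconds
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # Done callback attached to every waitable object
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and
             (not f.cancelled() and f.exception() is not None)):
            # All tasks finished, or the first one completed, or the first
            # one raised: waiting is over, so cancel the TimerHandle and
            # mark waiter as done
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    # Attach the callback to every waitable object
    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        # Wait for the stand-in (waiter) to complete
        await waiter
    finally:
        # Cancel the TimerHandle and remove the callbacks
        # (because cancel is asynchronous)
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    # Build and return done and pending: done holds the finished objects,
    # pending the ones still running
    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending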
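
The `return_when` branch in `_on_completion` can be exercised with a short sketch. The names `job` and `first_completed_demo` and the delays are assumptions for illustration:

```python
import asyncio


async def job(delay, name):
    await asyncio.sleep(delay)
    return name


async def first_completed_demo():
    fast = asyncio.create_task(job(0.01, "fast"))
    slow = asyncio.create_task(job(0.5, "slow"))
    done, pending = await asyncio.wait(
        {fast, slow}, return_when=asyncio.FIRST_COMPLETED
    )
    names = sorted(t.result() for t in done)
    # wait() never cancels anything, so clean up what is still pending.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return names, len(pending)


if __name__ == "__main__":
    print(asyncio.run(first_completed_demo()))
```

Because wait only reports which objects are done and which are pending, cancelling the leftovers is the caller's decision.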


import asyncio

async def main():
    f_list = [asyncio.Future() for _ in range(10)]
    done, pending = await asyncio.wait(f_list, timeout=1)
    print(len(done), len(pending))
    print([i for i in pending if i.done()])
    f_list[0].set_result(True)  # complete one of the still-pending futures
    print([i for i in pending if i.done()])

if __name__ == "__main__":
    asyncio.run(main())
# >>> 0 10
# >>> []
# >>> [<Future finished result=True>]
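
The same holds for Tasks: an object left in pending after the timeout keeps running and can still be awaited later. A minimal sketch, with the hypothetical names `slow_job` and `timeout_keeps_running`:

```python
import asyncio


async def slow_job():
    await asyncio.sleep(0.05)
    return "finished later"


async def timeout_keeps_running():
    task = asyncio.create_task(slow_job())
    done, pending = await asyncio.wait({task}, timeout=0.01)
    # The timeout expired, but wait() neither raised nor cancelled the task.
    still_pending = task in pending and not task.done()
    result = await task
    return still_pending, result


if __name__ == "__main__":
    print(asyncio.run(timeout_keeps_running()))
```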




5. Iterating over completions--asyncio.as_completed

import asyncio

async def sub(i):
    await asyncio.sleep(i)
    return i

async def main():
    for f in asyncio.as_completed([sub(i) for i in range(5)], timeout=3):
        print(await f)

if __name__ == "__main__":
    asyncio.run(main())
# >>> 0
# >>> 1
# >>> 2
# >>> Traceback (most recent call last):
#       File "/home/so1n/github/demo_project/demo.py", line 18, in <module>
#         asyncio.run(main())
#       File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
#         return loop.run_until_complete(main)
#       File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
#         return future.result()
#       File "/home/so1n/github/demo_project/demo.py", line 14, in main
#         print(await f)
#       File "/usr/lib/python3.7/asyncio/tasks.py", line 532, in _wait_for_one
#         raise futures.TimeoutError
#     concurrent.futures._base.TimeoutError


def as_completed(fs, *, timeout=None):
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue()

    loop = events._get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        # Called on timeout. There is no result in that case, so an empty
        # value is pushed to the queue and the consumer raises an error
        # when it sees the empty element
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        # On completion, push the Future to the queue; the consumer
        # obtains the result through the Future
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # An empty element means the timeout has expired: raise
            raise exceptions.TimeoutError
        return f.result()

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # Yield coroutines through generator syntax; each one returns the
    # result of the next waitable object to complete
    for _ in range(len(todo)):
        yield _wait_for_one()
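
Because finished Futures are pushed onto a queue as they complete, results come back in completion order rather than submission order. A small usage sketch, with the hypothetical names `job` and `completion_order` and arbitrary delays:

```python
import asyncio


async def job(delay):
    await asyncio.sleep(delay)
    return delay


async def completion_order():
    delays = [0.06, 0.02, 0.04]  # submission order
    results = []
    for next_done in asyncio.as_completed([job(d) for d in delays]):
        # Each item is an awaitable that yields the next finished result.
        results.append(await next_done)
    return results


if __name__ == "__main__":
    print(asyncio.run(completion_order()))
```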


import asyncio
import random

async def sub():
    # 50% chance that a value is set and returned, 50% chance of hanging forever
    f = asyncio.Future()
    if random.choice([0, 1]) == 0:
        f.set_result(None)
    return await f

async def main():
    try:
        for f in asyncio.as_completed([sub() for i in range(5)], timeout=1):
            print(await f)
    except asyncio.TimeoutError:
        # Ignore the timeout
        pass
    # Count the sub tasks that have not finished
    cnt = 0
    for i in asyncio.all_tasks():
        if i.get_coro().__name__ == sub.__name__:
            cnt += 1
    print("running task by name sub:", cnt)

if __name__ == "__main__":
    asyncio.run(main())
# One possible output (the result is random):
# >>> None
# >>> None
# >>> None
# >>> running task by name sub: 2
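
Since as_completed leaves unfinished tasks running after a timeout, one way to avoid leaking them is to keep references to the tasks and cancel the leftovers explicitly. This is a sketch of that cleanup pattern, not something as_completed does for you; `stuck` and `drain_with_cleanup` are hypothetical names:

```python
import asyncio


async def stuck():
    # A Future that nobody completes, so this coroutine never finishes by itself.
    await asyncio.get_running_loop().create_future()


async def drain_with_cleanup():
    tasks = [asyncio.create_task(stuck()) for _ in range(3)]
    try:
        for next_done in asyncio.as_completed(tasks, timeout=0.05):
            await next_done
    except asyncio.TimeoutError:
        pass
    # Cancel whatever is still running and wait for the cancellations.
    leftover = [t for t in tasks if not t.done()]
    for task in leftover:
        task.cancel()
    await asyncio.gather(*leftover, return_exceptions=True)
    return len(leftover), all(t.cancelled() for t in tasks)


if __name__ == "__main__":
    print(asyncio.run(drain_with_cleanup()))
```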


