首页  >  文章  >  后端开发  >  使用 Asyncio 创建和管理任务

使用 Asyncio 创建和管理任务

王林
王林原创
2024-08-15 06:37:08574浏览

Creating and Managing Tasks with Asyncio

Asyncio 允许开发人员轻松地用 Python 编写异步程序。该模块还提供了多种异步任务的方法,并且由于执行方法多种多样,因此可能会让人困惑于使用哪一种。

在本文中,我们将讨论使用 asyncio 创建和管理任务的多种方法。

什么是异步任务?

在 asyncio 中,任务 是一个包装协程并安排其在事件循环内运行的对象。简而言之,任务是一种与其他任务同时运行协程的方式。创建任务后,事件循环将运行它,并根据需要暂停和恢复它以允许其他任务运行。

创建和管理 Asyncio 任务的方法

现在,我们可以讨论创建和管理任务的方法。首先,要使用 asyncio 在 Python 中创建任务,请使用 asyncio.create_task 方法,该方法采用以下参数:

  • coro(必填):要调度的协程对象。这是您想要异步运行的函数。

  • name(可选):可用于调试或日志记录目的的任务名称。您可以为此参数分配一个字符串。

    • 您还可以稍后使用 Task.set_name(name) 和 Task.get_name() 设置或获取名称。
  • context(可选):在Python 3.11中引入,用于设置任务的上下文变量,从而启用任务本地存储。它类似于线程本地存储,但用于异步任务。

    • 除非您正在处理需要上下文管理的高级场景,否则此参数并不常用。

这是asyncio.create_task的用法示例:

import asyncio

# Define a coroutine
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O-bound operation
    print(f"Hello, {name}!")

async def main():
    # Create tasks
    task1 = asyncio.create_task(greet("Alice"), name="GreetingAlice")
    task2 = asyncio.create_task(greet("Bob"), name="GreetingBob")

    # Check task names
    print(f"Task 1 name: {task1.get_name()}")
    print(f"Task 2 name: {task2.get_name()}")

    # Wait for both tasks to complete
    await task1
    await task2

# Run the main function
asyncio.run(main())

创建任务时,可以执行许多方法,例如:

  • .cancel():取消任务。

  • .add_done_callback(cb):添加任务完成时运行的回调函数。

  • .done():检查任务是否完成。

  • .result():任务完成后检索结果。

现在我们了解了如何创建任务,让我们看看如何处理等待一个任务或多个任务。

等待任务完成

在本节中,我们将讨论如何等待一个或多个任务的任务完成。异步编程基于这样一个事实:如果正在运行异步任务,我们可以继续执行程序。有时您可能想要更好地控制流程,并希望确保在安全地继续执行程序之前获得可以使用的结果。

要等待单个任务完成,可以使用 asyncio.wait_for。它需要两个参数:

  • awaitable(必需):这是您想要等待的协程、任务或未来。它可以是任何可以等待的对象,例如协程函数调用、asyncio.Task 或 asyncio.Future。

  • 超时(可选):指定等待 aw 完成的最大秒数。如果达到超时并且等待尚未完成,asyncio.wait_for 会引发 TimeoutError。如果超时设置为 None,该函数将无限期地等待等待完成。

以下是使用此方法的示例:

import asyncio

async def slow_task():
    print("Task started...")
    await asyncio.sleep(5)  # Simulating a long-running task
    print("Task finished!")
    return "Completed"

async def main():
    try:
        # Wait for slow_task to finish within 2 seconds
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("The task took too long and was canceled!")

asyncio.run(main())

在上面的代码中,slow_task() 是一个协程,通过休眠 5 秒来模拟长时间运行的任务。 asyncio.wait_for(slow_task(), timeout=2) 行等待任务完成,但将等待时间限制为 2 秒,从而导致超时,因为任务需要更长的时间。当超过超时时,会引发 TimeoutError,任务被取消,并通过打印一条指示任务花费太长时间的消息来处理异常。

我们还可以等待多个或一组任务完成。这可以使用 asyncio.wait、asyncio.gather 或 asyncio.as_completed 来实现。让我们探索一下每种方法。

异步等待

asyncio.wait 方法等待一组任务并返回两组:一组用于已完成的任务,一组用于待处理的任务。它需要以下参数:

  • aws(必需,可迭代):您想要等待的协程对象、任务或 future 的集合。

  • 超时(float 或 None,可选):等待的最大秒数。如果未提供,则会无限期等待。

  • return_when(常量,可选):指定 asyncio.wait 何时返回。选项包括:

    • asyncio.ALL_COMPLETED (default): Returns when all tasks are complete.
    • asyncio.FIRST_COMPLETED: Returns when the first task is completed.
    • asyncio.FIRST_EXCEPTION: Returns when the first task raises an exception.

Let's see how it is used in an example.

import asyncio
import random

async def task():
    await asyncio.sleep(random.uniform(1, 3))

async def main():
    tasks = [asyncio.create_task(task()) for _ in range(3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Done tasks: {len(done)}, Pending tasks: {len(pending)}")

asyncio.run(main())

In the code above, asyncio.wait waits for a group of tasks and returns two sets: one with completed tasks and another with those still pending. You can control when it returns, such as after the first task is completed or after all tasks are done. In the example, asyncio.wait returns when the first task is completed, leaving the rest in the pending set.

asyncio.gather

The asyncio.gather method runs multiple awaitable objects concurrently and returns a list of their results, optionally handling exceptions. Let's see the arguments it takes.

  • *aws (required, multiple awaitables): A variable number of awaitable objects (like coroutines, tasks, or futures) to run concurrently.

  • return_exceptions (bool, optional): If True, exceptions in the tasks will be returned as part of the results list instead of being raised.

Let's see how it can be used in an example.

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)

asyncio.run(main())

In the code above, asyncio.gather runs multiple awaitable objects concurrently and returns a list of their results in the order they were passed in. It allows you to handle exceptions gracefully if return_exceptions is set to True. In the example, three tasks are run simultaneously, and their results are returned in a list once all tasks are complete.

asyncio.as_completed

The asyncio.as_completed method is used to return an iterator that yields tasks as they are completed, allowing results to be processed immediately. It takes the following arguments:

  • aws (iterable of awaitables): A collection of coroutine objects, tasks, or futures.

  • timeout (float or None, optional): The maximum number of seconds to wait for tasks to complete. If not provided, it waits indefinitely.

Example

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    tasks = [task(i) for i in range(3)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())

In the example above, asyncio.as_completed returns an iterator that yields results as each task completes, allowing you to process them immediately. This is useful when you want to handle results as soon as they're available, rather than waiting for all tasks to finish. In the example, the tasks are run simultaneously, and their results are printed as each one finishes, in the order they complete.

So to make a summary, you use:

  • asyncio.wait: when you need to handle multiple tasks and want to track which tasks are completed and which are still pending. It's useful when you care about the status of each task separately.

  • asyncio.gather: when you want to run multiple tasks concurrently and need the results in a list, especially when the order of results matters or you need to handle exceptions gracefully.

  • asyncio.as_completed: when you want to process results as soon as each task finishes, rather than waiting for all tasks to complete. It’s useful for handling results in the order they become available.

However, these methods don't take atomic task management with built-in error handling. In the next section, we will see about asyncio.TaskGroup and how to use it to manage a group of tasks.

asyncio.TaskGroup

asyncio.TaskGroup is a context manager introduced in Python 3.11 that simplifies managing multiple tasks as a group. It ensures that if any task within the group fails, all other tasks are canceled, providing a way to handle complex task management with robust error handling. The class has one method called created_task used to create and add tasks to the task group. You pass a coroutine to this method, and it returns an asyncio.Task object that is managed by the group.

Here is an example of how it is used:

import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 done"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 done"

async def task_with_error():
    await asyncio.sleep(1)
    raise ValueError("An error occurred")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(task1())
            task2 = tg.create_task(task2())
            error_task = tg.create_task(task_with_error())
    except Exception as e:
        print(f"Error: {e}")

    # Print results from completed tasks
    print("Task 1 result:", task1.result())
    print("Task 2 result:", task2.result())

asyncio.run(main())

asyncio.TaskGroup manages multiple tasks and ensures that if any task fails, all other tasks in the group are canceled. In the example, a task with an error causes the entire group to be canceled, and only the results of completed tasks are printed.

Usage for this can be in web scraping. You can use asyncio.TaskGroup to handle multiple concurrent API requests and ensure that if any request fails, all other requests are canceled to avoid incomplete data.

We are at the end of the article and we have learned the multiple methods asyncio provides to create and manage tasks. Here is a summary of the methods:

  • asyncio.wait_for: Wait for a task with a timeout.

  • asyncio.wait: Wait for multiple tasks with flexible completion conditions.

  • asyncio.gather: Aggregate multiple tasks into a single awaitable.

  • asyncio.as_completed:在任务完成时处理任务。

  • asyncio.TaskGroup:管理一组任务,并在失败时自动取消。

结论

异步编程可以改变您在 Python 中处理并发任务的方式,使您的代码更加高效、响应更快。在本文中,我们浏览了 asyncio 提供的各种方法来创建和管理任务,从简单的超时到复杂的任务组。了解何时以及如何使用每种方法(asyncio.wait_for、asyncio.wait、asyncio.gather、asyncio.as_completed 和 asyncio.TaskGroup)将帮助您充分利用异步编程的潜力,使您的应用程序更加健壮和可扩展。

要更深入地了解异步编程和更多实际示例,请在此处浏览我们的详细指南。

如果您喜欢这篇文章,请考虑订阅我的时事通讯,这样您就不会错过未来的更新。

编码愉快!

以上是使用 Asyncio 创建和管理任务的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn