首页  >  文章  >  后端开发  >  使用异步 Python 和 graphlib 处理 DAG

使用异步 Python 和 graphlib 处理 DAG

王林
王林原创
2024-08-27 06:37:09669浏览

Processing DAGs with async Python and graphlib

我最近在Python的无底标准库中发现了一个有趣的模块:graphlib。如果您以前没有使用过它,它是 Python 3.9 中添加的一个小实用程序,并且仅实现一个类:TopologicalSorter。

这个名字是不言自明的——这是一个用于图的拓扑排序的类。但我不认为它最初是为了排序而编写的,因为它有相当神秘但非常有用的API,例如prepare()方法或is_active()。文档中的这个示例暗示了其背后的动机:

topological_sorter = TopologicalSorter()

# Add nodes to 'topological_sorter'...

topological_sorter.prepare()
while topological_sorter.is_active():
    for node in topological_sorter.get_ready():
        # Worker threads or processes take nodes to work on off the
        # 'task_queue' queue.
        task_queue.put(node)

    # When the work for a node is done, workers put the node in
    # 'finalized_tasks_queue' so we can get more nodes to work on.
    # The definition of 'is_active()' guarantees that, at this point, at
    # least one node has been placed on 'task_queue' that hasn't yet
    # been passed to 'done()', so this blocking 'get()' must (eventually)
    # succeed.  After calling 'done()', we loop back to call 'get_ready()'
    # again, so put newly freed nodes on 'task_queue' as soon as
    # logically possible.
    node = finalized_tasks_queue.get()
    topological_sorter.done(node)

所以 graphlib 不仅仅是一个用于对图进行排序的模块,它也是一个按拓扑顺序运行任务图的实用程序,如果您的工作负载的任务依赖于其他任务的结果,这非常有用。图是对这个问题进行建模的好方法,拓扑顺序是确保任务按正确顺序处理的方法。

文档中缺少的一件事是 asyncio 示例,事实证明它非常容易编写。由于使用 asyncio,您不必处理线程安全问题,因此您无需使用队列来同步线程或任何其他额外的复杂性。

我们将定义一个简单的异步节点访问者函数:

async def visit(node: str, sorter: TopologicalSorter):
    print(f"processing node {node}")
    sorter.done(node)

在现实世界中,只要您正在执行 I/O 密集型工作,这可以像您希望的那样复杂,因此可以享受 asyncio 的好处。重要的一点是在函数末尾调用 sorter.done(node) ,让 TopologicalSorter 的实例知道我们已经完成了这个节点,我们可以进入下一个节点。

然后我们将访问函数插入到我们的拓扑有序运行中:

sorter = TopologicalSorter(graph)

sorter.prepare()

while sorter.is_active():
    node_group = sorter.get_ready()

    if not node_group:
        # no nodes are ready yet, so we sleep for a bit
        await asyncio.sleep(0.25)
    else:
        tasks = set()
        for node in node_group:
            task = asyncio.create_task(visit(node, sorter))
            tasks.add(task)
            task.add_done_callback(tasks.discard)

工作脚本的完整源代码可以在这个要点中找到。

graphlib 的一个特殊方面是 TopologicalSorter 接受作为参数的图的格式 - 它与图的典型表示形式相反。例如。如果你有一个像这样的图表 A -> B-> C,通常你会这样表示:

graph = {
  "A": ["B"],
  "B": ["C"],
  "C": [],
}

但是拓扑排序器希望该图的边缘方向反转:

如果提供了可选的图参数,它必须是表示有向无环图的字典,其中键是节点,值是图中该节点的所有前辈的可迭代

所以表示 A 的正确方式 -> B->拓扑排序器的 C 是这样的:

graph = {
  "C": ["B"],
  "B": ["A"],
  "A": [],
}

更多信息和对此的相当激烈的辩论可以在这里找到:https://bugs.python.org/issue46071。

编码愉快!

以上是使用异步 Python 和 graphlib 处理 DAG的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn