Processing DAGs with async Python and graphlib

王林
2024-08-27 06:37:09

I recently came across an interesting module in Python's bottomless standard library: graphlib. If you haven't worked with it before, it's a small utility that was added in Python 3.9 and implements only one class, TopologicalSorter (plus a CycleError exception).
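
For plain sorting, TopologicalSorter offers static_order(), which yields the nodes in topological order and raises CycleError if the graph has a cycle. A minimal sketch; the build/test/deploy graph is made up for illustration:

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical graph: keys are nodes, values are the nodes they depend on.
deps = {"deploy": ["build", "test"], "test": ["build"], "build": []}

order = list(TopologicalSorter(deps).static_order())
print(order)  # ['build', 'test', 'deploy']

# A cycle is reported as CycleError; exc.args[1] holds the nodes in the cycle.
cycle = None
try:
    list(TopologicalSorter({"a": ["b"], "b": ["a"]}).static_order())
except CycleError as exc:
    cycle = exc.args[1]
    print("cycle detected:", cycle)
```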

The name is self-explanatory: this is a class for topologically sorting a graph. But I don't think it was originally written with just sorting in mind, since it has a rather cryptic but incredibly useful API, such as the prepare() and is_active() methods. This example from the documentation hints at the motivation behind it:

topological_sorter = TopologicalSorter()

# Add nodes to 'topological_sorter'...

topological_sorter.prepare()
while topological_sorter.is_active():
    for node in topological_sorter.get_ready():
        # Worker threads or processes take nodes to work on off the
        # 'task_queue' queue.
        task_queue.put(node)

    # When the work for a node is done, workers put the node in
    # 'finalized_tasks_queue' so we can get more nodes to work on.
    # The definition of 'is_active()' guarantees that, at this point, at
    # least one node has been placed on 'task_queue' that hasn't yet
    # been passed to 'done()', so this blocking 'get()' must (eventually)
    # succeed.  After calling 'done()', we loop back to call 'get_ready()'
    # again, so put newly freed nodes on 'task_queue' as soon as
    # logically possible.
    node = finalized_tasks_queue.get()
    topological_sorter.done(node)
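
To see the documentation's pattern run end to end, here is a minimal sketch that fills in the pieces the docs leave out. The graph, the worker() function, the number of threads and the None shutdown sentinel are all assumptions made for illustration; the "work" each thread does is just recording the node.

```python
import queue
import threading
from graphlib import TopologicalSorter

# Hypothetical task graph in graphlib's format: node -> its predecessors.
graph = {"C": ["B"], "B": ["A"], "D": ["A"], "A": []}

task_queue: queue.Queue = queue.Queue()
finalized_tasks_queue: queue.Queue = queue.Queue()
processed: list[str] = []  # order in which workers handled nodes
lock = threading.Lock()


def worker() -> None:
    while True:
        node = task_queue.get()
        if node is None:  # sentinel: no more work for this thread
            break
        with lock:
            processed.append(node)  # stand-in for real work
        finalized_tasks_queue.put(node)  # hand the node back to the main thread


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

topological_sorter = TopologicalSorter(graph)
topological_sorter.prepare()
while topological_sorter.is_active():
    for node in topological_sorter.get_ready():
        task_queue.put(node)
    node = finalized_tasks_queue.get()  # blocks until some worker finishes
    topological_sorter.done(node)

for _ in threads:
    task_queue.put(None)  # one sentinel per worker
for t in threads:
    t.join()

print(processed)
```

Only the main thread touches the sorter; the workers communicate with it purely through the two queues.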

So graphlib is not just a module for sorting graphs; it's also a utility for running graphs of tasks in topological order, which is useful when some tasks in your workload depend on the results of others. Graphs are a great way to model this problem, and topological order is how you make sure tasks are processed in the right order.

One thing missing from the docs is an asyncio example, which turns out to be quite easy to write. Since asyncio runs all the coroutines on a single thread, you don't have to deal with thread safety, so you can get by without a queue for synchronizing threads or any other additional complexity of the sort.

We'll define a simplistic async node visitor function:

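from graphlib import TopologicalSorter

async def visit(node: str, sorter: TopologicalSorter) -> None:
    print(f"processing node {node}")
    sorter.done(node)  # mark the node as finished so its successors become ready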

In the real world this can be as complex as you'd like, as long as you're doing I/O-bound work to reap the benefits of asyncio. The important bit is to call sorter.done(node) at the end of the function, to let the TopologicalSorter instance know we're done with this node and can progress to the next ones.
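
As a sketch of what a more realistic visitor might look like, the version below simulates I/O with asyncio.sleep() and stores a result that downstream nodes read. The graph, inputs and results dicts are hypothetical, and run() is a deliberately simplified batch driver (it waits for every ready node before asking for more), not the loop used in this article.

```python
import asyncio
from graphlib import TopologicalSorter

# Hypothetical graph in graphlib's predecessor format: "total" needs "a" and "b".
graph = {"total": ["a", "b"], "a": [], "b": []}
inputs = {"a": 2, "b": 3}
results: dict[str, int] = {}


async def visit(node: str, sorter: TopologicalSorter) -> None:
    await asyncio.sleep(0.01)  # simulate an I/O-bound call
    if node in inputs:
        results[node] = inputs[node]
    else:
        # All predecessors were marked done earlier, so their results exist.
        results[node] = sum(results[p] for p in graph[node])
    sorter.done(node)  # only now do the successors of `node` become ready


async def run() -> None:
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    while sorter.is_active():
        # Simplified driver: run each ready batch concurrently, wait for all of it.
        await asyncio.gather(*(visit(n, sorter) for n in sorter.get_ready()))


asyncio.run(run())
print(results["total"])  # 5
```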

Then we plug the visit function into our topologically ordered run:

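import asyncio
from graphlib import TopologicalSorter


async def main(graph: dict[str, list[str]]) -> None:
    sorter = TopologicalSorter(graph)
    sorter.prepare()

    # keep strong references to running tasks so they aren't garbage collected
    tasks: set[asyncio.Task] = set()

    while sorter.is_active():
        node_group = sorter.get_ready()

        if not node_group:
            # no nodes are ready yet, so we sleep for a bit to let
            # the running visit() tasks make progress
            await asyncio.sleep(0.25)
        else:
            for node in node_group:
                task = asyncio.create_task(visit(node, sorter))
                tasks.add(task)
                task.add_done_callback(tasks.discard)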

Full source code of a working script can be found in this gist.
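
As an alternative sketch (not the author's script), the polling asyncio.sleep(0.25) can be replaced with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED), so the loop wakes up as soon as any visit finishes. The extra log parameter and the sample graph are assumptions for illustration.

```python
import asyncio
from graphlib import TopologicalSorter


async def visit(node: str, sorter: TopologicalSorter, log: list[str]) -> None:
    await asyncio.sleep(0.01)  # stand-in for real I/O-bound work
    log.append(node)
    sorter.done(node)


async def main(graph: dict[str, list[str]]) -> list[str]:
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    log: list[str] = []
    pending: set[asyncio.Task] = set()

    while sorter.is_active():
        for node in sorter.get_ready():
            pending.add(asyncio.create_task(visit(node, sorter, log)))
        # Block until at least one visit finishes instead of polling on a timer.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            task.result()  # re-raise any exception from visit()
    return log


if __name__ == "__main__":
    # A -> B -> C in graphlib's predecessor format, plus D depending on A.
    graph = {"C": ["B"], "B": ["A"], "D": ["A"], "A": []}
    print(asyncio.run(main(graph)))
```

Calling task.result() also surfaces failures: in the polling version, a visit() that raised before reaching sorter.done() would leave is_active() returning True, and the loop would keep sleeping forever.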

One peculiar aspect of graphlib is the format of the graph that TopologicalSorter accepts as an argument: it is in reverse order from the typical representation of a graph. E.g. if you have a graph like A -> B -> C, normally you'd represent it like this:

graph = {
  "A": ["B"],
  "B": ["C"],
  "C": [],
}
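
As a quick illustration (a minimal sketch), this is what happens if you hand that successor-style dict straight to TopologicalSorter:

```python
from graphlib import TopologicalSorter

# A -> B -> C written the "usual" way: node -> list of successors.
successors = {"A": ["B"], "B": ["C"], "C": []}

# graphlib reads each value as the node's *predecessors*, so this dict
# actually describes C -> B -> A and the order comes out reversed.
order = list(TopologicalSorter(successors).static_order())
print(order)  # ['C', 'B', 'A']
```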

But TopologicalSorter wants the graph with its edge direction reversed. As the documentation puts it:

If the optional graph argument is provided it must be a dictionary representing a directed acyclic graph where the keys are nodes and the values are iterables of all predecessors of that node in the graph

So the right way to represent A -> B -> C for the TopologicalSorter is this:

graph = {
  "C": ["B"],
  "B": ["A"],
  "A": [],
}
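
If your graph already lives in the successor form, a small conversion step keeps the rest of the code unchanged. The to_predecessors() helper below is hypothetical (not part of graphlib); the sketch also shows TopologicalSorter.add(node, *predecessors), which builds the same graph edge by edge.

```python
from graphlib import TopologicalSorter


def to_predecessors(successors: dict[str, list[str]]) -> dict[str, list[str]]:
    """Hypothetical helper: flip a node -> successors map into
    graphlib's node -> predecessors format."""
    predecessors: dict[str, list[str]] = {node: [] for node in successors}
    for node, targets in successors.items():
        for target in targets:
            predecessors.setdefault(target, []).append(node)
    return predecessors


graph = to_predecessors({"A": ["B"], "B": ["C"], "C": []})
print(graph)  # {'A': [], 'B': ['A'], 'C': ['B']}
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['A', 'B', 'C']

# Alternatively, declare each node together with its predecessors.
sorter = TopologicalSorter()
sorter.add("B", "A")
sorter.add("C", "B")
order_from_add = list(sorter.static_order())
print(order_from_add)  # ['A', 'B', 'C']
```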

More info and a rather heated debate on this can be found here: https://bugs.python.org/issue46071.

Happy coding!