首頁 >後端開發 >Python教學 >使用非同步 Python 和 graphlib 處理 DAG

使用非同步 Python 和 graphlib 處理 DAG

王林
王林原創
2024-08-27 06:37:09742瀏覽

Processing DAGs with async Python and graphlib

我最近在Python的無底標準庫中發現了一個有趣的模組:graphlib。如果您以前沒有使用過它,它是 Python 3.9 中添加的小實用程序,並且僅實作一個類別:TopologicalSorter。

這個名字是不言自明的-這是一個用於圖的拓樸排序的類別。但我不認為它最初是為了排序而寫的,因為它有相當神秘但非常有用的API,例如prepare()方法或is_active()。文檔中的這個範例暗示了背後的動機:

topological_sorter = TopologicalSorter()

# Add nodes to 'topological_sorter'...

topological_sorter.prepare()
while topological_sorter.is_active():
    for node in topological_sorter.get_ready():
        # Worker threads or processes take nodes to work on off the
        # 'task_queue' queue.
        task_queue.put(node)

    # When the work for a node is done, workers put the node in
    # 'finalized_tasks_queue' so we can get more nodes to work on.
    # The definition of 'is_active()' guarantees that, at this point, at
    # least one node has been placed on 'task_queue' that hasn't yet
    # been passed to 'done()', so this blocking 'get()' must (eventually)
    # succeed.  After calling 'done()', we loop back to call 'get_ready()'
    # again, so put newly freed nodes on 'task_queue' as soon as
    # logically possible.
    node = finalized_tasks_queue.get()
    topological_sorter.done(node)

所以 graphlib 不僅僅是一個用於對圖進行排序的模組,它還是一個按拓撲順序運行任務圖的實用程序,如果您的工作負載的任務依賴於其他任務的結果,這將非常有用。圖是對這個問題進行建模的好方法,拓撲順序是確保任務按正確順序處理的方法。

文件中缺少的一件事是 asyncio 範例,事實證明它非常容易編寫。由於使用 asyncio,您不必處理線程安全性問題,因此您無需使用佇列來同步線程或任何其他額外的複雜性。

我們將定義一個簡單的非同步節點訪客函數:

async def visit(node: str, sorter: TopologicalSorter):
    print(f"processing node {node}")
    sorter.done(node)

在現實世界中,只要您正在執行 I/O 密集型工作,這可以像您希望的那樣複雜,因此可以享受 asyncio 的好處。重要的一點是在函數末尾呼叫 sorter.done(node) ,讓 TopologicalSorter 的實例知道我們已經完成了這個節點,我們可以進入下一個節點。

然後我們將存取函數插入到我們的拓撲有序運行中:

sorter = TopologicalSorter(graph)

sorter.prepare()

while sorter.is_active():
    node_group = sorter.get_ready()

    if not node_group:
        # no nodes are ready yet, so we sleep for a bit
        await asyncio.sleep(0.25)
    else:
        tasks = set()
        for node in node_group:
            task = asyncio.create_task(visit(node, sorter))
            tasks.add(task)
            task.add_done_callback(tasks.discard)

工作腳本的完整原始程式碼可以在這個要點中找到。

graphlib 的一個特殊方面是 TopologicalSorter 接受作為參數的圖的格式 - 它與圖的典型表示形式相反。例如。如果你有一個像這樣的圖表 A -> B-> C,通常你會這樣表示:

graph = {
  "A": ["B"],
  "B": ["C"],
  "C": [],
}

但是拓樸排序器希望該圖的邊緣方向反轉:

如果提供了可選的圖參數,它必須是表示有向無環圖的字典,其中鍵是節點,值是圖中該節點的所有前輩的可迭代

所以表示 A 的正確方式 -> B->拓樸排序器的 C 是這樣的:

graph = {
  "C": ["B"],
  "B": ["A"],
  "A": [],
}

更多資訊和對此的相當激烈的辯論可以在這裡找到:https://bugs.python.org/issue46071。

編碼愉快!

以上是使用非同步 Python 和 graphlib 處理 DAG的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn