ホームページ >バックエンド開発 >Python チュートリアル >非同期 Python とgraphlib を使用した DAG の処理

非同期 Python とgraphlib を使用した DAG の処理

王林
王林オリジナル
2024-08-27 06:37:09786ブラウズ

Processing DAGs with async Python and graphlib

私は最近、Python の底なしの標準ライブラリ、graphlib で興味深いモジュールを見つけました。これまでに使用したことがない場合は、これは Python 3.9 で追加された小さなユーティリティであり、TopologicalSorter という 1 つのクラスのみを実装しています。

名前は一目瞭然です。これはグラフのトポロジカルソートのためのクラスです。しかし、これは、prepare() メソッドや is_active() など、かなり不可解だが信じられないほど便利な API を備えているため、元々はソートだけを念頭に置いて書かれたとは思えません。ドキュメント内のこの例は、その背後にある動機を示唆しています:

topological_sorter = TopologicalSorter()

# Add nodes to 'topological_sorter'...

topological_sorter.prepare()
while topological_sorter.is_active():
    for node in topological_sorter.get_ready():
        # Worker threads or processes take nodes to work on off the
        # 'task_queue' queue.
        task_queue.put(node)

    # When the work for a node is done, workers put the node in
    # 'finalized_tasks_queue' so we can get more nodes to work on.
    # The definition of 'is_active()' guarantees that, at this point, at
    # least one node has been placed on 'task_queue' that hasn't yet
    # been passed to 'done()', so this blocking 'get()' must (eventually)
    # succeed.  After calling 'done()', we loop back to call 'get_ready()'
    # again, so put newly freed nodes on 'task_queue' as soon as
    # logically possible.
    node = finalized_tasks_queue.get()
    topological_sorter.done(node)

つまり、graphlib はグラフを並べ替えるだけのモジュールではなく、タスクのグラフをトポロジカルな順序で実行するためのユーティリティでもあります。これは、ワークロードに他のタスクの結果に依存するタスクがある場合に便利です。グラフはこの問題をモデル化する優れた方法であり、トポロジー的順序はタスクが正しい順序で処理されることを確認する方法です。

ドキュメントに欠けているものの 1 つは asyncio の例ですが、これは非常に簡単に作成できることがわかりました。 asyncio を使用すると、スレッドセーフに対処する必要がないため、スレッドの同期にキューを使用したり、その他の追加の複雑さを使用したりせずに済みます。

単純な非同期ノードのビジター関数を定義します。

async def visit(node: str, sorter: TopologicalSorter):
    print(f"processing node {node}")
    sorter.done(node)

現実の世界では、I/O バウンドの作業を行っている限り、これはいくらでも複雑になる可能性があるため、asyncio のメリットを享受できます。重要なのは、関数の最後で sorter.done(node) を呼び出して、TopologicalSorter のインスタンスにこのノードの処理が完了し、次のノードに進むことができることを知らせることです。

次に、訪問関数をトポロジー的に順序付けされた実行に接続します。

sorter = TopologicalSorter(graph)

sorter.prepare()

while sorter.is_active():
    node_group = sorter.get_ready()

    if not node_group:
        # no nodes are ready yet, so we sleep for a bit
        await asyncio.sleep(0.25)
    else:
        tasks = set()
        for node in node_group:
            task = asyncio.create_task(visit(node, sorter))
            tasks.add(task)
            task.add_done_callback(tasks.discard)

動作するスクリプトの完全なソース コードは、この要点にあります。

graphlib の独特な側面の 1 つは、TopologicalSorter が引数として受け入れるグラフの形式です。これは、一般的なグラフ表現とは逆の順序になっています。例えば。このようなグラフがある場合、A -> B -> C、通常は次のように表します:

graph = {
  "A": ["B"],
  "B": ["C"],
  "C": [],
}

しかし、TopologicalSorter はこのグラフのエッジ方向を逆にしたいと考えています:

オプションのグラフ引数が指定される場合、それはキーがノードであり、値がグラフ内のそのノードのすべての先行ノードの反復可能である有向非巡回グラフを表す辞書である必要があります

A を表す正しい方法 -> B -> TopologicalSorter の C は次のとおりです:

graph = {
  "C": ["B"],
  "B": ["A"],
  "A": [],
}

これに関する詳しい情報と白熱した議論は、こちらでご覧いただけます: https://bugs.python.org/issue46071.

コーディングを楽しんでください!

以上が非同期 Python とgraphlib を使用した DAG の処理の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。