Heim > Artikel > Backend-Entwicklung > Verarbeitung von DAGs mit asynchronem Python und Graphlib
Ich bin kürzlich auf ein interessantes Modul in der grenzenlosen Standardbibliothek von Python gestoßen: graphlib. Falls Sie noch nie damit gearbeitet haben: Es handelt sich um ein kleines Dienstprogramm, das in Python 3.9 hinzugefügt wurde und nur eine Klasse implementiert: TopologicalSorter.
Der Name ist selbsterklärend – es handelt sich um eine Klasse zur topologischen Sortierung eines Graphen. Aber ich glaube nicht, dass es ursprünglich nur im Hinblick auf das Sortieren geschrieben wurde, da es über eine ziemlich kryptische, aber unglaublich nützliche API verfügt, wie zum Beispiel die Methode Prepare() oder is_active(). Dieses Beispiel in der Dokumentation gibt Hinweise auf die Motivation dahinter:
topological_sorter = TopologicalSorter() # Add nodes to 'topological_sorter'... topological_sorter.prepare() while topological_sorter.is_active(): for node in topological_sorter.get_ready(): # Worker threads or processes take nodes to work on off the # 'task_queue' queue. task_queue.put(node) # When the work for a node is done, workers put the node in # 'finalized_tasks_queue' so we can get more nodes to work on. # The definition of 'is_active()' guarantees that, at this point, at # least one node has been placed on 'task_queue' that hasn't yet # been passed to 'done()', so this blocking 'get()' must (eventually) # succeed. After calling 'done()', we loop back to call 'get_ready()' # again, so put newly freed nodes on 'task_queue' as soon as # logically possible. node = finalized_tasks_queue.get() topological_sorter.done(node)
Graphlib ist also nicht nur ein Modul zum Sortieren von Diagrammen, sondern auch ein Dienstprogramm zum Ausführen von Diagrammen von Aufgaben in topologischer Reihenfolge, was nützlich ist, wenn Ihre Workloads Aufgaben haben, die von den Ergebnissen anderer Aufgaben abhängen. Diagramme sind eine großartige Möglichkeit, dieses Problem zu modellieren, und durch die topologische Reihenfolge stellen Sie sicher, dass Aufgaben in der richtigen Reihenfolge verarbeitet werden.
Eine Sache, die in den Dokumenten fehlt, ist ein Asyncio-Beispiel, das sich als recht einfach zu schreiben herausstellt. Da Sie sich bei Asyncio nicht um die Thread-Sicherheit kümmern müssen, können Sie auf die Verwendung einer Warteschlange zum Synchronisieren von Threads oder eine andere zusätzliche Komplexität dieser Art verzichten.
Wir definieren eine vereinfachte asynchrone Knotenbesucherfunktion:
async def visit(node: str, sorter: TopologicalSorter): print(f"processing node {node}") sorter.done(node)
In der realen Welt kann dies so komplex sein, wie Sie möchten, solange Sie I/O-gebundene Arbeit erledigen, also profitieren Sie von den Vorteilen von Asyncio. Der wichtige Teil besteht darin, sorter.done(node) am Ende der Funktion aufzurufen, um der Instanz von TopologicalSorter mitzuteilen, dass wir mit diesem Knoten fertig sind und mit dem nächsten fortfahren können.
Dann fügen wir die Besuchsfunktion in unseren topologisch geordneten Lauf ein:
sorter = TopologicalSorter(graph) sorter.prepare() while sorter.is_active(): node_group = sorter.get_ready() if not node_group: # no nodes are ready yet, so we sleep for a bit await asyncio.sleep(0.25) else: tasks = set() for node in node_group: task = asyncio.create_task(visit(node, sorter)) tasks.add(task) task.add_done_callback(tasks.discard)
Der vollständige Quellcode eines funktionierenden Skripts finden Sie in diesem Gist.
Ein besonderer Aspekt von graphlib ist das Format des Graphen, den TopologicalSorter als Argument akzeptiert – es ist in umgekehrter Reihenfolge zu Ihrer typischen Darstellung eines Graphen. Z.B. Wenn Sie ein Diagramm wie dieses haben: A -> B -> C, normalerweise würden Sie es so darstellen:
graph = { "A": ["B"], "B": ["C"], "C": [], }
aber der TopologicalSorter möchte diesen Graphen mit umgekehrter Kantenrichtung haben:
Wenn das optionale Diagrammargument angegeben wird, muss es sich um ein Wörterbuch handeln, das einen gerichteten azyklischen Graphen darstellt, bei dem die Schlüssel Knoten und die Werte iterierbare Werte aller Vorgänger dieses Knotens im Diagramm sind
Die richtige Art, A darzustellen -> B -> C für den TopologicalSorter lautet:
graph = { "C": ["B"], "B": ["A"], "A": [], }
Weitere Informationen und eine ziemlich hitzige Debatte dazu finden Sie hier: https://bugs.python.org/issue46071.
Viel Spaß beim Codieren!
Das obige ist der detaillierte Inhalt vonVerarbeitung von DAGs mit asynchronem Python und Graphlib. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!