>  기사  >  백엔드 개발  >  비동기 Python 및 graphlib를 사용하여 DAG 처리

비동기 Python 및 graphlib를 사용하여 DAG 처리

王林
王林원래의
2024-08-27 06:37:09667검색

Processing DAGs with async Python and graphlib

저는 최근 Python의 밑바닥 없는 표준 라이브러리에서 흥미로운 모듈인 graphlib를 발견했습니다. 이전에 사용해 본 적이 없다면 Python 3.9에 추가된 작은 유틸리티로 TopologicalSorter라는 하나의 클래스만 구현합니다.

이름은 설명이 필요하지 않습니다. 이는 그래프의 위상 정렬을 위한 클래스입니다. 그러나 원래는 정렬만을 염두에 두고 작성된 것은 아닌 것 같습니다. 왜냐하면 prepare() 메소드 또는 is_active()와 같은 다소 비밀스럽기는 하지만 매우 유용한 API를 포함하고 있기 때문입니다. 문서에 있는 다음 예는 그 뒤에 숨은 동기를 암시합니다.

topological_sorter = TopologicalSorter()

# Add nodes to 'topological_sorter'...

topological_sorter.prepare()
while topological_sorter.is_active():
    for node in topological_sorter.get_ready():
        # Worker threads or processes take nodes to work on off the
        # 'task_queue' queue.
        task_queue.put(node)

    # When the work for a node is done, workers put the node in
    # 'finalized_tasks_queue' so we can get more nodes to work on.
    # The definition of 'is_active()' guarantees that, at this point, at
    # least one node has been placed on 'task_queue' that hasn't yet
    # been passed to 'done()', so this blocking 'get()' must (eventually)
    # succeed.  After calling 'done()', we loop back to call 'get_ready()'
    # again, so put newly freed nodes on 'task_queue' as soon as
    # logically possible.
    node = finalized_tasks_queue.get()
    topological_sorter.done(node)

그래서 graphlib는 단순히 그래프를 정렬하는 모듈이 아니라 작업 그래프를 토폴로지 순서로 실행하는 유틸리티이기도 합니다. 이는 워크로드에 다른 작업의 결과에 따라 달라지는 작업이 있는 경우 유용합니다. 그래프는 이 문제를 모델링하는 좋은 방법이며 토폴로지 순서는 작업이 올바른 순서로 처리되도록 하는 방법입니다.

문서에서 누락된 것 중 하나는 작성하기 매우 쉬운 asyncio 예제입니다. asyncio를 사용하면 스레드 안전성을 처리할 필요가 없으므로 스레드 동기화를 위한 대기열이나 기타 추가적인 정렬 복잡성을 사용하지 않고도 문제를 해결할 수 있습니다.

간단한 비동기 노드 방문자 기능을 정의하겠습니다.

async def visit(node: str, sorter: TopologicalSorter):
    print(f"processing node {node}")
    sorter.done(node)

실제 세계에서는 I/O 바인딩 작업을 수행하는 한 원하는 만큼 복잡할 수 있으므로 asyncio의 이점을 누릴 수 있습니다. 중요한 점은 함수 끝에서 sorter.done(노드)을 호출하여 TopologicalSorter 인스턴스에 이 노드가 완료되었으며 다음 노드로 진행할 수 있음을 알리는 것입니다.

그런 다음 방문 기능을 위상순으로 정렬된 실행에 연결합니다.

sorter = TopologicalSorter(graph)

sorter.prepare()

while sorter.is_active():
    node_group = sorter.get_ready()

    if not node_group:
        # no nodes are ready yet, so we sleep for a bit
        await asyncio.sleep(0.25)
    else:
        tasks = set()
        for node in node_group:
            task = asyncio.create_task(visit(node, sorter))
            tasks.add(task)
            task.add_done_callback(tasks.discard)

작업 스크립트의 전체 소스 코드는 이 요지에서 확인할 수 있습니다.

graphlib의 독특한 측면 중 하나는 TopologicalSorter가 인수로 받아들이는 그래프 형식입니다. 이는 일반적인 그래프 표현과 반대 순서입니다. 예: 다음과 같은 그래프가 있는 경우 A -> B -> C, 일반적으로 다음과 같이 표현합니다.

graph = {
  "A": ["B"],
  "B": ["C"],
  "C": [],
}

그러나 TopologicalSorter는 가장자리 방향이 반대인 이 그래프를 원합니다.

선택적 그래프 인수가 제공되는 경우 이는 키가 노드이고 값이 그래프에서 해당 노드의 모든 선행 항목의 반복 가능한 방향성 비순환 그래프를 나타내는 사전이어야 합니다

그래서 A를 표현하는 올바른 방법은 -> B -> TopologicalSorter의 C는 다음과 같습니다.

graph = {
  "C": ["B"],
  "B": ["A"],
  "A": [],
}

이에 대한 자세한 정보와 다소 열띤 논쟁은 여기에서 확인할 수 있습니다: https://bugs.python.org/issue46071.

즐거운 코딩하세요!

위 내용은 비동기 Python 및 graphlib를 사용하여 DAG 처리의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.