>백엔드 개발 >파이썬 튜토리얼 >파이썬 동시.미래

파이썬 동시.미래

Linda Hamilton
Linda Hamilton원래의
2024-11-04 04:13:29463검색

python concurrent.futures

미래

Future는 계산 결과나 계산 중에 발생한 오류를 담을 수 있는 컨테이너입니다. future가 생성되면 PENDING 상태로 시작됩니다. 라이브러리에서는 테스트 목적을 제외하고는 이 개체를 수동으로 생성하지 않습니다.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

PENDING 상태는 사용자가 요청한 계산이 스레드 풀에 등록되어 대기열에 배치되었지만 아직 실행을 위해 어떤 스레드에서도 선택되지 않았음을 나타냅니다. 자유 스레드가 대기열에서 작업(콜백)을 가져오면 미래는 RUNNING 상태로 전환됩니다. 미래는 PENDING 상태인 동안에만 취소할 수 있습니다. 따라서 PENDING 상태와 RUNNING 상태 사이에 요청된 계산을 취소할 수 있는 시간 간격이 있습니다.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

스레드 풀에서 요청된 작업은 계산된 값으로 완료되거나 오류가 발생할 수 있습니다. 결과에 관계없이 미래는 FINISHED 상태로 전환됩니다. 결과 또는 오류는 해당 필드에 저장됩니다.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

계산 결과를 검색하려면 결과 메소드가 사용됩니다. 계산이 아직 완료되지 않은 경우 이 메서드는 계산이 완료되거나 대기 시간이 초과될 때까지 현재 스레드(결과가 호출된 스레드)를 차단합니다.

계산이 오류 없이 성공적으로 완료되면 결과 메소드는 계산된 값을 반환합니다.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

계산 중에 예외가 발생하면 결과에서 해당 예외가 발생합니다.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

대기하는 동안 메소드가 시간 초과되면 TimeoutError가 발생합니다.

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

취소된 계산 결과를 얻으려고 하면 CancelledError가 발생합니다.

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

대기 전략

개발 과정에서는 스레드 풀에서 N개의 계산을 실행하고 완료될 때까지 기다려야 하는 경우가 많습니다. 이를 위해 라이브러리는 대기 기능을 제공합니다. FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED 등 여러 가지 대기 전략이 있습니다.

모든 대기 전략의 공통점은 wait 메서드에 전달된 future가 이미 완료된 경우 선택한 전략에 관계없이 전달된 future 컬렉션이 반환된다는 것입니다. 오류, 결과, 취소 등 어떻게 완료되었는지는 중요하지 않습니다.

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED 전략

ALL_COMPLETED 전략은 전달된 모든 Future가 완료될 때까지 기다리거나 시간 초과 후 해당 순간까지 완료된 Future 컬렉션을 종료하는 것을 보장하며 이는 불완전할 수 있습니다.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

FIRST_COMPLETED

FIRST_COMPLETED 전략은 시간 초과 시 하나 이상의 완료된 future 또는 빈 컬렉션이 있는 컬렉션 반환을 보장합니다. 이 전략은 반환된 컬렉션에 여러 요소가 포함될 수 없다는 의미는 아닙니다.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

FIRST_EXCEPTION

FIRST_EXCEPTION 전략은 계산 중 하나가 오류로 완료되면 대기를 중단합니다. 예외가 발생하지 않으면 동작은 ALL_COMPLETED future와 동일합니다.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

ThreadPoolExecutor

객체는 스레드 풀 생성을 담당합니다. 이 개체와 상호 작용하는 주요 방법은 제출 방법입니다. 스레드 풀에 계산을 등록할 수 있습니다. 이에 대한 응답으로 계산 상태를 모니터링하고 최종 결과를 얻는 데 사용되는 Future 개체가 반환됩니다.

속성

  • 새 스레드는 필요한 경우에만 생성됩니다.
    • 계산 요청 시 여유 스레드가 하나 이상 있으면 새 스레드가 생성되지 않습니다
    • 계산 요청 시 사용 가능한 스레드가 없으면 maxWorkers 제한에 도달하지 않은 경우 새 스레드가 생성됩니다.
    • 사용 가능한 스레드가 없고 maxWorkers 제한에 도달한 경우 계산은 대기열에 배치되고 사용 가능한 다음 스레드에서 수행됩니다.
  • 기본적으로 계산 요구에 할당되는 최대 스레드 수는 논리 프로세서 코어 수와 같습니다
  • 한 번 생성된 스레드는 부하가 낮아도 파괴되지 않습니다

위 내용은 파이썬 동시.미래의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.