首页 >后端开发 >Python教程 >python 并发.futures

python 并发.futures

Linda Hamilton
Linda Hamilton原创
2024-11-04 04:13:29476浏览

python concurrent.futures

未来

Future 是一个容器,可以保存计算结果或计算期间发生的错误。创建 future 时,它​​以 PENDING 状态开始。该库不打算手动创建此对象,除非出于测试目的。

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

PENDING 状态表示用户请求的计算已注册到线程池中并放入队列中,但尚未被任何线程拾取执行。一旦空闲线程从队列中获取任务(回调),Future 就会转换为 RUNNING 状态。 Future 只能在处于 PENDING 状态时被取消。因此,在 PENDING 和 RUNNING 状态之间存在一个时间窗口,在此期间可以取消请求的计算。

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

线程池中请求的操作可以完成计算值或导致错误。无论结果如何,未来都会过渡到 FINISHED 状态。然后结果或错误将存储在相应的字段中。

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

要检索计算结果,请使用 result 方法。如果计算尚未完成,此方法将阻塞当前线程(从中调用结果),直到计算完成或等待超时。

如果计算成功完成且没有错误,则 result 方法返回计算值。

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

如果计算过程中发生异常,结果将引发该异常。

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

如果方法在等待时超时,则会引发 TimeoutError。

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

尝试获取已取消的计算结果将引发 CancelledError。

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

等待策略

在开发过程中,需要在线程池上运行N次计算并等待其完成是很常见的。为了实现这一点,该库提供了等待函数。有几种等待策略:FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED。

所有等待策略的共同点是,如果传递给 wait 方法的 future 已经完成,则无论选择何种策略,都会返回传递的 future 的集合。无论它们是如何完成的,无论是有错误、结果还是被取消,都无关紧要。

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED 策略

ALL_COMPLETED 策略保证等待所有传递的 future 完成,或者在超时后退出,并收集截至该时刻完成的 future,这可能是不完整的。

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

FIRST_COMPLETED

FIRST_COMPLETED 策略保证返回至少有一个已完成的 future 的集合,或者在超时的情况下返回空集合。 此策略并不意味着返回的集合不能包含多个元素

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

FIRST_EXCEPTION

如果其中一个计算完成时出现错误,FIRST_EXCEPTION 策略会中断等待。如果没有发生异常,则行为与 ALL_COMPLETED 未来相同。

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

线程池执行器

该对象负责创建线程池。与该对象交互的主要方法是 Submit 方法。它允许在线程池中注册计算。作为响应,返回一个 Future 对象,用于监控计算状态并获取最终结果。

属性

  • 仅根据需要创建新线程:
    • 如果请求计算时至少有一个空闲线程,则不会创建新线程
    • 如果请求计算时没有空闲线程,则在未达到 maxWorkers 限制的情况下创建一个新线程。
    • 如果没有空闲线程并且已达到 maxWorkers 限制,则计算将放入队列中,并由下一个可用线程进行
  • 默认情况下为计算需求分配的最大线程数等于逻辑处理器核心数
  • 线程一旦创建,即使在低负载的情况下也不会被销毁

以上是python 并发.futures的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn