python concurrent.futures

Linda Hamilton
2024-11-04 04:13:29

Future

Future is a container that can hold either the result of a computation or an error that occurred during that computation. When a future is created, it starts in a PENDING state. The library does not intend for this object to be created manually, except perhaps for testing purposes.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

The PENDING state indicates that a computation requested by the user has been registered in the thread pool and placed in a queue, but has not yet been picked up by any thread for execution. Once a free thread takes the task (the submitted callable) from the queue, the future transitions to the RUNNING state. A future can only be canceled while it is in the PENDING state, so the requested computation can be canceled only in the window between being queued and being picked up by a worker thread.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()
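
The same cancellation window can be observed with a real pool rather than a hand-made Future. The following is a minimal sketch, assuming a pool with a single worker so that the second task is forced to wait in the queue; the names cancel_queued_task, blocker, started and release are hypothetical and exist only for this illustration.

```python
import concurrent.futures as futures
import threading

def cancel_queued_task():
    started = threading.Event()   # set once the first task is running
    release = threading.Event()   # lets the first task finish

    def blocker():
        started.set()
        release.wait()
        return 'first'

    with futures.ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(blocker)
        started.wait()                          # the only worker is now busy
        queued = pool.submit(lambda: 'second')  # stays PENDING in the queue

        cancelled_queued = queued.cancel()      # PENDING, so it can be canceled
        cancelled_running = running.cancel()    # RUNNING, so it cannot
        release.set()                           # unblock the worker
        return cancelled_queued, cancelled_running, running.result()

print(cancel_queued_task())
```

The two events only make the timing deterministic; in real code the window is usually much shorter and cancel() may well return False.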

A requested operation in the thread pool can either complete with a computed value or result in an error. Regardless of the outcome, the future transitions to the FINISHED state. The result or error is then stored in the corresponding fields.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

To retrieve the result of a computation, the result method is used. If the computation is not yet complete, this method will block the current thread (from which result was called) until the computation finishes or the wait times out.

If the computation completes successfully without errors, the result method returns the computed value.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

If an exception occurred during the computation, result will raise that exception.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError())  # pass an exception instance, as in the earlier example
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

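If the method times out while waiting, concurrent.futures.TimeoutError is raised. Since Python 3.11 this is an alias of the built-in TimeoutError; in earlier versions it is a separate class, so catching it through the module works on all versions.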

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except futures.TimeoutError:  # the built-in TimeoutError only matches on Python 3.11+
    assert(f._result is None)
    assert(f._exception is None)

An attempt to obtain the result of a canceled computation raises a CancelledError.

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

Waiting strategy

It is quite common to run N computations on a thread pool and wait for their completion. To achieve this, the library provides the wait function. It supports several waiting strategies, selected with the return_when argument: FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.

Common to all waiting strategies is that if every future passed to the wait function has already completed, wait returns immediately with all of them in the done set, regardless of the chosen strategy. It does not matter how they completed: with an error, with a result, or by cancellation.

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel()  # required: wait() counts a canceled future as done only after this call
    f2.set_result('foo')
    f3.set_exception(NameError())

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED

The ALL_COMPLETED strategy waits until every passed future has completed. If a timeout is given and expires first, wait returns anyway: the futures completed up to that moment are in done and the rest are in not_done.
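
A minimal sketch of this behavior, assuming a two-worker pool and a task that is kept blocked with an event so the timeout case is reproducible. The names all_completed_demo, slow and gate are hypothetical.

```python
import concurrent.futures as futures
import threading

def all_completed_demo():
    gate = threading.Event()  # keeps the slow task unfinished until set

    def slow():
        gate.wait()
        return 'slow'

    with futures.ThreadPoolExecutor(max_workers=2) as pool:
        f_fast = pool.submit(lambda: 'fast')
        f_slow = pool.submit(slow)
        f_fast.result()  # make sure the fast task has finished

        # The timeout expires while the slow task is still blocked.
        partial = futures.wait([f_fast, f_slow], timeout=0.2,
                               return_when=futures.ALL_COMPLETED)

        gate.set()  # now let the slow task finish
        full = futures.wait([f_fast, f_slow],
                            return_when=futures.ALL_COMPLETED)

        return (f_fast in partial.done, f_slow in partial.not_done,
                len(full.done), len(full.not_done))

print(all_completed_demo())
```

Note that wait does not raise on timeout; the caller has to inspect not_done to find out whether everything finished.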

FIRST_COMPLETED

The FIRST_COMPLETED strategy returns as soon as at least one future has completed, or with an empty done set if the timeout expires first. The done set is not limited to a single element: it contains every future that had completed by the time wait returned.
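
A minimal sketch of both outcomes, assuming a two-worker pool and an event that keeps one task blocked. The names first_completed_demo, slow and gate are hypothetical.

```python
import concurrent.futures as futures
import threading

def first_completed_demo():
    gate = threading.Event()  # keeps the slow task blocked until set

    def slow():
        gate.wait()
        return 'slow'

    with futures.ThreadPoolExecutor(max_workers=2) as pool:
        f_slow = pool.submit(slow)

        # Nothing can finish yet, so the timeout expires with an empty done set.
        timed_out = futures.wait([f_slow], timeout=0.1,
                                 return_when=futures.FIRST_COMPLETED)

        f_fast = pool.submit(lambda: 'fast')
        # Returns as soon as f_fast finishes; f_slow is still blocked.
        r = futures.wait([f_slow, f_fast],
                         return_when=futures.FIRST_COMPLETED)

        summary = (len(timed_out.done), f_fast in r.done, f_slow in r.not_done)
        gate.set()  # let the slow task finish so the pool can shut down
    return summary

print(first_completed_demo())
```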

FIRST_EXCEPTION

The FIRST_EXCEPTION strategy interrupts the wait as soon as one of the computations finishes with an error. If no exceptions occur, the behavior is identical to the ALL_COMPLETED strategy.
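
A minimal sketch, assuming a two-worker pool, one task that raises and one that is kept blocked with an event. The names first_exception_demo, slow, failing and gate are hypothetical.

```python
import concurrent.futures as futures
import threading

def first_exception_demo():
    gate = threading.Event()  # keeps the slow task blocked until set

    def slow():
        gate.wait()
        return 'slow'

    def failing():
        raise ValueError('boom')

    with futures.ThreadPoolExecutor(max_workers=2) as pool:
        f_slow = pool.submit(slow)
        f_bad = pool.submit(failing)

        # Returns once f_bad has failed, although f_slow is still running.
        r = futures.wait([f_slow, f_bad],
                         return_when=futures.FIRST_EXCEPTION)
        interrupted = (f_bad in r.done, f_slow in r.not_done,
                       type(f_bad.exception()).__name__)
        gate.set()  # let the slow task finish

        # Without errors the call waits for every future, like ALL_COMPLETED.
        ok = [pool.submit(lambda: 1), pool.submit(lambda: 2)]
        r_ok = futures.wait(ok, return_when=futures.FIRST_EXCEPTION)

    return interrupted, len(r_ok.done), len(r_ok.not_done)

print(first_exception_demo())
```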

ThreadPoolExecutor

ThreadPoolExecutor is responsible for creating and managing a thread pool. The main method for interacting with it is submit, which registers a computation in the thread pool. It returns a Future object, which is used to monitor the computation's status and obtain the final result.
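
A minimal sketch of submit in use. The functions square and run_squares and the choice of max_workers=4 are assumptions made for the illustration.

```python
import concurrent.futures as futures

def square(x):
    return x * x

def run_squares(values):
    # The with block shuts the pool down and joins its threads on exit.
    with futures.ThreadPoolExecutor(max_workers=4) as pool:
        # submit returns immediately with one Future per computation.
        submitted = [pool.submit(square, v) for v in values]
        # result blocks until the corresponding computation has finished.
        return [f.result() for f in submitted]

print(run_squares([1, 2, 3]))  # [1, 4, 9]
```

Collecting results in submission order keeps the output aligned with the input, at the cost of waiting on slower tasks before reading faster ones.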

Properties

  • New threads are created ONLY as needed:
    • If there is at least one free thread when a computation is requested, no new thread is created
    • If there are no free threads when a computation is requested, a new thread is created, provided that the max_workers limit has not been reached.
    • If there are no free threads and the max_workers limit has been reached, the computation is placed in a queue and will be taken by the next available thread
  • If max_workers is not given, the maximum number of threads defaults to min(32, os.cpu_count() + 4) since Python 3.8 (Python 3.13 uses os.process_cpu_count() in the same formula), not simply to the number of logical processor cores
  • Once created, a thread is not destroyed, even in case of low load; it lives until the executor is shut down
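
The max_workers limit from the list above can be checked with a small experiment. This is a sketch only: worker_threads_used and task are hypothetical helpers, and the short sleep is there just to make the tasks overlap.

```python
import concurrent.futures as futures
import threading
import time

def worker_threads_used(max_workers, task_count):
    names = set()            # names of the threads that ran a task
    lock = threading.Lock()  # protects the shared set

    def task():
        with lock:
            names.add(threading.current_thread().name)
        time.sleep(0.01)     # keep the worker busy for a moment

    with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        submitted = [pool.submit(task) for _ in range(task_count)]
        futures.wait(submitted)
    return names

used = worker_threads_used(max_workers=2, task_count=10)
print(len(used))  # never more than 2
```

Ten tasks are served by at most two threads; the remaining tasks wait in the queue until a worker becomes free.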
