Home > Article > Backend Development > How to solve the thread pool problem of Python's ThreadPoolExecutor
Python
already has the threading
module, why do we need a thread pool? What is a pool?
Taking the crawler as an example, you need to control the number of threads crawled at the same time. In the example, 20 threads are created, and only 3 threads are allowed to run at the same time, but all 20 threads need to be created and destroyed. Creation requires system resources. Is there a better solution?
In fact, only three threads are needed. Each thread is assigned a task, and the remaining tasks are queued to wait. When a thread completes the task, the queued tasks can be arranged for this thread to continue execution. .
This is the idea of a thread pool (of course it is not that simple), but it is difficult to write a thread pool perfectly by yourself. You also need to consider thread synchronization in complex situations, and deadlocks can easily occur.
Starting from Python3.2
, the standard library provides us with the concurrent.futures
module, which provides ThreadPoolExecutor
and ProcessPoolExecutor
The two classes realize further abstraction of threading
and multiprocessing
(the main focus here is the thread pool). They can not only help us automatically schedule threads, but also do:
The main thread can obtain the status of a certain thread (or task) and the return value.
When a thread completes, the main thread can know immediately.
Make the coding interfaces of multi-threads and multi-processes consistent.
from concurrent.futures import ThreadPoolExecutor import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞 task1 = executor.submit(get_html, (3)) task2 = executor.submit(get_html, (2)) # done方法用于判定某个任务是否完成 print(task1.done()) # cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功 print(task2.cancel()) time.sleep(4) print(task1.done()) # result方法可以获取task的执行结果 print(task1.result()) # 执行结果 # False # 表明task1未执行完成 # False # 表明task2取消失败,因为已经放入了线程池中 # get page 2s finished # get page 3s finished # True # 由于在get page 3s finished之后才打印,所以此时task1必然完成了 # 3 # 得到task1的任务返回值
When ThreadPoolExecutor constructs an instance, pass in the max_workers parameter to set the maximum number of threads that can run simultaneously in the thread pool .
Use the submit function to submit the task (function name and parameters) that the thread needs to perform to the thread pool, and return the handle of the task (similar to files and drawings). Note that submit() is not blocking, but Return immediately.
Through the task handle returned by the submit function, you can use the done() method to determine whether the task has ended. As can be seen from the above example, since the task has a delay of 2s, it is judged immediately after task1 is submitted that task1 has not been completed, but after a delay of 4s, it is judged that task1 is completed.
Use the cancel() method to cancel the submitted task. If the task is already running in the thread pool, it cannot be canceled. In this example, the thread pool size is set to 2 and the task is already running, so the cancellation fails. If you change the size of the thread pool to 1, then task1 is submitted first, and task2 is still waiting in the queue. At this time, it can be successfully canceled.
Use the result() method to obtain the return value of the task. Looking at the internal code, we found that this method is blocking.
Although the above provides a method to determine whether the task is completed, it cannot always be determined in the main thread.
Sometimes when we know that a certain task is over, we get the result instead of always judging whether each task is over.
This is how you can use the as_completed
method to retrieve the results of all tasks at once.
from concurrent.futures import ThreadPoolExecutor, as_completed import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 并不是真的url all_task = [executor.submit(get_html, (url)) for url in urls] for future in as_completed(all_task): data = future.result() print("in main: get page {}s success".format(data)) # 执行结果 # get page 2s finished # in main: get page 2s success # get page 3s finished # in main: get page 3s success # get page 4s finished # in main: get page 4s success
as_completed()
The method is a generator. When no task is completed, it will block. When a certain task is completed, yield
will Task, you can execute the statement below the for loop, and then continue to block until all tasks are completed.
It can also be seen from the results that the task completed first will notify the main thread first.
In addition to the above as_completed
method, you can also use the executor.map
method, but there is a little difference.
from concurrent.futures import ThreadPoolExecutor import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 并不是真的url for data in executor.map(get_html, urls): print("in main: get page {}s success".format(data)) # 执行结果 # get page 2s finished # get page 3s finished # in main: get page 3s success # in main: get page 2s success # get page 4s finished # in main: get page 4s success
Use the map
method without using the submit
method in advance. The map
method is the same as python
in the standard library. ##mapThe meaning is the same, each element in the sequence is executed with the same function.
get_html function for each element of
urls and allocate each thread pool. You can see that the execution result is different from the result of the
as_completed method above.
The output order is the same as the order of the urlslist, even if it is 2s If the task is executed and completed first, the 3s task will be printed first, and then the 2s task will be printed. The wait
method allows the main thread to block until the set requirements are met. <pre class="brush:py;">from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time
# 参数times用来模拟网络请求的时间
def get_html(times):
time.sleep(times)
print("get page {}s finished".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 并不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)
print("main")
# 执行结果
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main</pre>
The method receives 3 parameters, the waiting task sequence, timeout time and waiting conditions. Waiting condition
The default is ALL_COMPLETED
, indicating that you want to wait for all tasks to end. You can see in the running results that all tasks are indeed completed, and the main thread prints out
. The waiting condition can also be set to
, which means that the wait will stop when the first task is completed. Source code analysis
The After the thread pool can also be called the return container of task, which will store the results and status of task. Then The following is a brief introduction to part of the code of The meaning of this method is easy to understand, mainly to create the specified number of threads. However, the implementation is a bit difficult to understand. For example, weakref.ref in the thread execution function involves concepts such as weak references, which will be understood later. This is the function entry specified when the thread pool creates a thread, mainly from the queue in sequence Take out task and execute it, but the first parameter of the function is not very clear yet. Leave that for later. future
in the module means a future object, which can be understood as a one in the future Completed operation
, which is the basis of asynchronous programming. submit()
, this future
object is returned. The task is not completed when it is returned, but it will be completed in the future. ThreadPoolExecutor
How does this object operate internally? ThreadPoolExecutor
: 1.init method
init
The main important thing in the method is the task Queues and thread collections are needed in other methods. 2. There are two important objects in the submit method
submit
, _base.Future()
And _WorkItem()
objects, the _WorkItem()
object is responsible for running tasks and setting the future
object, and finally the future
object will be Return, you can see that the entire process returns immediately without blocking. 3.adjust_thread_count method
4._WorkItem object
_WorkItem
The responsibility of the object is to perform tasks and set results. The main complexity here is self.future.set_result(result)
. 5. Thread execution function--_worker
The above is the detailed content of How to solve the thread pool problem of Python's ThreadPoolExecutor. For more information, please follow other related articles on the PHP Chinese website!