Python thread pool and its principles and uses
Starting a new thread is relatively expensive because it involves interaction with the operating system. A thread pool can greatly improve performance in this situation; consider one especially when a program needs to create a large number of short-lived threads. Conceptually, a thread pool keeps a set of worker threads ready for use. When the program submits a function to the pool, an idle thread picks it up and executes it. When the function returns, the thread does not die; it goes back to the pool and waits idle for the next function. (Python's ThreadPoolExecutor creates its worker threads on demand, up to the configured maximum, rather than all at once at startup.)
In addition, a thread pool effectively limits the number of concurrent threads in the system. A very large number of concurrent threads causes a sharp drop in performance and can even crash the Python interpreter. The pool's maximum-thread-count parameter ensures that the number of concurrent threads never exceeds that value.
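The cap on concurrency can be observed directly. The following is a minimal sketch, not part of the original article: the names tracked_task and stats are hypothetical, and the 0.05-second sleep merely stands in for real work.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

lock = threading.Lock()
# Hypothetical bookkeeping: tasks running right now, and the highest value seen
stats = {"active": 0, "peak": 0}

def tracked_task(_):
    # Record that one more task is running
    with lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.05)  # simulate a short piece of work
    # Record that this task has finished
    with lock:
        stats["active"] -= 1

with ThreadPoolExecutor(max_workers=3) as pool:
    # 12 tasks are submitted, but at most 3 worker threads exist
    list(pool.map(tracked_task, range(12)))

print("peak concurrent tasks:", stats["peak"])
```

However many tasks are submitted, the recorded peak should never exceed max_workers.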
Use of thread pool
The base class of the thread pool is Executor in the concurrent.futures module. Executor provides two subclasses, namely ThreadPoolExecutor and ProcessPoolExecutor, where ThreadPoolExecutor is used to create a thread pool, and ProcessPoolExecutor is used to create a process pool.
If you use a thread pool/process pool to manage concurrent programming, then just submit the corresponding task function to the thread pool/process pool, and the thread pool/process pool will take care of the rest.
Executor provides the following common methods:
submit(fn, *args, **kwargs): Submits the fn function to the thread pool. *args are the positional arguments passed to fn, and **kwargs are the keyword arguments passed to fn.
map(func, *iterables, timeout=None, chunksize=1): Similar to the built-in map(func, *iterables), but the calls to func are submitted to the pool immediately and run asynchronously on its worker threads. chunksize has an effect only with ProcessPoolExecutor.
shutdown(wait=True): Close the thread pool.
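As a quick illustration of the three methods listed above, here is a minimal sketch. The power function is a hypothetical example task, not something from the original article.

```python
from concurrent.futures import ThreadPoolExecutor

def power(base, exponent=2):
    # Hypothetical task function used only for this illustration
    return base ** exponent

pool = ThreadPoolExecutor(max_workers=2)

# Positional arguments after fn are forwarded to it: power(3)
f1 = pool.submit(power, 3)
# Keyword arguments are forwarded as well: power(3, exponent=3)
f2 = pool.submit(power, 3, exponent=3)
# map() calls power once per element and yields results in input order
squares = list(pool.map(power, [1, 2, 3]))

# Wait for outstanding tasks, then release the worker threads
pool.shutdown(wait=True)

print(f1.result(), f2.result(), squares)
```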
After the program submits a task function to the thread pool, submit() returns a Future object. The Future class is mainly used to obtain the return value of the task function. Because the task runs asynchronously on another thread, the function amounts to a task that will be "completed in the future", which is why Python represents it with a Future.
In fact, there is also Future in Java's multi-threaded programming. The Future here is similar to Java's Future.
Future provides the following methods:
cancel(): Cancel the thread task represented by the Future. If the task is being executed and cannot be canceled, the method returns False; otherwise, the program cancels the task and returns True.
cancelled(): Returns whether the thread task represented by Future is successfully canceled.
running(): If the thread task represented by the Future is being executed and cannot be canceled, this method returns True.
done(): Returns True if the thread task represented by the Future was successfully canceled or has finished.
result(timeout=None): Returns the final result of the thread task represented by the Future. If the task has not finished, this method blocks the current thread; the timeout parameter specifies the maximum number of seconds to block, after which a TimeoutError is raised. If the task raised an exception, result() re-raises it.
exception(timeout=None): Returns the exception raised by the thread task represented by the Future. If the task completed without raising, the method returns None.
add_done_callback(fn): Registers a "callback function" for the thread task represented by this Future. When the task finishes or is canceled, fn is called automatically with the Future as its only argument.
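The Future methods above can be exercised together in one small sketch. It assumes a pool with a single worker so that later tasks are guaranteed to stay queued; hold_worker, fail, started, gate and callback_log are hypothetical names introduced only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

started = threading.Event()  # set once the first task is actually running
gate = threading.Event()     # keeps the first task busy until it is released

def hold_worker():
    started.set()
    gate.wait(timeout=5)  # safety timeout so the sketch cannot hang forever
    return "finished"

def fail():
    raise ValueError("boom")

callback_log = []

with ThreadPoolExecutor(max_workers=1) as pool:
    busy = pool.submit(hold_worker)  # occupies the only worker thread
    failing = pool.submit(fail)      # queued behind `busy`
    queued = pool.submit(fail)       # also queued; cancelled below
    # The callback receives the Future itself once the task is done
    failing.add_done_callback(
        lambda f: callback_log.append(type(f.exception()).__name__))

    started.wait(timeout=5)
    states = {
        "busy.running": busy.running(),          # executing right now
        "busy.cancel": busy.cancel(),            # too late to cancel
        "queued.cancel": queued.cancel(),        # had not started yet
        "queued.cancelled": queued.cancelled(),
        "queued.done": queued.done(),            # a cancelled task counts as done
    }

    gate.set()                              # let the first task finish
    busy_result = busy.result(timeout=5)
    failure = failing.exception(timeout=5)  # returned, not raised

print(states, busy_result, repr(failure), callback_log)
```

Note that exception() hands the exception back as a value, whereas calling result() on the same Future would re-raise it.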
When you are finished with a thread pool, call its shutdown() method, which starts the pool's shutdown sequence. After shutdown() has been called, the pool no longer accepts new tasks, but it still completes every task submitted earlier. Once all of those tasks have been executed, all threads in the pool die.
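The shutdown behavior can be checked with a short sketch. The slow_double function is a hypothetical task; the point is that tasks submitted before shutdown() still finish, while a submission made afterwards is rejected with RuntimeError.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_double(x):
    # Hypothetical task: pretend to work briefly, then return a value
    time.sleep(0.1)
    return x * 2

pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(slow_double, n) for n in range(4)]

# Blocks until all four previously submitted tasks have finished
pool.shutdown(wait=True)
all_done = all(f.done() for f in futures)
results = [f.result() for f in futures]

# A pool that has been shut down refuses new work
try:
    pool.submit(slow_double, 99)
    rejected = False
except RuntimeError:
    rejected = True

print(all_done, results, rejected)
```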
The steps to use a thread pool to perform thread tasks are as follows:
Call the constructor of the ThreadPoolExecutor class to create a thread pool.
Define a normal function as a thread task.
Call the submit() method of the ThreadPoolExecutor object to submit the thread task.
When there are no more tasks to submit, call the shutdown() method of the ThreadPoolExecutor object to shut down the thread pool.
The following program demonstrates how to use the thread pool to perform thread tasks:
    from concurrent.futures import ThreadPoolExecutor
    import threading
    import time

    # Define a function to be used as the thread task
    def action(limit):
        my_sum = 0
        for i in range(limit):
            print(threading.current_thread().name + ' ' + str(i))
            my_sum += i
        return my_sum

    # Create a thread pool containing 2 threads
    pool = ThreadPoolExecutor(max_workers=2)
    # Submit a task to the pool; 50 is passed as the argument to action()
    future1 = pool.submit(action, 50)
    # Submit another task to the pool; 100 is passed as the argument to action()
    future2 = pool.submit(action, 100)
    # Check whether the task represented by future1 has finished
    print(future1.done())
    time.sleep(3)
    # Check whether the task represented by future2 has finished
    print(future2.done())
    # Get the result returned by the task represented by future1
    print(future1.result())
    # Get the result returned by the task represented by future2
    print(future2.result())
    # Shut down the thread pool
    pool.shutdown()
In the above program, the ThreadPoolExecutor(max_workers=2) call creates a thread pool containing two threads. The next two statements simply submit the action() function to the pool, and the pool takes care of starting threads to execute it. Starting threads this way is both more elegant and more efficient.
When the program submits action() to the thread pool, submit() returns the Future object for that task. The program immediately calls future1's done() method, which will normally return False because the task has not had time to finish. The main program then pauses for 3 seconds and calls future2's done() method; if the task has finished by then, it returns True.
The program finally obtains the results returned by the two asynchronous tasks through the result() method of Future.
Readers can run this code themselves to see the results, which will not be demonstrated here.
When the program calls Future's result() method to obtain the result, the method blocks the current thread. If no timeout argument is given, the current thread stays blocked until the task represented by the Future returns.
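When a timeout is given, result() gives up waiting and raises concurrent.futures.TimeoutError instead of blocking indefinitely. The sketch below assumes a hypothetical slow_task that takes about half a second; the task itself keeps running after the timeout, so a later call to result() can still obtain its value.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
import time

def slow_task():
    # Hypothetical task that takes longer than the first timeout below
    time.sleep(0.5)
    return "done"

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_task)
    try:
        # Wait at most 0.1 seconds for the result
        future.result(timeout=0.1)
        timed_out = False
    except FutureTimeoutError:
        timed_out = True
    # Without a timeout, result() blocks until the task has finished
    final = future.result()

print(timed_out, final)
```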
Get execution results
The previous program called Future's result() method to obtain the return value of the thread task, but that method blocks the current main thread; the block is released only when the thread task has finished.
If the program does not want to block by calling result() directly, it can add a callback function through Future's add_done_callback() method. The callback has the form fn(future). When the thread task finishes, the callback is triggered automatically and receives the corresponding Future object as its argument.
The following program uses the add_done_callback() method to obtain the return value of the thread task:
    from concurrent.futures import ThreadPoolExecutor
    import threading

    # Define a function to be used as the thread task
    def action(limit):
        my_sum = 0
        for i in range(limit):
            print(threading.current_thread().name + ' ' + str(i))
            my_sum += i
        return my_sum

    # Create a thread pool containing 2 threads
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Submit a task to the pool; 50 is passed as the argument to action()
        future1 = pool.submit(action, 50)
        # Submit another task to the pool; 100 is passed as the argument to action()
        future2 = pool.submit(action, 100)

        # Callback: receives the finished Future and prints its result
        def get_result(future):
            print(future.result())

        # Register the completion callback for future1
        future1.add_done_callback(get_result)
        # Register the completion callback for future2
        future2.add_done_callback(get_result)
        print('--------------')
The main program above adds the same callback function to both future1 and future2. The callback retrieves the return value of each thread task when it finishes.
The last line of the main program prints a horizontal line. Because the program never calls result() on future1 or future2 directly, the main thread is not blocked, and the horizontal line printed by the main thread appears almost immediately. After that you can see the two worker threads executing concurrently. When each task finishes, get_result() is triggered and prints the task's return value.
In addition, because the thread pool implements the context management protocol, the program can use a with statement to manage the pool, which avoids having to shut it down manually, as shown in the program above.
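To make the equivalence concrete, here is a minimal sketch of the two styles side by side. Leaving the with block calls shutdown(wait=True) on the pool, so it behaves like the explicit try/finally form; the variable names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# Explicit form: shut the pool down in a finally clause
pool = ThreadPoolExecutor(max_workers=2)
try:
    manual_future = pool.submit(sum, [1, 2, 3])
finally:
    pool.shutdown(wait=True)

# Context-manager form: shutdown(wait=True) runs when the block exits
with ThreadPoolExecutor(max_workers=2) as managed_pool:
    managed_future = managed_pool.submit(sum, [1, 2, 3])

# Both tasks are complete by this point, so result() returns at once
print(manual_future.result(), managed_future.result())
```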
In addition, Executor provides a map(func, *iterables, timeout=None, chunksize=1) method. It works like the built-in map() function, except that the pool's map() submits one call to func for each element of iterables and runs those calls concurrently on the pool's threads. The effect is similar to starting len(iterables) threads and collecting each one's result, although no more than max_workers calls run at the same time.
For example, the following program uses the map() method of Executor to start threads and collects the return value of the thread task:
    from concurrent.futures import ThreadPoolExecutor
    import threading

    # Define a function to be used as the thread task
    def action(limit):
        my_sum = 0
        for i in range(limit):
            print(threading.current_thread().name + ' ' + str(i))
            my_sum += i
        return my_sum

    # Create a thread pool containing 4 threads
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Use the pool's threads to perform the map computation.
        # The tuple has 3 elements, so 3 tasks are submitted to run action()
        results = pool.map(action, (50, 100, 150))
        print('--------------')
        for r in results:
            print(r)
The above program uses map() to run 3 tasks concurrently. The pool in this program contains 4 threads; with a pool of only two threads, one task would have to wait until one of the other tasks finished and freed a thread before it got a chance to run. The return value of map() collects the results of the thread tasks.
If you run the above program, you can again see the three tasks executing concurrently, and at the end you can see the three return values through results.
As the above program shows, using map() to start threads and collect their results keeps the code simple. Moreover, although action() executes concurrently, the results are collected in the same order as the arguments that were passed in: the first element of results is the result of action(50), the second is the result of action(100), and the third is the result of action(150).
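The ordering guarantee is easiest to see when tasks deliberately finish out of order. The following sketch uses a hypothetical sleep_then_return task whose delays are chosen so that the first input typically finishes last; finish_order records the order in which tasks actually completed.

```python
from concurrent.futures import ThreadPoolExecutor
import time

finish_order = []  # filled in by the worker threads as tasks complete

def sleep_then_return(delay):
    # Hypothetical task: the longer the delay, the later it finishes
    time.sleep(delay)
    finish_order.append(delay)
    return delay

delays = [0.3, 0.1, 0.2]
with ThreadPoolExecutor(max_workers=3) as pool:
    # Results come back in input order, not in completion order
    results = list(pool.map(sleep_then_return, delays))

print("completion order:", finish_order)
print("map() results:   ", results)
```

The completion order depends on timing, but the list built from map() always matches the order of delays.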