Home >Backend Development >Python Tutorial >Python concurrent programming thread pool/process pool

Python concurrent programming thread pool/process pool

巴扎黑
巴扎黑Original
2017-03-18 11:37:101893browse

Introduction

The Python standard library provides us with the threading and multiprocessing modules to write corresponding multi-threading/multi-process code. However, when the project reaches a certain scale, frequent creation/destruction of processes or threads is very resource intensive. Yes, at this time we have to write our own thread pool/process pool to trade space for time. But starting from Python3.2, the standard library provides us with the concurrent.futures module, which provides two classes: ThreadPoolExecutor and ProcessPoolExecutor, which realizes further abstraction of threading and multiprocessing. , provides direct support for writing thread pools/process pools.

Executor and Future

The basis of the concurrent.futures module is Executor. Executor is an abstract class and cannot be used directly. However, the two subclasses ThreadPoolExecutor and ProcessPoolExecutor it provides are very useful. As the names suggest, they are used to create thread pool and process pool codes respectively. We can put the corresponding tasks directly into the thread pool/process pool, and there is no need to maintain the Queue to worry about deadlocks. The thread pool/process pool will automatically schedule it for us.

Future I believe that friends who have experience in programming under java and nodejs will be familiar with this concept. You can understand it as an operation to be completed in the future, this It is the basis of asynchronous programming. In traditional programming mode, for example, when we operate queue.get, blocking will occur before waiting for the result to be returned, and the CPU cannot be freed to do other things. The introduction of Future helps us to complete the task during the waiting period. Other operations. Regarding asynchronous IO in Python, you can refer to my Python concurrent programming coroutine/asynchronous IO after reading this article.

p.s: If you are still sticking to Python2.x, please install the futures module first.

pip install futures

Use submit to operate thread pool/process pool

Let’s first understand the concept of thread pool through the following code

# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello"))  # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world"))  # 往线程池里面加入一个task
print(future1.done())  # 判断task1是否结束
time.sleep(3)
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果

We based on the running results Let’s analyze it. We use the submit method to add a task to the thread pool, and submit returns a Future object. The Future object can be simply understood as an operation completed in the future. In the first print statement, it is obvious that our future1 has not been completed because of time.sleep(2), because we used time.sleep(3) to pause the main thread, so when it comes to the second print statement, our thread pool All tasks here have been completed.

ziwenxie :: ~ » python example1.py
False
True
hello
world
# 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8361  7557  8361  3    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8362  0    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8363  0    3 19:45 pts/0    00:00:00 python example1.py

We can also rewrite the above code into a process pool form. The API and thread pool are exactly the same, so I won’t be wordy.

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())

The following are the running results

ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8560  7557  8560  3    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8563  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8564  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8561  8560  8561  0    1 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8562  8560  8562  0    1 19:53 pts/0    00:00:00 python example2.py

Use map/wait to operate thread pool/process pool

In addition to submit, Exectuor also provides us with the map method and built-in The usage of map is similar. Let's compare the difference between the two through two examples.

Review of using submit operation

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

As can be seen from the running results, as_completed is not returned in the order of the URLS list elements.

ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 byte
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes

Use map

# example4.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))

As can be seen from the running results, map returns in the order of the URLS list elements, and the code written is more concise and intuitive. We You can choose any one according to your specific needs.

ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes

The third option is wait

The wait method will return a tuple (tuple). The tuple contains two sets (sets), one is completed (completed) and the other Is uncompleted (unfinished). One advantage of using the wait method is to gain greater freedom. It receives three parameters: FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETE. The default setting is ALL_COMPLETED.

Let’s take a look at the difference between the three parameters through the following example

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))

If the default ALL_COMPLETED is used, the program will block until all tasks in the thread pool are completed.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())

If the FIRST_COMPLETED parameter is used, the program will not wait until all tasks in the thread pool are completed.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})

Thinking Questions

Write a small program to compare the execution efficiency gap between multiprocessing.pool(ThreadPool) and ProcessPollExecutor(ThreadPoolExecutor), and think about why this happens based on the Future mentioned above result.

The above is the detailed content of Python concurrent programming thread pool/process pool. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn