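The concurrent package introduced in Python 3.2 is very easy to use: a thread pool or process pool takes only a few lines of code. For CPU-bound tasks its efficiency is on par with the Pool and ThreadPool provided by multiprocessing.pool, and for IO-bound tasks it is several times faster because it introduces the concept of a Future.
With threading, by contrast, you have to maintain the relevant queues yourself to prevent deadlocks, and the readability of the code suffers. The thread pool provided by concurrent is very convenient: you do not have to worry about deadlocks or write thread-pool code yourself, and because of its asynchronous model it also has an advantage for IO-bound tasks.
That being the case, unless backward compatibility with 2.x is required, is there any reason at all to keep using multiprocessing and threading? concurrent is that good.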
阿神2017-04-18 10:13:53
concurrent is indeed very useful. It mainly provides ThreadPoolExecutor and ProcessPoolExecutor, one for multithreading and one for multiprocessing. But concurrent is essentially a wrapper around threading and multiprocessing, as you can see by reading its source code.
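For readers who have not used it, here is a minimal sketch of the "few lines of code" the question refers to. The square function and the worker count are made up for illustration; ProcessPoolExecutor exposes the same submit/map interface.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n):
    # Hypothetical task, standing in for real work.
    return n * n


with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns a Future immediately; remember which input it belongs to.
    futures = {executor.submit(square, n): n for n in range(5)}
    # as_completed() yields each Future as soon as its task has finished.
    squares = {futures[f]: f.result() for f in as_completed(futures)}

print(squares)
```

Leaving the with block calls executor.shutdown(), which waits for the submitted tasks to finish.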
ThreadPoolExecutor provides its own task queue, so there is no need to write one yourself. The so-called thread pool simply compares the current number of threads with max_workers: if there are fewer than max_workers threads, a new thread is created to execute tasks. You can see this in the source code:
def _adjust_thread_count(self):
    # When the executor gets lost, the weakref callback will wake up
    # the worker threads.
    def weakref_cb(_, q=self._work_queue):
        q.put(None)
    # TODO(bquinlan): Should avoid creating new threads if there are more
    # idle threads than items in the work queue.
    if len(self._threads) < self._max_workers:
        t = threading.Thread(target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._work_queue))
        t.daemon = True
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
So maintaining the queue yourself is no problem either. concurrent also maintains a queue internally; it has simply written that code for you.
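To make "maintain the queue yourself" concrete, here is a rough sketch of a hand-rolled pool built on threading and queue.Queue. It is not how concurrent is implemented; the names worker and run_in_pool are invented for this example, and it assumes the task function never raises.

```python
import queue
import threading


def worker(tasks, results):
    # Pull work until the None sentinel arrives, then exit.
    while True:
        item = tasks.get()
        if item is None:
            break
        func, arg = item
        # Assumption: func does not raise. A real pool would capture exceptions.
        results.put((arg, func(arg)))


def run_in_pool(func, args, max_workers=2):
    args = list(args)
    tasks, results = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(max_workers)]
    for t in threads:
        t.start()
    for arg in args:
        tasks.put((func, arg))
    for _ in threads:
        tasks.put(None)  # one sentinel per worker so every thread stops
    for t in threads:
        t.join()
    # Exactly one result was queued per argument.
    return dict(results.get() for _ in args)


pool_output = run_in_pool(lambda x: x * x, [1, 2, 3])
```

This is the bookkeeping (queue, sentinels, joining) that ThreadPoolExecutor hides, along with returning results and exceptions through Future objects.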
As for deadlocks, concurrent can deadlock too. Here is an example; run it and see (it never finishes):
import time
from concurrent.futures import ThreadPoolExecutor

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
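One way to keep such a cycle from hanging forever is to pass a timeout to Future.result(). The sketch below is only an illustration: the futures dict, the ready event, and the 0.5 second timeout are choices made for this example, not part of the code above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError

futures = {}
ready = threading.Event()  # set once both futures have been registered


def wait_on(other_name):
    ready.wait()
    try:
        # Block on the other task, but give up after half a second.
        futures[other_name].result(timeout=0.5)
        return "ok"
    except TimeoutError:
        return "timeout"


with ThreadPoolExecutor(max_workers=2) as executor:
    futures["a"] = executor.submit(wait_on, "b")
    futures["b"] = executor.submit(wait_on, "a")
    ready.set()
    outcome_a = futures["a"].result()
    outcome_b = futures["b"].result()

print(outcome_a, outcome_b)
```

At least one of the two tasks has to time out for the cycle to break. The other may then see "ok" if its partner returned before its own timeout expired. The timeout only reports the problem; the real fix is to avoid tasks that wait on each other.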
ProcessPoolExecutor also uses multiprocessing internally. It can take full advantage of multiple cores and is not constrained by the GIL. Note that when you define ProcessPoolExecutor(max_workers=2), max_workers should be at most slightly larger than the number of CPU cores; it should not be too large. ProcessPoolExecutor internally maintains a call_queue as its task queue, of type multiprocessing.Queue, and there is also a thread that manages the queue. This can be considered an optimization on concurrent's part.
See the source code for details. self._adjust_process_count() is what actually starts the processes that execute the tasks; open _adjust_process_count and it is clear at a glance. self._queue_management_thread is the thread that manages the queue:
if self._queue_management_thread is None:
    # Start the processes so that their sentinels are known.
    self._adjust_process_count()
    self._queue_management_thread = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
    self._queue_management_thread.daemon = True
    self._queue_management_thread.start()
    _threads_queues[self._queue_management_thread] = self._result_queue
So concurrent is easy to use because it does some of the work for you, such as maintaining the queues and the queue-management thread, so you no longer need to worry about them. Of course, you can also implement this yourself: anything you can do with concurrent can also be done with threading and multiprocessing. At most you do some extra work, because concurrent is built on those two at its core. Still, when something like concurrent is already available, it is best to use it directly rather than reinvent the wheel. Which one to use depends on what you are familiar with. For example, I use Python 2, which has no concurrent, so I have to use threading.
阿神2017-04-18 10:13:53
The person above has already explained it very clearly; I just want to add one small point.
concurrent.futures uses an asynchronous model to manage threads and processes, but it does not actually wrap asynchronous IO, so the question's claim that it improves IO efficiency is actually wrong.
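A quick way to check this for yourself is to time the same blocking tasks on both kinds of thread pool. In this sketch fake_io is a made-up stand-in that just sleeps, and the task count and sleep length are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.pool import ThreadPool


def fake_io(n):
    time.sleep(0.2)  # stands in for a blocking network or disk call
    return n


def timed(run):
    # Return the result of run() together with its wall-clock duration.
    start = time.perf_counter()
    result = run()
    return result, time.perf_counter() - start


def with_executor():
    with ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(fake_io, range(4)))


def with_thread_pool():
    with ThreadPool(4) as pool:
        return pool.map(fake_io, range(4))


executor_result, executor_seconds = timed(with_executor)
pool_result, pool_seconds = timed(with_thread_pool)
print(executor_seconds, pool_seconds)
```

Run one after the other, the four calls would take about 0.8 seconds. Both pools should finish in roughly the time of a single call, because the threads overlap their blocking waits. The Future is a handle on the result, not a faster IO mechanism.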