搜尋

首頁  >  問答  >  主體

Python有了concurrent的话mutiprocessing和threading还有存在的意义吗?

Python3.2中引入的concurrent非常的好用,只用几行代码就可以编写出线程池/进程池,并且计算型任务效率和mutiprocessing.pool提供的poll和ThreadPoll相比不分伯仲,而且在IO型任务由于引入了Future的概念效率要高数倍。

而threading的话还要自己维护相关的队列防止死锁,代码的可读性也会下降,相反concurrent提供的线程池却非常的便捷,不用自己操心死锁以及编写线程池代码,由于异步的概念IO型任务也更有优势。

既然如此,如果不是为了向下兼容2.x,是不是可以完全没有必要继续使用mutiprocessing和threading了?concurrent如此的优秀。

高洛峰高洛峰2803 天前941

全部回覆(3)我來回復

  • 阿神

    阿神2017-04-18 10:13:53

    concurrent的確很好用,主要提供了ThreadPoolExecutor和ProcessPoolExecutor。一個多線程,一個多進程。但concurrent本質上都是threading和mutiprocessing的封裝。看它的源碼可以知道。
    ThreadPoolExecutor自己提供了任務佇列,不需要自己寫了。而所謂的線程池,它只是簡單的比較當前的threads數量和定義的max_workers的大小,小於max_workers就允許任務創建線程執行任務。可以看源碼

    def _adjust_thread_count(self):

    # When the executor gets lost, the weakref callback will wake up
    # the worker threads.
    def weakref_cb(_, q=self._work_queue):
        q.put(None)
    # TODO(bquinlan): Should avoid creating new threads if there are more
    # idle threads than items in the work queue.
    if len(self._threads) < self._max_workers:
        t = threading.Thread(target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._work_queue))
        t.daemon = True
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
      

    所以如果你自己維護隊列的話也沒問題,cocurrent內部也是自己維護了一個隊列,它給你寫好了而已。
    至於死鎖問題concurrent也會造成死鎖的問題。給你一個例子,跑看看

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    def wait_on_b():
        time.sleep(5)
        print(b.result()) # b will never complete because it is waiting on a.
        return 5
    
    def wait_on_a():
        time.sleep(5)
        print(a.result()) # a will never complete because it is waiting on b.
        return 6
    
    
    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)
    
    

    ProcessPoolExecutor 內部也是使用的mutiprocessing。能夠從充分利用多核心的特性,擺脫GIL的限制。注意定義ProcessPoolExecutor(max_workers=2)的時候max_workers稍大於CPU的核數,不能太大。 ProcessPoolExecutor內部維持了一個call_queue用來保持任務佇列,其型別是multiprocessing.Queue。還有一個管理隊列的執行緒。這可以說是cocurrent的最佳化。
    具體可以看​​源碼,self._adjust_process_count()其實就是開啟進程執行任務,點進去_adjust_process_count一看就知道。 self._queue_management_thread是管理佇列的執行緒

    if self._queue_management_thread is None:
                # Start the processes so that their sentinels are known.
                self._adjust_process_count()
                self._queue_management_thread = threading.Thread(
                        target=_queue_management_worker,
                        args=(weakref.ref(self, weakref_cb),
                              self._processes,
                              self._pending_work_items,
                              self._work_ids,
                              self._call_queue,
                              self._result_queue))
                self._queue_management_thread.daemon = True
                self._queue_management_thread.start()
                _threads_queues[self._queue_management_thread] = self._result_queue
    

    所以說cocurrent好用,就是它自己做了一些更好的處理,譬如維持隊列,管理隊列線程,不需要你再操心。當然你也可以自己實現。你能用cocurrent實現的。用threading和mutiprocessing都能實現,大不了自己再做些額外的工作。因為cocurrent本質上核心也是用的這2個。當然有了現成的更好的cocurrent最好了,直接拿來使用,省的自己再造輪子。所以說用哪個看個人熟悉程度,譬如我用的python2,就用不了cocurrent。只好用threading。

    回覆
    0
  • 阿神

    阿神2017-04-18 10:13:53

    上面那位已經說的很清楚了,我只是稍微補充一下.
    Concurrent.future使用了異步的概念管理了線程/進程,但它實際上並沒有封裝異步IO,所以題主說的IO效率提高實際上是有誤的.

    回覆
    0
  • 伊谢尔伦

    伊谢尔伦2017-04-18 10:13:53

    concurrent是協程,不是線程,兩個概念。

    回覆
    0
  • 取消回覆