首頁 >後端開發 >Python教學 >python中進程間資料通訊模組multiprocessing.Manager的介紹

python中進程間資料通訊模組multiprocessing.Manager的介紹

不言
不言轉載
2019-03-23 11:08:474436瀏覽

本篇文章帶給大家的內容是關於python中進程間資料通訊模組multiprocessing.Manager的介紹,有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。

目前開發中有遇到進程間需要共享資料的情況. 所以研究了下multiprocessing.Manager, 主要會以dict為例子, 說明下進程間共享(同一個父進程).

dict使用說明

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict[idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

too simple.

簡單的原始碼分析

這時我們再看一個例子

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict['test'][idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

可以看到輸出結果是奇怪的{'test': {}}
如果我們簡單修改一下程式碼

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

這時輸出結果就符合預期了.

為了了解這個現象背後的原因, 我簡單去讀了一下源碼, 主要有以下幾段代碼很關鍵.

def Manager():
    '''
    Returns a manager associated with a running server process

    The managers methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    '''
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m
    
...
    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()
...

上面代碼可以看出, 當我們聲明了一個Manager對象的時候, 程序實際在其他進程啟動了一個server服務, 這個server是阻塞的, 以此來實現進程間數據安全.
我的理解就是不同進程之間操作都是互斥的, 一個進程向server請求到這部分數據, 再把這部分資料修改, 回傳給server, 之後server再去處理其他行程的請求.

回到上面的奇怪現像上, 這個操作test_dict['test'][idx] = idx實際上在拉取到server上的資料後進行了修改, 但並沒有返回給server, 所以temp_dict的資料根本沒有變化. 在第二段正常程式碼, 就相當於先向伺服器請求數據, 再向伺服器傳送修改後的資料. 這樣就可以解釋這個現象了.

進程間資料安全

這個時候如果出現一種情況, 兩個進程同時請求了一份相同的資料, 分別進行修改, 再提交到server上會怎麼樣呢? 那當然是資料產生異常. 基於此, 我們需要Manager的另一個物件, Lock(). 這個物件也不難理解, Manager本身就是一個server, dict跟lock都來自於這個server, 所以當你lock住的時候, 其他進程是不能取到資料, 自然也不會出現上面那種異常情況.

程式碼範例:

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
lock = manager.Lock()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict, lock):
    lock.acquire()
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
    lock.release()
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict, lock))
pool.close()
pool.join()
print(temp_dict)

切忌不要進程裡自己新建lock物件, 要使用統一的lock物件.

本篇文章到這裡就已經全部結束了,更多其他精彩內容可以追蹤PHP中文網的python影片教學專欄!

#

以上是python中進程間資料通訊模組multiprocessing.Manager的介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:segmentfault.com。如有侵權,請聯絡admin@php.cn刪除