この記事では、Python のプロセス間データ通信モジュール multiprocessing.Manager について紹介します。一定の参考価値があります。困っている友人は参考にしてください。お役に立てれば幸いです。
現在開発中、プロセス間でデータを共有する必要がある場面があるため、主にdictを例にしてプロセス間共有(同一親プロセス)を説明するためにmultiprocessing.Managerを勉強しました。
dict の使用方法import multiprocessing # 1. 创建一个Manger对象 manager = multiprocessing.Manager() # 2. 创建一个dict temp_dict = manager.dict() # 3. 创建一个测试程序 def test(idx, test_dict): test_dict[idx] = idx # 4. 创建进程池进行测试 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)単純すぎます。簡単なソース コード分析次に、別の例を見てみましょう
import multiprocessing # 1. 创建一个Manger对象 manager = multiprocessing.Manager() # 2. 创建一个dict temp_dict = manager.dict() temp_dict['test'] = {} # 3. 创建一个测试程序 def test(idx, test_dict): test_dict['test'][idx] = idx # 4. 创建进程池进行测试 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)出力を確認できます。結果は奇妙です
{'test': {}}コードを変更するだけなら
import multiprocessing # 1. 创建一个Manger对象 manager = multiprocessing.Manager() # 2. 创建一个dict temp_dict = manager.dict() temp_dict['test'] = {} # 3. 创建一个测试程序 def test(idx, test_dict): row = test_dict['test'] row[idx] = idx test_dict['test'] = row # 4. 创建进程池进行测试 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)出力結果は期待どおりになります。この現象の背後にある理由を理解するために、ソース コードを簡単に読んでみました。次のコードが重要です。
def Manager(): ''' Returns a manager associated with a running server process The managers methods such as `Lock()`, `Condition()` and `Queue()` can be used to create shared objects. ''' from multiprocessing.managers import SyncManager m = SyncManager() m.start() return m ... def start(self, initializer=None, initargs=()): ''' Spawn a server process for this manager object ''' assert self._state.value == State.INITIAL if initializer is not None and not hasattr(initializer, '__call__'): raise TypeError('initializer must be a callable') # pipe over which we will retrieve address of server reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server self._process = Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), ) ident = ':'.join(str(i) for i in self._process._identity) self._process.name = type(self).__name__ + '-' + ident self._process.start() ...上記のコードからわかるように、Manager オブジェクトを宣言すると、実際にはプログラムが他のプロセスがサーバー サービスを開始し、このサーバーはプロセス間のデータ セキュリティを達成するためにブロックされます。
私の理解では、異なるプロセス間の操作は相互に排他的であり、プロセスはサーバーにデータのこの部分を要求します。その後、変更します。データのこの部分を取得してサーバーに返すと、サーバーは他のプロセスのリクエストを処理します。
test_dict['test'][idx ] = idx は実際にはサーバーからデータを取得した後に変更されますが、サーバーには返されないため、temp_dict のデータはまったく変更されていません。通常のコードの 2 番目の部分では、リクエストと同等です。最初にサーバーからデータを送信し、次に変更されたデータをサーバーに送信します。これにより、この現象が説明できます。
import multiprocessing # 1. 创建一个Manger对象 manager = multiprocessing.Manager() # 2. 创建一个dict temp_dict = manager.dict() lock = manager.Lock() temp_dict['test'] = {} # 3. 创建一个测试程序 def test(idx, test_dict, lock): lock.acquire() row = test_dict['test'] row[idx] = idx test_dict['test'] = row lock.release() # 4. 创建进程池进行测试 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict, lock)) pool.close() pool.join() print(temp_dict)
プロセス内で新しいロック オブジェクトを作成しないで、統合されたロック オブジェクトを使用してください。
この記事はここで終了です。他にも興味深い記事があります。 PHP 中国語 Web サイトの Python ビデオ チュートリアル
列に注目してください。以上がPython のプロセス間データ通信モジュール multiprocessing.Manager の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。