Heim > Artikel > Backend-Entwicklung > Einführung in das prozessübergreifende Datenkommunikationsmodul multiprocessing.Manager in Python
Dieser Artikel bietet Ihnen eine Einführung in das prozessübergreifende Datenkommunikationsmodul in Python. Ich hoffe, dass es für Freunde hilfreich ist.
Derzeit gibt es während der Entwicklung Situationen, in denen Daten zwischen Prozessen ausgetauscht werden müssen. Deshalb habe ich Multiprocessing.Manager untersucht und dabei hauptsächlich dict als Beispiel verwendet, um die gemeinsame Nutzung zwischen Prozessen (gleicher übergeordneter Prozess) zu veranschaulichen.
Anleitung zur Verwendung von dict
import multiprocessing # 1. 创建一个Manger对象 manager = multiprocessing.Manager() # 2. 创建一个dict temp_dict = manager.dict() # 3. 创建一个测试程序 def test(idx, test_dict): test_dict[idx] = idx # 4. 创建进程池进行测试 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
zu einfach.
Schauen wir uns nun ein weiteres Beispiel an
import multiprocessing # 1. 创建一个Manger对象 manager = multiprocessing.Manager() # 2. 创建一个dict temp_dict = manager.dict() temp_dict['test'] = {} # 3. 创建一个测试程序 def test(idx, test_dict): test_dict['test'][idx] = idx # 4. 创建进程池进行测试 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
, um die Ausgabe zu sehen Das Ergebnis ist seltsam{'test': {}}
Wenn wir einfach den Code ändern
import multiprocessing # 1. 创建一个Manger对象 manager = multiprocessing.Manager() # 2. 创建一个dict temp_dict = manager.dict() temp_dict['test'] = {} # 3. 创建一个测试程序 def test(idx, test_dict): row = test_dict['test'] row[idx] = idx test_dict['test'] = row # 4. 创建进程池进行测试 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict)) pool.close() pool.join() print(temp_dict)
Das Ausgabeergebnis wird den Erwartungen entsprechen.
Um die Gründe für dieses Phänomen zu verstehen, habe ich Einfach lesen Nachdem wir uns den Quellcode angesehen haben, sind die folgenden Codeteile der Schlüssel.
def Manager(): ''' Returns a manager associated with a running server process The managers methods such as `Lock()`, `Condition()` and `Queue()` can be used to create shared objects. ''' from multiprocessing.managers import SyncManager m = SyncManager() m.start() return m ... def start(self, initializer=None, initargs=()): ''' Spawn a server process for this manager object ''' assert self._state.value == State.INITIAL if initializer is not None and not hasattr(initializer, '__call__'): raise TypeError('initializer must be a callable') # pipe over which we will retrieve address of server reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server self._process = Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), ) ident = ':'.join(str(i) for i in self._process._identity) self._process.name = type(self).__name__ + '-' + ident self._process.start() ...
Wie aus dem obigen Code ersichtlich ist, startet das Programm tatsächlich einen Serverdienst in einem anderen Prozess, wenn wir ein Manager-Objekt deklarieren . Dieser Server ist blockierend, um die Datensicherheit zwischen Prozessen zu erreichen.
Nach meinem Verständnis schließen sich Vorgänge zwischen verschiedenen Prozessen gegenseitig aus Es wird an den Server gesendet, und dann verarbeitet der Server Anforderungen von anderen Prozessen.
Zurück zum seltsamen Phänomen oben: Dieser Vorgangtest_dict['test'][idx] = idx
wird tatsächlich geändert, nachdem die Daten auf dem Server abgerufen wurden, aber sie werden nicht zurückgegeben an den Server, sodass sich die Daten von temp_dict überhaupt nicht geändert haben. Dies entspricht der Anforderung von Daten vom Server und der anschließenden Übertragung der geänderten Daten an den Server
import multiprocessing # 1. 创建一个Manger对象 manager = multiprocessing.Manager() # 2. 创建一个dict temp_dict = manager.dict() lock = manager.Lock() temp_dict['test'] = {} # 3. 创建一个测试程序 def test(idx, test_dict, lock): lock.acquire() row = test_dict['test'] row[idx] = idx test_dict['test'] = row lock.release() # 4. 创建进程池进行测试 pool = multiprocessing.Pool(4) for i in range(100): pool.apply_async(test, args=(i, temp_dict, lock)) pool.close() pool.join() print(temp_dict)
Erstellen Sie im Prozess kein neues Sperrobjekt. Verwenden Sie ein einheitliches Sperrobjekt.
Dieser Artikel ist hier zu Ende. Weitere spannende Inhalte finden Sie in der SpaltePython-Video-Tutorial auf der chinesischen PHP-Website!
Das obige ist der detaillierte Inhalt vonEinführung in das prozessübergreifende Datenkommunikationsmodul multiprocessing.Manager in Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!