Heim  >  Artikel  >  Backend-Entwicklung  >  Einführung in das prozessübergreifende Datenkommunikationsmodul multiprocessing.Manager in Python

Einführung in das prozessübergreifende Datenkommunikationsmodul multiprocessing.Manager in Python

不言
不言nach vorne
2019-03-23 11:08:474378Durchsuche

Dieser Artikel bietet Ihnen eine Einführung in das prozessübergreifende Datenkommunikationsmodul in Python. Ich hoffe, dass es für Freunde hilfreich ist.

Derzeit gibt es während der Entwicklung Situationen, in denen Daten zwischen Prozessen ausgetauscht werden müssen. Deshalb habe ich Multiprocessing.Manager untersucht und dabei hauptsächlich dict als Beispiel verwendet, um die gemeinsame Nutzung zwischen Prozessen (gleicher übergeordneter Prozess) zu veranschaulichen.

Anleitung zur Verwendung von dict

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict[idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

zu einfach.

Einfache Quellcode-Analyse

Schauen wir uns nun ein weiteres Beispiel an

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict['test'][idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

, um die Ausgabe zu sehen Das Ergebnis ist seltsam{'test': {}}
Wenn wir einfach den Code ändern

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

Das Ausgabeergebnis wird den Erwartungen entsprechen.

Um die Gründe für dieses Phänomen zu verstehen, habe ich Einfach lesen Nachdem wir uns den Quellcode angesehen haben, sind die folgenden Codeteile der Schlüssel.

def Manager():
    '''
    Returns a manager associated with a running server process

    The managers methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    '''
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m
    
...
    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()
...

Wie aus dem obigen Code ersichtlich ist, startet das Programm tatsächlich einen Serverdienst in einem anderen Prozess, wenn wir ein Manager-Objekt deklarieren . Dieser Server ist blockierend, um die Datensicherheit zwischen Prozessen zu erreichen.
Nach meinem Verständnis schließen sich Vorgänge zwischen verschiedenen Prozessen gegenseitig aus Es wird an den Server gesendet, und dann verarbeitet der Server Anforderungen von anderen Prozessen.

Zurück zum seltsamen Phänomen oben: Dieser Vorgangtest_dict['test'][idx] = idx wird tatsächlich geändert, nachdem die Daten auf dem Server abgerufen wurden, aber sie werden nicht zurückgegeben an den Server, sodass sich die Daten von temp_dict überhaupt nicht geändert haben. Dies entspricht der Anforderung von Daten vom Server und der anschließenden Übertragung der geänderten Daten an den Server

Prozess Zwischendatensicherheit

Was passiert, wenn zu diesem Zeitpunkt eine Situation auftritt und zwei Prozesse gleichzeitig dieselben Daten anfordern, diese separat ändern und sie dann natürlich an den Server übermitteln? Daten sind abnormal. Auf dieser Grundlage benötigen wir ein weiteres Objekt von Manager, Lock(). Dieses Objekt ist nicht schwer zu verstehen, und sowohl dict als auch lock kommen von diesem Server, also wenn Sie es sperren Wenn die Daten empfangen werden, tritt die oben genannte Ausnahmesituation natürlich nicht auf.

Codebeispiel:

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
lock = manager.Lock()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict, lock):
    lock.acquire()
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
    lock.release()
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict, lock))
pool.close()
pool.join()
print(temp_dict)

Erstellen Sie im Prozess kein neues Sperrobjekt. Verwenden Sie ein einheitliches Sperrobjekt.

Dieser Artikel ist hier zu Ende. Weitere spannende Inhalte finden Sie in der Spalte

Python-Video-Tutorial auf der chinesischen PHP-Website!

Das obige ist der detaillierte Inhalt vonEinführung in das prozessübergreifende Datenkommunikationsmodul multiprocessing.Manager in Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:segmentfault.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen