Maison  >  Article  >  développement back-end  >  Introduction au module de communication de données inter-processus multiprocessing.Manager en Python

Introduction au module de communication de données inter-processus multiprocessing.Manager en Python

不言
不言avant
2019-03-23 11:08:474324parcourir

Cet article vous présente le module de communication de données inter-processus multiprocessing.Manager en Python. Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer.

Actuellement, au cours du développement, il existe des situations où les données doivent être partagées entre les processus. J'ai donc étudié le multiprocessing.Manager, en utilisant principalement dict comme exemple pour illustrer le partage inter-processus (même processus parent).

Instructions d'utilisation de dict

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict[idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

trop simples.

Analyse simple du code source

Regardons maintenant un autre exemple

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict['test'][idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

Vous pouvez voir que le résultat de sortie est étrange{'test': {}}
Si nous modifions simplement le code

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

Le résultat de sortie sera conforme aux attentes.

Afin de comprendre la raison de ce phénomène La raison est que j'ai simplement lu le code source et trouvé que les morceaux de code suivants sont essentiels

def Manager():
    '''
    Returns a manager associated with a running server process

    The managers methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    '''
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m
    
...
    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()
...

Comme le montre le code ci-dessus, lorsque nous déclarons un objet Manager. , le programme démarre en fait dans d'autres processus. Un service serveur est créé. Ce serveur est bloqué pour assurer la sécurité des données inter-processus.
Je crois comprendre que les opérations entre différents processus s'excluent mutuellement. le serveur, puis Cette partie des données est modifiée et renvoyée au serveur, puis le serveur traite les requêtes des autres processus

Revenons au phénomène étrange ci-dessus, cette opération test_dict['test'][idx] = idx est en fait effectuée après. extraire les données sur le serveur, mais non renvoyées au serveur, donc les données de temp_dict n'ont pas changé du tout dans le deuxième code normal, cela équivaut à demander d'abord les données au serveur, puis à transmettre les données modifiées. au serveur. Cela peut expliquer ce phénomène.

Sécurité des données inter-processus

Que se passe-t-il si une situation se produit à ce moment et que deux processus demandent les mêmes données en même temps, modifiez-les. les séparément, puis les soumettre au serveur ? Bien sûr, les données sont anormales. Sur cette base, nous avons besoin d'un autre objet de Manager, Lock(). Cet objet n'est pas difficile à comprendre. dict et lock proviennent tous deux de ce serveur, donc lorsque vous verrouillez, les autres processus ne peuvent pas obtenir les données et, naturellement, la situation anormale ci-dessus ne se produira pas.

Exemple de code :

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
lock = manager.Lock()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict, lock):
    lock.acquire()
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
    lock.release()
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict, lock))
pool.close()
pool.join()
print(temp_dict)

Ne créez pas vous-même un nouveau verrou dans le processus Objet, utilisez un objet de verrouillage unifié

Cet article est terminé ici Pour d'autres contenus passionnants, vous pouvez faire attention au . tutoriel vidéo python colonne du site PHP chinois !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer