>  기사  >  백엔드 개발  >  Python의 프로세스 간 데이터 통신 모듈 multiprocessing.Manager 소개

Python의 프로세스 간 데이터 통신 모듈 multiprocessing.Manager 소개

不言
不言앞으로
2019-03-23 11:08:474406검색

이 글은 프로세스간 데이터 통신 모듈 멀티프로세싱에 대해 소개합니다. Python의 관리자입니다. 도움이 필요한 친구들이 참고할 수 있기를 바랍니다.

현재 개발 중에 프로세스 간에 데이터를 공유해야 하는 상황이 있습니다. 그래서 프로세스 간 공유(동일한 상위 프로세스)를 설명하기 위해 주로 dict를 예로 사용하여 multiprocessing.Manager를 연구했습니다.

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict[idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)
너무 간단합니다.

간단한 소스 코드 분석

이제 또 다른 예를 살펴보겠습니다

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    test_dict['test'][idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

출력 결과가 이상하다는 것을 알 수 있습니다{'test': {}}
간단히 코드를 수정해 보면

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)
출력 결과는 기대한 것과 일치할 것입니다.

{'test': {}}
如果我们简单修改一下代码

def Manager():
    '''
    Returns a manager associated with a running server process

    The managers methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    '''
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m
    
...
    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()
...

这时输出结果就符合预期了.

为了了解这个现象背后的原因, 我简单去读了一下源码, 主要有以下几段代码很关键.

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
lock = manager.Lock()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict, lock):
    lock.acquire()
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
    lock.release()
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict, lock))
pool.close()
pool.join()
print(temp_dict)

上面代码可以看出, 当我们声明了一个Manager对象的时候, 程序实际在其他进程启动了一个server服务, 这个server是阻塞的, 以此来实现进程间数据安全.
我的理解就是不同进程之间操作都是互斥的, 一个进程向server请求到这部分数据, 再把这部分数据修改, 返回给server, 之后server再去处理其他进程的请求.

回到上面的奇怪现象上, 这个操作test_dict['test'][idx] = idx이 현상의 원인을 이해하기 위해서는 주로 다음 핵심 코드 부분이 중요합니다.

rrreee

위 코드에서는 Manager 개체를 선언하면 프로그램이 실제로 다른 프로세스에서 서버 서비스를 시작한다는 것을 알 수 있습니다. 이 서버는 프로세스 간 데이터 보안을 달성하기 위해 차단됩니다.
제가 이해하는 바는 서로 다른 것입니다. 프로세스 모든 작업은 상호 배타적입니다. 프로세스는 서버에서 이 데이터 부분을 요청한 다음 이 데이터 부분을 수정하여 서버로 반환합니다.

위의 이상한 현상으로 돌아갑니다. , 이 test_dict['test'][idx] = idx 작업은 실제로 데이터를 서버로 가져온 후 수정되지만 서버로 반환되지 않으므로 temp_dict의 데이터는 변경되지 않았습니다. 두 번째 일반 코드는 먼저 서버에 데이터를 요청한 다음 수정된 데이터를 서버에 보내는 것과 같습니다.

프로세스 간 데이터 보안

상황이 발생하는 경우 이번에는 두 개의 프로세스가 동시에 동일한 데이터를 요청하고 별도로 수정한 다음 서버에 제출하면 어떻게 될까요? 물론 데이터에서는 예외입니다. 이를 기반으로 또 다른 개체가 필요합니다. Manager, Lock() 이 개체는 이해하기 어렵습니다. Manager 자체는 서버이고 dict와 lock은 모두 이 서버에서 나오므로 잠그면 다른 프로세스에서 데이터를 얻을 수 없으며 당연히 위의 내용도 마찬가지입니다. 코드 예:

rrreee

프로세스에서 새 잠금 개체를 만들지 말고 통합 잠금 개체를 사용하세요.

이 문서는 여기까지입니다. 더 많은 흥미로운 콘텐츠를 확인하세요. PHP 중국어 웹사이트 칼럼에서

python 비디오 튜토리얼🎜을 시청해 보세요! 🎜🎜🎜

위 내용은 Python의 프로세스 간 데이터 통신 모듈 multiprocessing.Manager 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제