ホームページ  >  記事  >  バックエンド開発  >  マルチプロセッシングを使用して Python でプロセス間通信を実装するにはどうすればよいですか?

マルチプロセッシングを使用して Python でプロセス間通信を実装するにはどうすればよいですか?

王林
王林転載
2023-05-08 21:31:062120ブラウズ

    1. プロセス間通信をマスターする必要があるのはなぜですか?

    Python のマルチスレッド コードの効率は GIL によって制限されており、マルチスレッドによって高速化することはできません。 -core CPU、マルチプロセス この方法では、GIL をバイパスし、マルチ CPU アクセラレーションを活用し、プログラムのパフォーマンスを大幅に向上させることができますが、プロセス間通信については考慮する必要があります。プロセスはスレッドとは異なり、独自の独立したメモリ空間を持ち、グローバル変数を使用してプロセス間でデータを転送することはできません。

    マルチプロセッシングを使用して Python でプロセス間通信を実装するにはどうすればよいですか?実際のプロジェクト要件では、集中的な計算やリアルタイム タスクが頻繁に発生し、場合によっては、プロセス間で大量のデータ (画像、大容量データなど) を転送する必要があります。オブジェクトなど

    、データがファイルシリアル化またはネットワークインターフェイスを介して転送される場合、リアルタイム要件を満たすことが困難です.redisまたはkaffkaのサードパーティメッセージキューパッケージを使用すると、rabbitMQはシステムを複雑化します。

    Python マルチプロセッシング モジュール自体は、メッセージ メカニズム、同期メカニズム、共有メモリなど、さまざまな非常に効率的なプロセス間通信方法を提供します。

    Python のプロセス間通信のさまざまな方法とセキュリティ メカニズムの使用を理解し、習得することは、プログラムの実行パフォーマンスを大幅に向上させるのに役立ちます。

    2. プロセス間のさまざまな通信方法の紹介

    プロセス間通信の主な方法をまとめると次のようになります

    マルチプロセッシングを使用して Python でプロセス間通信を実装するにはどうすればよいですか?プロセス間通信のメモリ安全性について

    メモリ安全性とは、複数のプロセス間で、同時取得や偶発的な破壊などにより共有変数例外が発生する可能性があることを意味します。 Multiprocessing モジュールによって提供される Queue、Pipe、Lock、および Event オブジェクトには、すべてプロセス間通信セキュリティ メカニズムが実装されています。 共有メモリ通信を使用する場合、コード内でこれらの共有メモリ変数を自分で追跡して破棄する必要があります。そうでないと、変数がスクランブルされたり、正常に破棄されなかったりする可能性があります。システム異常の原因となります。開発者が共有メモリの使用特性をよく理解していない限り、この共有メモリを直接使用するのではなく、Manager マネージャを介して共有メモリを使用することをお勧めします。


    Memory Manager Manager

    Multiprocessing は、プロセス通信のメモリ セキュリティ問題を統一的に解決できるメモリ マネージャ Manager クラスを提供します。マネージャには、リストを含むさまざまな共有データを追加できます。 、dict、Queue、Lock、Event、Shared Memory などが均一に追跡され、破棄されます。 3. メッセージメカニズム通信

    1) パイプパイプ通信メソッド

    は、1の単純なソケットチャネルに似ており、両端でメッセージを送受信できます。

    パイプ オブジェクト構築メソッド:

    parent_conn, child_conn = Pipe(duplex=True/False)

    パラメータの説明

    duplex=True、パイプラインは双方向通信です
    • duplex=False、パイプラインは一方向通信であり、child_conn のみがメッセージを送信でき、parent_conn はメッセージを受信のみできます。
    • サンプルコード:
    • from multiprocessing import Process, Pipe
         def myfunction(conn):
            conn.send(['hi!! I am Python'])
            conn.close()
      
      if __name__ == '__main__':
            parent_conn, child_conn = Pipe()
            p = Process(target=myfunction, args=(child_conn,))
            p.start()
        	print (parent_conn.recv() )
      	p.join()
    2) Message Queue Queueの通信方法

    MultiprocessingのQueueクラスがPython queue 3.0版で修正されました。プロデューサとメッセージ送信者の間でデータを転送するために簡単に実装でき、マルチプロセッシングのキュー モジュールはロック セキュリティ メカニズムを実装します。

    #Queue モジュールは合計 3 種類のキューを提供します。 マルチプロセッシングを使用して Python でプロセス間通信を実装するにはどうすればよいですか?

    (1) FIFO キュー、先入れ先出し、

    class queue.Queue(maxsize=0)
    (2) LIFO キュー、後入れ先出し、実際にはスタック

    class queue.LifoQueue(maxsize=0)

    (3) ) 優先キューでは、優先順位の最も低いエントリ値が最初にリストされます。

    class queue.PriorityQueue(maxsize=0)

    Multiprocessing.Queue クラスのメイン メソッド:

    メソッド説明queue.qsize()戻りキューの長さキューがいっぱいの場合は True を返し、それ以外の場合は False を返しますキューが空の場合は True を返し、それ以外の場合は False を返します#queue.put(item)データをキューに書き込みますqueue.get() キューからデータをスローします、queue.put_nowait(item)、queue.get_nowait()いいえ書かれるか投げられるのを待っています

    说明:

    • put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。

    • Multiprocessing 的Queue类没有提供Task_done, join方法

    Queue模块的其它队列类:
    (1) SimpleQueue
    简洁版的FIFO队列, 适事简单场景使用

    (2) JoinableQueue子类
    Python 3.5 后新增的 Queue的子类,拥有 task_done(), join() 方法

    • task_done()表示,最近读出的1个任务已经完成。

    • join()阻塞队列,直到queue中的所有任务都已完成。

    producer – consumer 场景,使用Queue的示例

    import multiprocessing
    
    def producer(numbers, q):
        for x in numbers:
            if x % 2 == 0:
                if q.full():
                    print("queue is full")
                    break
                q.put(x)
                print(f"put {x} in queue by producer")
        return None
    
    def consumer(q):
        while not q.empty():
            print(f"take data {q.get()} from queue by consumer")
        return None
    
    if __name__ == "__main__":
        # 设置1个queue对象,最大长度为5
        qu = multiprocessing.Queue(maxsize=5,) 
    
        # 创建producer子进程,把queue做为其中1个参数传给它,该进程负责写
        p5 = multiprocessing.Process(
            name="producer-1",
            target=producer,
            args=([random.randint(1, 100) for i in range(0, 10)], qu)
        )
        p5.start()
        p5.join()
        #创建consumer子进程,把queue做为1个参数传给它,该进程中队列中读
        p6 = multiprocessing.Process(
            name="consumer-1",
            target=consumer,
            args=(qu,)
        )
        p6.start()
        p6.join()
    
        print(qu.qsize())

    4、同步机制通信

    (1) 进程间同步锁 – Lock

    Multiprocessing也提供了与threading 类似的同步锁机制,确保某个时刻只有1个子进程可以访问某个资源或执行某项任务, 以避免同抢。

    例如:多个子进程同时访问数据库表时,如果没有同步锁,用户A修改1条数据后,还未提交,此时,用户B也进行了修改,可以预见,用户A提交的将是B个修改的数据。

    添加了同步锁,可以确保同时只有1个子进程能够进行写入数据库与提交操作。

    如下面的示例,同时只有1个进程可以执行打印操作。

    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

    (2) 子进程间协调机制 – Event

    Event 机制的工作原理:

    1个event 对象实例管理着1个 flag标记, 可以用set()方法将其置为true, 用clear()方法将其置为false, 使用wait()将阻塞当前子进程,直至flag被置为true.
    这样由1个进程通过event flag 就可以控制、协调各子进程运行。

    Event object的使用方法:
    1)主函数: 创建1个event 对象, flag = multiprocessing.Event() , 做为参数传给各子进程
    2) 子进程A: 不受event影响,通过event 控制其它进程的运行
    o 先clear(),将event 置为False, 占用运行权.
    o 完成工作后,用set()把flag置为True。
    3) 子进程B, C: 受event 影响
    o 设置 wait() 状态,暂停运行
    o 直到flag重新变为True,恢复运行

    主要方法:

    • set(), clear()设置 True/False,

    • wait() 使进程等待,直到flag被改为true.

    • is_set(), Return True if and only if the internal flag is true.

    验证进程间通信 – Event

    import multiprocessing
    import time
    import random
    
    def joo_a(q, ev):
        print("subprocess joo_a start")
        if not ev.is_set():
            ev.wait()
        q.put(random.randint(1, 100))
        print("subprocess joo_a ended")
    
    def joo_b(q, ev):
        print("subprocess joo_b start")
        ev.clear()
        time.sleep(2)
        q.put(random.randint(200, 300))
        ev.set()
        print("subprocess joo_b ended")
    
    def main_event():
        qu = multiprocessing.Queue()
        ev = multiprocessing.Event()
        sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev))
        sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,))
        sub_a.start()
        sub_b.start()
        # ev.set()
        sub_a.join()
        sub_b.join()
        while not qu.empty():
            print(qu.get())
    
    if __name__ == "__main__":
        main_event()

    5、共享内存方式通信

    (1) 共享变量

    子进程之间共存内存变量,要用 multiprocessing.Value(), Array() 来定义变量。 实际上是ctypes 类型,由multiprocessing.sharedctypes模块提供相关功能

    注意 使用 share memory 要考虑同抢等问题,释放等问题,需要手工实现。因此在使用共享变量时,建议使用Manager管程来管理这些共享变量。

    def  func(num):
        num.value=10.78   #子进程改变数值的值,主进程跟着改变
     
    if  __name__=="__main__":
    num = multiprocessing.Value("d", 10.0) 
    # d表示数值,主进程与子进程可共享这个变量。
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start()
        p.join()
     
        print(num.value)

    进程之间共享数据(数组型):

    import multiprocessing
     
    def  func(num):
        num[2]=9999   #子进程改变数组,主进程跟着改变
     
    if  __name__=="__main__":
        num=multiprocessing.Array("i",[1,2,3,4,5])   
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start() 
        p.join()
     
        print(num[:])

    (2) 共享内存 Shared_memory

    如果进程间需要共享对象数据,或共享内容,数据较大,multiprocessing 提供了SharedMemory类来实现进程间实时通信,不需要通过发消息,读写磁盘文件来实现,速度更快。
    注意:直接使用SharedMemory 存在着同抢、泄露隐患,应通过SharedMemory Manager 管程类来使用, 以确保内存安全。

    创建共享内存区:

    multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)

    方法:
    父进程创建shared_memory 后,子进程可以使用它,当不再需要后,使用close(), 删除使用unlink()方法
    相关属性:
    获取内存区内容: shm.buf
    获取内存区名称: shm.name
    获取内存区字节数: shm.size

    示例:

    >>> from multiprocessing import shared_memory
    >>> shm_a = shared_memory.SharedMemory(create=True, size=10)
    >>> type(shm_a.buf)
    <class &#39;memoryview&#39;>
    >>> buffer = shm_a.buf
    >>> len(buffer)
    10
    >>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
    >>> buffer[4] = 100                           # Modify single byte at a time
    >>> # Attach to an existing shared memory block
    >>> shm_b = shared_memory.SharedMemory(shm_a.name)
    >>> import array
    >>> array.array(&#39;b&#39;, shm_b.buf[:5])  # Copy the data into a new array.array
    array(&#39;b&#39;, [22, 33, 44, 55, 100])
    >>> shm_b.buf[:5] = b&#39;howdy&#39;  # Modify via shm_b using bytes
    >>> bytes(shm_a.buf[:5])      # Access via shm_a
    b&#39;howdy&#39;
    >>> shm_b.close()   # Close each SharedMemory instance
    >>> shm_a.close()
    >>> shm_a.unlink()  # Call unlink only once to release the shared memory

    3) ShareableList 共享列表

    sharedMemory类还提供了1个共享列表类型,这样就更方便了,进程间可以直接共享python强大的列表
    构建方法:
    multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

    from multiprocessing import shared_memory
    >>> a = shared_memory.ShareableList([&#39;howdy&#39;, b&#39;HoWdY&#39;, -273.154, 100, None, True, 42])
    >>> [ type(entry) for entry in a ]
    [<class &#39;str&#39;>, <class &#39;bytes&#39;>, <class &#39;float&#39;>, <class &#39;int&#39;>, <class &#39;NoneType&#39;>, <class &#39;bool&#39;>, <class &#39;int&#39;>]
    >>> a[2]
    -273.154
    >>> a[2] = -78.5
    >>> a[2]
    -78.5
    >>> a[2] = &#39;dry ice&#39;  # Changing data types is supported as well
    >>> a[2]
    &#39;dry ice&#39;
    >>> a[2] = &#39;larger than previously allocated storage space&#39;
    Traceback (most recent call last):
      ...
    ValueError: exceeds available storage for existing str
    >>> a[2]
    &#39;dry ice&#39;
    >>> len(a)
    7
    >>> a.index(42)
    6
    >>> a.count(b&#39;howdy&#39;)
    0
    >>> a.count(b&#39;HoWdY&#39;)
    1
    >>> a.shm.close()
    >>> a.shm.unlink()
    >>> del a  # Use of a ShareableList after call to unlink() is unsupported
    
    
    b = shared_memory.ShareableList(range(5))         # In a first process
    >>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
    >>> c
    ShareableList([0, 1, 2, 3, 4], name=&#39;...&#39;)
    >>> c[-1] = -999
    >>> b[-1]
    -999
    >>> b.shm.close()
    >>> c.shm.close()
    >>> c.shm.unlink()

    6、共享内存管理器Manager

    Multiprocessing 提供了 Manager 内存管理器类,当调用1个Manager实例对象的start()方法时,会创建1个manager进程,其唯一目的就是管理共享内存, 避免出现进程间共享数据不同步,内存泄露等现象。

    其原理如下:

    マルチプロセッシングを使用して Python でプロセス間通信を実装するにはどうすればよいですか?

    Manager管理器相当于提供了1个共享内存的服务,不仅可以被主进程创建的多个子进程使用,还可以被其它进程访问,甚至跨网络访问。本文仅聚焦于由单一主进程创建的各进程之间的通信。

    1) Manager的主要数据结构

    相关类:multiprocessing.Manager
    子类有:

    • multiprocessing.managers.SharedMemoryManager

    • multiprocessing.managers.BaseManager

    支持共享变量类型:

    • python基本类型 int, str, list, tuple, list

    • 进程通信对象: Queue, Lock, Event,

    • Condition, Semaphore, Barrier ctypes类型: Value, Array

    2) 使用步骤

    1)创建管理器对象

    snm = Manager()
    snm = SharedMemoryManager()

    2)创建共享内存变量
    新建list, dict

    sl = snm.list(), snm.dict()

    新建1块bytes共享内存变量,需要指定大小

    sx = snm.SharedMemory(size)

    新建1个共享列表变量,可用列表来初始化

    sl = snm.ShareableList(sequence) 如
    sl = smm.ShareableList([‘howdy&#39;, b&#39;HoWdY&#39;, -273.154, 100, True])

    新建1个queue, 使用multiprocessing 的Queue类型

    snm = Manager()
    q = snm.Queue()

    示例 :

    from multiprocessing import Process, Manager
    
    def f(d, l):
        d[1] = &#39;1&#39;
        d[&#39;2&#39;] = 2
        d[0.25] = None
        l.reverse()
    
    if __name__ == &#39;__main__&#39;:
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)

    将打印

    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

    3) 销毁共享内存变量

    方法一:
    调用snm.shutdown()方法,会自动调用每个内存块的unlink()方法释放内存。或者 snm.close()
    方法二
    使用with语句,结束后会自动释放所有manager变量

    >>> with SharedMemoryManager() as smm:
    ...     sl = smm.ShareableList(range(2000))
    ...     # Divide the work among two processes, storing partial results in sl
    ...     p1 = Process(target=do_work, args=(sl, 0, 1000))
    ...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
    ...     p1.start()
    ...     p2.start()  # A multiprocessing.Pool might be more efficient
    ...     p1.join()
    ...     p2.join()   # Wait for all work to complete in both processes
    ...     total_result = sum(sl)  # Consolidate the partial results now in sl

    4) 向管理器注册自定义类型

    managers的子类BaseManager提供register()方法,支持注册自定义数据类型。如下例,注册1个自定义MathsClass类,并生成实例。

    from multiprocessing.managers import BaseManager
    
    class MathsClass:
        def add(self, x, y):
            return x + y
        def mul(self, x, y):
            return x * y
    
    class MyManager(BaseManager):
        pass
    
    MyManager.register(&#39;Maths&#39;, MathsClass)
    
    if __name__ == &#39;__main__&#39;:
        with MyManager() as manager:
            maths = manager.Maths()
            print(maths.add(4, 3))         # prints 7
            print(maths.mul(7, 8))
    queue.full()
    queue.empty()

    以上がマルチプロセッシングを使用して Python でプロセス間通信を実装するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。