Heim  >  Artikel  >  Backend-Entwicklung  >  Wie implementiert man Multiprocessing, um die Kommunikation zwischen Prozessen in Python zu implementieren?

Wie implementiert man Multiprocessing, um die Kommunikation zwischen Prozessen in Python zu implementieren?

王林
王林nach vorne
2023-05-08 21:31:062126Durchsuche

    1. Warum müssen wir die Kommunikation zwischen Prozessen beherrschen? Die Multithread-Code-Effizienz von Python wird durch GIL eingeschränkt und kann durch Multi-Core-CPUs nicht beschleunigt werden Nutzen Sie die Vorteile der Multi-CPU-Beschleunigung. Verbessert die Leistung des Programms erheblich

    , aber die Kommunikation zwischen Prozessen ist ein Problem, das berücksichtigt werden muss. Ein Prozess unterscheidet sich von einem Thread. Ein Prozess verfügt über einen eigenen unabhängigen Speicherbereich und kann keine globalen Variablen zum Übertragen von Daten zwischen Prozessen verwenden.

    Wie implementiert man Multiprocessing, um die Kommunikation zwischen Prozessen in Python zu implementieren?Bei tatsächlichen Projektanforderungen müssen häufig große Datenmengen wie Bilder, große Objekte usw. zwischen Prozessen übertragen werden Durch Dateiserialisierung oder Netzwerkschnittstelle ist es schwierig, die Echtzeitanforderungen zu erfüllen, und es werden die Nachrichtenwarteschlangenpakete von Drittanbietern wie Redis, Kaffka und RabbitMQ verwendet, was das System erneut verkompliziert.

    Das Python-Multiprocessing-Modul selbst bietet verschiedene sehr effiziente Kommunikationsmethoden zwischen Prozessen wie Nachrichtenmechanismus, Synchronisierungsmechanismus und gemeinsam genutzten Speicher.

    Das Verstehen und Beherrschen der Verwendung verschiedener Methoden der Python-Interprozesskommunikation sowie der Sicherheitsmechanismen kann dazu beitragen, die Leistung der Programmausführung erheblich zu verbessern. 2. Einführung in verschiedene Kommunikationsmethoden zwischen Prozessen

    Die wichtigsten Methoden der Kommunikation zwischen Prozessen werden wie folgt zusammengefasst

    Über die Speichersicherheit der Kommunikation zwischen ProzessenWie implementiert man Multiprocessing, um die Kommunikation zwischen Prozessen in Python zu implementieren?

    Speichersicherheit bedeutet, dass mehrere Prozesse konkurrieren können Aus dem gleichen Grund treten Ausnahmen bei gemeinsam genutzten Variablen aufgrund versehentlicher Zerstörung und aus anderen Gründen auf.

    Die vom Multiprocessing-Modul bereitgestellten Warteschlangen-, Pipe-, Sperr- und Ereignisobjekte verfügen alle über implementierte Sicherheitsmechanismen für die Kommunikation zwischen Prozessen. Wenn Sie Shared Memory zur Kommunikation verwenden, müssen Sie diese Shared-Memory-Variablen selbst im Code verfolgen und zerstören, da sie sonst möglicherweise durcheinander geraten oder nicht normal zerstört werden. Verursacht eine Systemanomalie. Sofern sich der Entwickler nicht über die Nutzungsmerkmale des gemeinsam genutzten Speichers im Klaren ist, wird nicht empfohlen, diesen gemeinsam genutzten Speicher direkt zu verwenden, sondern den gemeinsam genutzten Speicher über den Manager-Manager zu verwenden.

    Memory Manager Manager

    Multiprocessing stellt die Speichermanager-Manager-Klasse bereit, mit der die Speichersicherheitsprobleme der Prozesskommunikation einheitlich gelöst werden können. Dem Manager können verschiedene gemeinsam genutzte Daten hinzugefügt werden, darunter Liste, Diktat, Warteschlange, Sperre, Ereignis und gemeinsam genutzte Daten Speicher usw. werden einheitlich verfolgt und zerstört.

    3. Nachrichtenmechanismus-Kommunikation
    1) Die Pipe-Kommunikationsmethode

    ähnelt dem einfachen Socket-Kanal in 1, beide Enden können Nachrichten senden und empfangen.

    Pipe-Objektkonstruktionsmethode:

    parent_conn, child_conn = Pipe(duplex=True/False)

    Parameterbeschreibung


    duplex=True, die Pipeline ist bidirektionale Kommunikation

    • duplex=Falsch, die Pipeline ist unidirektionale Kommunikation, nur child_conn kann Nachrichten senden, parent_conn kann nur erhalten. 🔜 implementiert den Sperrsicherheitsmechanismus.

    • Das Warteschlangenmodul bietet insgesamt 3 Arten von Warteschlangen.

    (1) FIFO-Warteschlange, First In First Out,

    from multiprocessing import Process, Pipe
       def myfunction(conn):
          conn.send(['hi!! I am Python'])
          conn.close()
    
    if __name__ == '__main__':
          parent_conn, child_conn = Pipe()
          p = Process(target=myfunction, args=(child_conn,))
          p.start()
      	print (parent_conn.recv() )
    	p.join()

    (2) LIFO-Warteschlange, Last In First Out, eigentlich ein Stapel

    class queue.Queue(maxsize=0)

    (3) Bei der Prioritätswarteschlange wird der Eintragswert mit der niedrigsten Priorität zuerst in die Warteschlange gestellt

    class queue.LifoQueue(maxsize=0)
    Wie implementiert man Multiprocessing, um die Kommunikation zwischen Prozessen in Python zu implementieren?

    Hauptmethode der Multiprocessing.Queue-Klasse:

    method

    Description

    queue.qsize()gibt die Warteschlangenlänge zurückqueue Wenn die Warteschlange leer ist, geben Sie True zurück, andernfalls geben Sie False zurück
    queue.full()
    queue.put(item) Schreiben Sie Daten in die Warteschlange
    queue .get() Daten aus der Warteschlange werfen,
    queue.put_nowait(item), queue.get_nowait() Keine Wartezeit zum Schreiben oder Werfen

    说明:

    • put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。

    • Multiprocessing 的Queue类没有提供Task_done, join方法

    Queue模块的其它队列类:
    (1) SimpleQueue
    简洁版的FIFO队列, 适事简单场景使用

    (2) JoinableQueue子类
    Python 3.5 后新增的 Queue的子类,拥有 task_done(), join() 方法

    • task_done()表示,最近读出的1个任务已经完成。

    • join()阻塞队列,直到queue中的所有任务都已完成。

    producer – consumer 场景,使用Queue的示例

    import multiprocessing
    
    def producer(numbers, q):
        for x in numbers:
            if x % 2 == 0:
                if q.full():
                    print("queue is full")
                    break
                q.put(x)
                print(f"put {x} in queue by producer")
        return None
    
    def consumer(q):
        while not q.empty():
            print(f"take data {q.get()} from queue by consumer")
        return None
    
    if __name__ == "__main__":
        # 设置1个queue对象,最大长度为5
        qu = multiprocessing.Queue(maxsize=5,) 
    
        # 创建producer子进程,把queue做为其中1个参数传给它,该进程负责写
        p5 = multiprocessing.Process(
            name="producer-1",
            target=producer,
            args=([random.randint(1, 100) for i in range(0, 10)], qu)
        )
        p5.start()
        p5.join()
        #创建consumer子进程,把queue做为1个参数传给它,该进程中队列中读
        p6 = multiprocessing.Process(
            name="consumer-1",
            target=consumer,
            args=(qu,)
        )
        p6.start()
        p6.join()
    
        print(qu.qsize())

    4、同步机制通信

    (1) 进程间同步锁 – Lock

    Multiprocessing也提供了与threading 类似的同步锁机制,确保某个时刻只有1个子进程可以访问某个资源或执行某项任务, 以避免同抢。

    例如:多个子进程同时访问数据库表时,如果没有同步锁,用户A修改1条数据后,还未提交,此时,用户B也进行了修改,可以预见,用户A提交的将是B个修改的数据。

    添加了同步锁,可以确保同时只有1个子进程能够进行写入数据库与提交操作。

    如下面的示例,同时只有1个进程可以执行打印操作。

    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

    (2) 子进程间协调机制 – Event

    Event 机制的工作原理:

    1个event 对象实例管理着1个 flag标记, 可以用set()方法将其置为true, 用clear()方法将其置为false, 使用wait()将阻塞当前子进程,直至flag被置为true.
    这样由1个进程通过event flag 就可以控制、协调各子进程运行。

    Event object的使用方法:
    1)主函数: 创建1个event 对象, flag = multiprocessing.Event() , 做为参数传给各子进程
    2) 子进程A: 不受event影响,通过event 控制其它进程的运行
    o 先clear(),将event 置为False, 占用运行权.
    o 完成工作后,用set()把flag置为True。
    3) 子进程B, C: 受event 影响
    o 设置 wait() 状态,暂停运行
    o 直到flag重新变为True,恢复运行

    主要方法:

    • set(), clear()设置 True/False,

    • wait() 使进程等待,直到flag被改为true.

    • is_set(), Return True if and only if the internal flag is true.

    验证进程间通信 – Event

    import multiprocessing
    import time
    import random
    
    def joo_a(q, ev):
        print("subprocess joo_a start")
        if not ev.is_set():
            ev.wait()
        q.put(random.randint(1, 100))
        print("subprocess joo_a ended")
    
    def joo_b(q, ev):
        print("subprocess joo_b start")
        ev.clear()
        time.sleep(2)
        q.put(random.randint(200, 300))
        ev.set()
        print("subprocess joo_b ended")
    
    def main_event():
        qu = multiprocessing.Queue()
        ev = multiprocessing.Event()
        sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev))
        sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,))
        sub_a.start()
        sub_b.start()
        # ev.set()
        sub_a.join()
        sub_b.join()
        while not qu.empty():
            print(qu.get())
    
    if __name__ == "__main__":
        main_event()

    5、共享内存方式通信

    (1) 共享变量

    子进程之间共存内存变量,要用 multiprocessing.Value(), Array() 来定义变量。 实际上是ctypes 类型,由multiprocessing.sharedctypes模块提供相关功能

    注意 使用 share memory 要考虑同抢等问题,释放等问题,需要手工实现。因此在使用共享变量时,建议使用Manager管程来管理这些共享变量。

    def  func(num):
        num.value=10.78   #子进程改变数值的值,主进程跟着改变
     
    if  __name__=="__main__":
    num = multiprocessing.Value("d", 10.0) 
    # d表示数值,主进程与子进程可共享这个变量。
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start()
        p.join()
     
        print(num.value)

    进程之间共享数据(数组型):

    import multiprocessing
     
    def  func(num):
        num[2]=9999   #子进程改变数组,主进程跟着改变
     
    if  __name__=="__main__":
        num=multiprocessing.Array("i",[1,2,3,4,5])   
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start() 
        p.join()
     
        print(num[:])

    (2) 共享内存 Shared_memory

    如果进程间需要共享对象数据,或共享内容,数据较大,multiprocessing 提供了SharedMemory类来实现进程间实时通信,不需要通过发消息,读写磁盘文件来实现,速度更快。
    注意:直接使用SharedMemory 存在着同抢、泄露隐患,应通过SharedMemory Manager 管程类来使用, 以确保内存安全。

    创建共享内存区:

    multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)

    方法:
    父进程创建shared_memory 后,子进程可以使用它,当不再需要后,使用close(), 删除使用unlink()方法
    相关属性:
    获取内存区内容: shm.buf
    获取内存区名称: shm.name
    获取内存区字节数: shm.size

    示例:

    >>> from multiprocessing import shared_memory
    >>> shm_a = shared_memory.SharedMemory(create=True, size=10)
    >>> type(shm_a.buf)
    <class &#39;memoryview&#39;>
    >>> buffer = shm_a.buf
    >>> len(buffer)
    10
    >>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
    >>> buffer[4] = 100                           # Modify single byte at a time
    >>> # Attach to an existing shared memory block
    >>> shm_b = shared_memory.SharedMemory(shm_a.name)
    >>> import array
    >>> array.array(&#39;b&#39;, shm_b.buf[:5])  # Copy the data into a new array.array
    array(&#39;b&#39;, [22, 33, 44, 55, 100])
    >>> shm_b.buf[:5] = b&#39;howdy&#39;  # Modify via shm_b using bytes
    >>> bytes(shm_a.buf[:5])      # Access via shm_a
    b&#39;howdy&#39;
    >>> shm_b.close()   # Close each SharedMemory instance
    >>> shm_a.close()
    >>> shm_a.unlink()  # Call unlink only once to release the shared memory

    3) ShareableList 共享列表

    sharedMemory类还提供了1个共享列表类型,这样就更方便了,进程间可以直接共享python强大的列表
    构建方法:
    multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

    from multiprocessing import shared_memory
    >>> a = shared_memory.ShareableList([&#39;howdy&#39;, b&#39;HoWdY&#39;, -273.154, 100, None, True, 42])
    >>> [ type(entry) for entry in a ]
    [<class &#39;str&#39;>, <class &#39;bytes&#39;>, <class &#39;float&#39;>, <class &#39;int&#39;>, <class &#39;NoneType&#39;>, <class &#39;bool&#39;>, <class &#39;int&#39;>]
    >>> a[2]
    -273.154
    >>> a[2] = -78.5
    >>> a[2]
    -78.5
    >>> a[2] = &#39;dry ice&#39;  # Changing data types is supported as well
    >>> a[2]
    &#39;dry ice&#39;
    >>> a[2] = &#39;larger than previously allocated storage space&#39;
    Traceback (most recent call last):
      ...
    ValueError: exceeds available storage for existing str
    >>> a[2]
    &#39;dry ice&#39;
    >>> len(a)
    7
    >>> a.index(42)
    6
    >>> a.count(b&#39;howdy&#39;)
    0
    >>> a.count(b&#39;HoWdY&#39;)
    1
    >>> a.shm.close()
    >>> a.shm.unlink()
    >>> del a  # Use of a ShareableList after call to unlink() is unsupported
    
    
    b = shared_memory.ShareableList(range(5))         # In a first process
    >>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
    >>> c
    ShareableList([0, 1, 2, 3, 4], name=&#39;...&#39;)
    >>> c[-1] = -999
    >>> b[-1]
    -999
    >>> b.shm.close()
    >>> c.shm.close()
    >>> c.shm.unlink()

    6、共享内存管理器Manager

    Multiprocessing 提供了 Manager 内存管理器类,当调用1个Manager实例对象的start()方法时,会创建1个manager进程,其唯一目的就是管理共享内存, 避免出现进程间共享数据不同步,内存泄露等现象。

    其原理如下:

    Wie implementiert man Multiprocessing, um die Kommunikation zwischen Prozessen in Python zu implementieren?

    Manager管理器相当于提供了1个共享内存的服务,不仅可以被主进程创建的多个子进程使用,还可以被其它进程访问,甚至跨网络访问。本文仅聚焦于由单一主进程创建的各进程之间的通信。

    1) Manager的主要数据结构

    相关类:multiprocessing.Manager
    子类有:

    • multiprocessing.managers.SharedMemoryManager

    • multiprocessing.managers.BaseManager

    支持共享变量类型:

    • python基本类型 int, str, list, tuple, list

    • 进程通信对象: Queue, Lock, Event,

    • Condition, Semaphore, Barrier ctypes类型: Value, Array

    2) 使用步骤

    1)创建管理器对象

    snm = Manager()
    snm = SharedMemoryManager()

    2)创建共享内存变量
    新建list, dict

    sl = snm.list(), snm.dict()

    新建1块bytes共享内存变量,需要指定大小

    sx = snm.SharedMemory(size)

    新建1个共享列表变量,可用列表来初始化

    sl = snm.ShareableList(sequence) 如
    sl = smm.ShareableList([‘howdy&#39;, b&#39;HoWdY&#39;, -273.154, 100, True])

    新建1个queue, 使用multiprocessing 的Queue类型

    snm = Manager()
    q = snm.Queue()

    示例 :

    from multiprocessing import Process, Manager
    
    def f(d, l):
        d[1] = &#39;1&#39;
        d[&#39;2&#39;] = 2
        d[0.25] = None
        l.reverse()
    
    if __name__ == &#39;__main__&#39;:
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)

    将打印

    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

    3) 销毁共享内存变量

    方法一:
    调用snm.shutdown()方法,会自动调用每个内存块的unlink()方法释放内存。或者 snm.close()
    方法二
    使用with语句,结束后会自动释放所有manager变量

    >>> with SharedMemoryManager() as smm:
    ...     sl = smm.ShareableList(range(2000))
    ...     # Divide the work among two processes, storing partial results in sl
    ...     p1 = Process(target=do_work, args=(sl, 0, 1000))
    ...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
    ...     p1.start()
    ...     p2.start()  # A multiprocessing.Pool might be more efficient
    ...     p1.join()
    ...     p2.join()   # Wait for all work to complete in both processes
    ...     total_result = sum(sl)  # Consolidate the partial results now in sl

    4) 向管理器注册自定义类型

    managers的子类BaseManager提供register()方法,支持注册自定义数据类型。如下例,注册1个自定义MathsClass类,并生成实例。

    from multiprocessing.managers import BaseManager
    
    class MathsClass:
        def add(self, x, y):
            return x + y
        def mul(self, x, y):
            return x * y
    
    class MyManager(BaseManager):
        pass
    
    MyManager.register(&#39;Maths&#39;, MathsClass)
    
    if __name__ == &#39;__main__&#39;:
        with MyManager() as manager:
            maths = manager.Maths()
            print(maths.add(4, 3))         # prints 7
            print(maths.mul(7, 8))

    Das obige ist der detaillierte Inhalt vonWie implementiert man Multiprocessing, um die Kommunikation zwischen Prozessen in Python zu implementieren?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen