搜尋
首頁後端開發Python教學如何在 Python 中使用 multiprocessing 實現進程間通訊呢?

    1、為什麼要掌握進程間通訊

    python的多執行緒程式碼效率由於受制於GIL,不能利用多核心CPU來加速,而多進程方式可以繞過GIL, 發揮多CPU加速的優勢,能夠明顯提升程式的效能

    但進程間通訊卻是必須考慮的問題。進程不同於線程,進程有自己的獨立記憶體空間,不能使用全域變數在進程間傳遞資料。

    如何在 Python 中使用 multiprocessing 實現進程間通訊呢?

    實際專案需求中,常常存在密集運算、或即時性任務,進程之間有時需要傳遞大量數據,如圖片、大物件等,傳遞資料如果透過檔案序列化、或網路介面來進行,難以滿足即時性要求,採用redis,或者kaffka, rabbitMQ 之第3方訊息佇列包,又使系統複雜化了。

    Python multiprocessing 模組本身就提供了訊息機制、同步機制、共享記憶體等各種非常高效的進程間通訊方式

    了解並掌握 python 進程間通訊的各類方式的使用,以及安全機制,可以幫助大幅提升程式運作效能。

    2、進程間各類通訊方式簡介

    進程間通訊的主要方式總結如下

    如何在 Python 中使用 multiprocessing 實現進程間通訊呢?

    關於進程間通訊的記憶體安全性
    記憶體安全意味著,多進程間可能會因同搶,意外銷毀等原因造成共享變數異常。
    Multiprocessing 模組提供的Queue, Pipe, Lock, Event 對象,都實現了進程間通訊安全機制。
    採用共享記憶體方式通信,需要在程式碼中自已來追蹤、銷毀這些共享記憶體變量,否則可能會出同搶、未正常銷毀等。造成系統異常。除非開發者很清楚共享記憶體使用特點,否則不建議直接使用此共享內存,而是透過Manager管理器來使用共享內存。

    記憶體管理器Manager
    Multiprocessing提供了記憶體管理器Manager類,可統一解決進程通訊的記憶體安全性問題,可以將各種共享資料加入管理器,包括list , dict, Queue, Lock, Event, Shared Memory 等,由其統一追蹤與銷毀。

    3、訊息機制通訊

    1) 管道 Pipe 通訊方式

    #類似1上簡單的socket頻道,雙端皆可收發訊息。
    Pipe 物件的建構方法:

    parent_conn, child_conn = Pipe(duplex=True/False)

    參數說明

    • #duplex=True, 管道為雙向通訊

    • # duplex=False, 管道為單向通信,只有child_conn可以發訊息,parent_conn只能接收。

    範例程式碼:

    from multiprocessing import Process, Pipe
       def myfunction(conn):
          conn.send(['hi!! I am Python'])
          conn.close()
    
    if __name__ == '__main__':
          parent_conn, child_conn = Pipe()
          p = Process(target=myfunction, args=(child_conn,))
          p.start()
      	print (parent_conn.recv() )
    	p.join()

    2) 訊息佇列Queue 通訊方式

    Multiprocessing 的Queue 類,是在python queue 3.0版本上修改的,可以很容易實現生產者– 訊息者間傳遞數據,而且Multiprocessing的Queue 模組實現了lock安全機制。

    如何在 Python 中使用 multiprocessing 實現進程間通訊呢?

    Queue模組共提供了3種類型的佇列。

    (1) FIFO queue , 先進先出,

    class queue.Queue(maxsize=0)

    (2) LIFO queue, 後進先出, 其實就是堆疊

    class queue.LifoQueue(maxsize=0)

    (3)帶優先權佇列, 優先權最低entry value lowest 先了列

    class queue.PriorityQueue(maxsize=0)

    Multiprocessing.Queue類別的主要方法:

    ##queue.get() 將資料拋出佇列,queue.put_nowait(item), queue.get_nowait()無等待寫入或拋出

    说明:

    • put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。

    • Multiprocessing 的Queue类没有提供Task_done, join方法

    Queue模块的其它队列类:
    (1) SimpleQueue
    简洁版的FIFO队列, 适事简单场景使用

    (2) JoinableQueue子类
    Python 3.5 后新增的 Queue的子类,拥有 task_done(), join() 方法

    • task_done()表示,最近读出的1个任务已经完成。

    • join()阻塞队列,直到queue中的所有任务都已完成。

    producer – consumer 场景,使用Queue的示例

    import multiprocessing
    
    def producer(numbers, q):
        for x in numbers:
            if x % 2 == 0:
                if q.full():
                    print("queue is full")
                    break
                q.put(x)
                print(f"put {x} in queue by producer")
        return None
    
    def consumer(q):
        while not q.empty():
            print(f"take data {q.get()} from queue by consumer")
        return None
    
    if __name__ == "__main__":
        # 设置1个queue对象,最大长度为5
        qu = multiprocessing.Queue(maxsize=5,) 
    
        # 创建producer子进程,把queue做为其中1个参数传给它,该进程负责写
        p5 = multiprocessing.Process(
            name="producer-1",
            target=producer,
            args=([random.randint(1, 100) for i in range(0, 10)], qu)
        )
        p5.start()
        p5.join()
        #创建consumer子进程,把queue做为1个参数传给它,该进程中队列中读
        p6 = multiprocessing.Process(
            name="consumer-1",
            target=consumer,
            args=(qu,)
        )
        p6.start()
        p6.join()
    
        print(qu.qsize())

    4、同步机制通信

    (1) 进程间同步锁 – Lock

    Multiprocessing也提供了与threading 类似的同步锁机制,确保某个时刻只有1个子进程可以访问某个资源或执行某项任务, 以避免同抢。

    例如:多个子进程同时访问数据库表时,如果没有同步锁,用户A修改1条数据后,还未提交,此时,用户B也进行了修改,可以预见,用户A提交的将是B个修改的数据。

    添加了同步锁,可以确保同时只有1个子进程能够进行写入数据库与提交操作。

    如下面的示例,同时只有1个进程可以执行打印操作。

    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

    (2) 子进程间协调机制 – Event

    Event 机制的工作原理:

    1个event 对象实例管理着1个 flag标记, 可以用set()方法将其置为true, 用clear()方法将其置为false, 使用wait()将阻塞当前子进程,直至flag被置为true.
    这样由1个进程通过event flag 就可以控制、协调各子进程运行。

    Event object的使用方法:
    1)主函数: 创建1个event 对象, flag = multiprocessing.Event() , 做为参数传给各子进程
    2) 子进程A: 不受event影响,通过event 控制其它进程的运行
    o 先clear(),将event 置为False, 占用运行权.
    o 完成工作后,用set()把flag置为True。
    3) 子进程B, C: 受event 影响
    o 设置 wait() 状态,暂停运行
    o 直到flag重新变为True,恢复运行

    主要方法:

    • set(), clear()设置 True/False,

    • wait() 使进程等待,直到flag被改为true.

    • is_set(), Return True if and only if the internal flag is true.

    验证进程间通信 – Event

    import multiprocessing
    import time
    import random
    
    def joo_a(q, ev):
        print("subprocess joo_a start")
        if not ev.is_set():
            ev.wait()
        q.put(random.randint(1, 100))
        print("subprocess joo_a ended")
    
    def joo_b(q, ev):
        print("subprocess joo_b start")
        ev.clear()
        time.sleep(2)
        q.put(random.randint(200, 300))
        ev.set()
        print("subprocess joo_b ended")
    
    def main_event():
        qu = multiprocessing.Queue()
        ev = multiprocessing.Event()
        sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev))
        sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,))
        sub_a.start()
        sub_b.start()
        # ev.set()
        sub_a.join()
        sub_b.join()
        while not qu.empty():
            print(qu.get())
    
    if __name__ == "__main__":
        main_event()

    5、共享内存方式通信

    (1) 共享变量

    子进程之间共存内存变量,要用 multiprocessing.Value(), Array() 来定义变量。 实际上是ctypes 类型,由multiprocessing.sharedctypes模块提供相关功能

    注意 使用 share memory 要考虑同抢等问题,释放等问题,需要手工实现。因此在使用共享变量时,建议使用Manager管程来管理这些共享变量。

    def  func(num):
        num.value=10.78   #子进程改变数值的值,主进程跟着改变
     
    if  __name__=="__main__":
    num = multiprocessing.Value("d", 10.0) 
    # d表示数值,主进程与子进程可共享这个变量。
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start()
        p.join()
     
        print(num.value)

    进程之间共享数据(数组型):

    import multiprocessing
     
    def  func(num):
        num[2]=9999   #子进程改变数组,主进程跟着改变
     
    if  __name__=="__main__":
        num=multiprocessing.Array("i",[1,2,3,4,5])   
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start() 
        p.join()
     
        print(num[:])

    (2) 共享内存 Shared_memory

    如果进程间需要共享对象数据,或共享内容,数据较大,multiprocessing 提供了SharedMemory类来实现进程间实时通信,不需要通过发消息,读写磁盘文件来实现,速度更快。
    注意:直接使用SharedMemory 存在着同抢、泄露隐患,应通过SharedMemory Manager 管程类来使用, 以确保内存安全。

    创建共享内存区:

    multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)

    方法:
    父进程创建shared_memory 后,子进程可以使用它,当不再需要后,使用close(), 删除使用unlink()方法
    相关属性:
    获取内存区内容: shm.buf
    获取内存区名称: shm.name
    获取内存区字节数: shm.size

    示例:

    >>> from multiprocessing import shared_memory
    >>> shm_a = shared_memory.SharedMemory(create=True, size=10)
    >>> type(shm_a.buf)
    <class &#39;memoryview&#39;>
    >>> buffer = shm_a.buf
    >>> len(buffer)
    10
    >>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
    >>> buffer[4] = 100                           # Modify single byte at a time
    >>> # Attach to an existing shared memory block
    >>> shm_b = shared_memory.SharedMemory(shm_a.name)
    >>> import array
    >>> array.array(&#39;b&#39;, shm_b.buf[:5])  # Copy the data into a new array.array
    array(&#39;b&#39;, [22, 33, 44, 55, 100])
    >>> shm_b.buf[:5] = b&#39;howdy&#39;  # Modify via shm_b using bytes
    >>> bytes(shm_a.buf[:5])      # Access via shm_a
    b&#39;howdy&#39;
    >>> shm_b.close()   # Close each SharedMemory instance
    >>> shm_a.close()
    >>> shm_a.unlink()  # Call unlink only once to release the shared memory

    3) ShareableList 共享列表

    sharedMemory类还提供了1个共享列表类型,这样就更方便了,进程间可以直接共享python强大的列表
    构建方法:
    multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

    from multiprocessing import shared_memory
    >>> a = shared_memory.ShareableList([&#39;howdy&#39;, b&#39;HoWdY&#39;, -273.154, 100, None, True, 42])
    >>> [ type(entry) for entry in a ]
    [<class &#39;str&#39;>, <class &#39;bytes&#39;>, <class &#39;float&#39;>, <class &#39;int&#39;>, <class &#39;NoneType&#39;>, <class &#39;bool&#39;>, <class &#39;int&#39;>]
    >>> a[2]
    -273.154
    >>> a[2] = -78.5
    >>> a[2]
    -78.5
    >>> a[2] = &#39;dry ice&#39;  # Changing data types is supported as well
    >>> a[2]
    &#39;dry ice&#39;
    >>> a[2] = &#39;larger than previously allocated storage space&#39;
    Traceback (most recent call last):
      ...
    ValueError: exceeds available storage for existing str
    >>> a[2]
    &#39;dry ice&#39;
    >>> len(a)
    7
    >>> a.index(42)
    6
    >>> a.count(b&#39;howdy&#39;)
    0
    >>> a.count(b&#39;HoWdY&#39;)
    1
    >>> a.shm.close()
    >>> a.shm.unlink()
    >>> del a  # Use of a ShareableList after call to unlink() is unsupported
    
    
    b = shared_memory.ShareableList(range(5))         # In a first process
    >>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
    >>> c
    ShareableList([0, 1, 2, 3, 4], name=&#39;...&#39;)
    >>> c[-1] = -999
    >>> b[-1]
    -999
    >>> b.shm.close()
    >>> c.shm.close()
    >>> c.shm.unlink()

    6、共享内存管理器Manager

    Multiprocessing 提供了 Manager 内存管理器类,当调用1个Manager实例对象的start()方法时,会创建1个manager进程,其唯一目的就是管理共享内存, 避免出现进程间共享数据不同步,内存泄露等现象。

    其原理如下:

    如何在 Python 中使用 multiprocessing 實現進程間通訊呢?

    Manager管理器相当于提供了1个共享内存的服务,不仅可以被主进程创建的多个子进程使用,还可以被其它进程访问,甚至跨网络访问。本文仅聚焦于由单一主进程创建的各进程之间的通信。

    1) Manager的主要数据结构

    相关类:multiprocessing.Manager
    子类有:

    • multiprocessing.managers.SharedMemoryManager

    • multiprocessing.managers.BaseManager

    支持共享变量类型:

    • python基本类型 int, str, list, tuple, list

    • 进程通信对象: Queue, Lock, Event,

    • Condition, Semaphore, Barrier ctypes类型: Value, Array

    2) 使用步骤

    1)创建管理器对象

    snm = Manager()
    snm = SharedMemoryManager()

    2)创建共享内存变量
    新建list, dict

    sl = snm.list(), snm.dict()

    新建1块bytes共享内存变量,需要指定大小

    sx = snm.SharedMemory(size)

    新建1个共享列表变量,可用列表来初始化

    sl = snm.ShareableList(sequence) 如
    sl = smm.ShareableList([‘howdy&#39;, b&#39;HoWdY&#39;, -273.154, 100, True])

    新建1个queue, 使用multiprocessing 的Queue类型

    snm = Manager()
    q = snm.Queue()

    示例 :

    from multiprocessing import Process, Manager
    
    def f(d, l):
        d[1] = &#39;1&#39;
        d[&#39;2&#39;] = 2
        d[0.25] = None
        l.reverse()
    
    if __name__ == &#39;__main__&#39;:
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)

    将打印

    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

    3) 销毁共享内存变量

    方法一:
    调用snm.shutdown()方法,会自动调用每个内存块的unlink()方法释放内存。或者 snm.close()
    方法二
    使用with语句,结束后会自动释放所有manager变量

    >>> with SharedMemoryManager() as smm:
    ...     sl = smm.ShareableList(range(2000))
    ...     # Divide the work among two processes, storing partial results in sl
    ...     p1 = Process(target=do_work, args=(sl, 0, 1000))
    ...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
    ...     p1.start()
    ...     p2.start()  # A multiprocessing.Pool might be more efficient
    ...     p1.join()
    ...     p2.join()   # Wait for all work to complete in both processes
    ...     total_result = sum(sl)  # Consolidate the partial results now in sl

    4) 向管理器注册自定义类型

    managers的子类BaseManager提供register()方法,支持注册自定义数据类型。如下例,注册1个自定义MathsClass类,并生成实例。

    from multiprocessing.managers import BaseManager
    
    class MathsClass:
        def add(self, x, y):
            return x + y
        def mul(self, x, y):
            return x * y
    
    class MyManager(BaseManager):
        pass
    
    MyManager.register(&#39;Maths&#39;, MathsClass)
    
    if __name__ == &#39;__main__&#39;:
        with MyManager() as manager:
            maths = manager.Maths()
            print(maths.add(4, 3))         # prints 7
            print(maths.mul(7, 8))
    # method Description
    queue.qsize() 傳回佇列長度
    queue.full() 佇列滿,傳回True, 否則回傳False
    queue.empty() 佇列空,回傳True , 否則返回False
    queue.put(item) 將資料寫入佇列

    以上是如何在 Python 中使用 multiprocessing 實現進程間通訊呢?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述
    本文轉載於:亿速云。如有侵權,請聯絡admin@php.cn刪除
    Python:自動化,腳本和任務管理Python:自動化,腳本和任務管理Apr 16, 2025 am 12:14 AM

    Python在自動化、腳本編寫和任務管理中表現出色。 1)自動化:通過標準庫如os、shutil實現文件備份。 2)腳本編寫:使用psutil庫監控系統資源。 3)任務管理:利用schedule庫調度任務。 Python的易用性和豐富庫支持使其在這些領域中成為首選工具。

    Python和時間:充分利用您的學習時間Python和時間:充分利用您的學習時間Apr 14, 2025 am 12:02 AM

    要在有限的時間內最大化學習Python的效率,可以使用Python的datetime、time和schedule模塊。 1.datetime模塊用於記錄和規劃學習時間。 2.time模塊幫助設置學習和休息時間。 3.schedule模塊自動化安排每週學習任務。

    Python:遊戲,Guis等Python:遊戲,Guis等Apr 13, 2025 am 12:14 AM

    Python在遊戲和GUI開發中表現出色。 1)遊戲開發使用Pygame,提供繪圖、音頻等功能,適合創建2D遊戲。 2)GUI開發可選擇Tkinter或PyQt,Tkinter簡單易用,PyQt功能豐富,適合專業開發。

    Python vs.C:申請和用例Python vs.C:申請和用例Apr 12, 2025 am 12:01 AM

    Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。Python以简洁和强大的生态系统著称,C 则以高性能和底层控制能力闻名。

    2小時的Python計劃:一種現實的方法2小時的Python計劃:一種現實的方法Apr 11, 2025 am 12:04 AM

    2小時內可以學會Python的基本編程概念和技能。 1.學習變量和數據類型,2.掌握控制流(條件語句和循環),3.理解函數的定義和使用,4.通過簡單示例和代碼片段快速上手Python編程。

    Python:探索其主要應用程序Python:探索其主要應用程序Apr 10, 2025 am 09:41 AM

    Python在web開發、數據科學、機器學習、自動化和腳本編寫等領域有廣泛應用。 1)在web開發中,Django和Flask框架簡化了開發過程。 2)數據科學和機器學習領域,NumPy、Pandas、Scikit-learn和TensorFlow庫提供了強大支持。 3)自動化和腳本編寫方面,Python適用於自動化測試和系統管理等任務。

    您可以在2小時內學到多少python?您可以在2小時內學到多少python?Apr 09, 2025 pm 04:33 PM

    兩小時內可以學到Python的基礎知識。 1.學習變量和數據類型,2.掌握控制結構如if語句和循環,3.了解函數的定義和使用。這些將幫助你開始編寫簡單的Python程序。

    如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎?如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎?Apr 02, 2025 am 07:18 AM

    如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

    See all articles

    熱AI工具

    Undresser.AI Undress

    Undresser.AI Undress

    人工智慧驅動的應用程序,用於創建逼真的裸體照片

    AI Clothes Remover

    AI Clothes Remover

    用於從照片中去除衣服的線上人工智慧工具。

    Undress AI Tool

    Undress AI Tool

    免費脫衣圖片

    Clothoff.io

    Clothoff.io

    AI脫衣器

    AI Hentai Generator

    AI Hentai Generator

    免費產生 AI 無盡。

    熱門文章

    R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
    4 週前By尊渡假赌尊渡假赌尊渡假赌
    R.E.P.O.最佳圖形設置
    4 週前By尊渡假赌尊渡假赌尊渡假赌
    R.E.P.O.如果您聽不到任何人,如何修復音頻
    1 個月前By尊渡假赌尊渡假赌尊渡假赌
    R.E.P.O.聊天命令以及如何使用它們
    1 個月前By尊渡假赌尊渡假赌尊渡假赌

    熱工具

    ZendStudio 13.5.1 Mac

    ZendStudio 13.5.1 Mac

    強大的PHP整合開發環境

    PhpStorm Mac 版本

    PhpStorm Mac 版本

    最新(2018.2.1 )專業的PHP整合開發工具

    SecLists

    SecLists

    SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

    DVWA

    DVWA

    Damn Vulnerable Web App (DVWA) 是一個PHP/MySQL的Web應用程序,非常容易受到攻擊。它的主要目標是成為安全專業人員在合法環境中測試自己的技能和工具的輔助工具,幫助Web開發人員更好地理解保護網路應用程式的過程,並幫助教師/學生在課堂環境中教授/學習Web應用程式安全性。 DVWA的目標是透過簡單直接的介面練習一些最常見的Web漏洞,難度各不相同。請注意,該軟體中

    VSCode Windows 64位元 下載

    VSCode Windows 64位元 下載

    微軟推出的免費、功能強大的一款IDE編輯器