首頁  >  文章  >  後端開發  >  Python進程間的通訊方式是什麼

Python進程間的通訊方式是什麼

PHPz
PHPz轉載
2023-06-03 14:09:071832瀏覽

什麼是進程的通訊

這裡舉一個通訊機制的例子:我們都很熟悉通信​​這個詞,例如一個人想打電話給他的女友。一旦通話建立,便會形成一個隱式的佇列(請注意這個術語)。此時這個人就會透過對話的方式不停的將訊息告訴女友,而這個人的女友也是在傾聽。我認為在大多數情況下,情況可能是倒過來的。

這裡可以將他們兩個人比作是兩個進程,"這個人"的進程需要將訊息發送給"女友"的進程,就需要一個隊列的幫助。由於女友需要時刻接收隊列中的信息,因此她可以同時進行其他事情,這意味著兩個進程之間的通信主要依賴隊列。

這個隊列可以支援發送訊息與接收訊息,「這個人"負責發送訊息,反之"女友」 負責的是接收訊息。

既然佇列才是重點,那麼就來看看佇列要如何建立。

佇列的建立 - multiprocessing

仍使用 multiprocessing 模組,呼叫該模組的 Queue 函數來實現佇列的建立。

Queue隊列的建立mac_count佇列物件
函數名稱 介紹 參數 傳回值

Queue 函數功能介紹:呼叫Queue 可以創建隊列;它有一個參數mac_count 代表隊列最大可以創建多少訊息,如果不傳遞預設是無限長度。實例化一個佇列物件之後,需要操作這個佇列的物件進行放入與取出資料。 進程之間通訊的方法函數名稱#介紹返回值put將訊息放入佇列message無get取得佇列訊息無str
##########

put 函數功能介紹:將資料傳入。它有一個參數 message ,是一個字串類型。

get 函數功能介紹:用來接收佇列中的資料。 (其實這裡就是常用的json場景,有很多的資料傳輸都是字串的,佇列的插入與取得就是使用的字串,所以json 就非常適用這個場景。)

接下來就來練習一下隊列的使用。

進程間的通訊- 佇列示範案例

程式碼範例如下:

# coding:utf-8


import json
import multiprocessing


class Work(object):     # 定义一个 Work 类
    def __init__(self, queue):      # 构造函数传入一个 '队列对象' --> queue
            self.queue = queue

    def send(self, message):        # 定义一个 send(发送) 函数,传入 message
                                    # [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错]
        if not isinstance(message, str):    # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化
            message = json.dumps(message)
        self.queue.put(message)     # 利用 queue 的队列实例化对象将 message 发送出去

    def receive(self):      # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环
        while 1:
            result = self.queue.get()   # 获取 '队列对象' --> queue 传入的message
                                        # 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获
            try:                        # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res
                res = json.loads(result)
            except:
                res = result
            print('接收到的信息为:{}'.format(res))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    work = Work(queue)
    send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},))
    receive = multiprocessing.Process(target=work.receive)

    send.start()
    receive.start()

使用佇列建立進程間通訊遇到的異常

但是這裡會出現一個報錯,如下圖:

報錯截圖範例如下:

Python進程間的通訊方式是什麼

這裡的報錯提示是檔案沒有被發現的意思。其實這裡是我們使用 隊列做 put() 和 get()的時候 有一把無形的鎖加了上去,就是上圖中圈中的 .SemLock 。我們不需要去關心造成這個錯誤的具體原因,要解決這個問題其實也很簡單。

FileNotFoundError: [Errno 2] No such file or directory 異常的解決

需要阻塞進程的只是send 或receive 子進程中的一個,只要阻塞其中一個即可,這是理論上的情況。但是我們的 receive子程序是一個 while循環,它會一直執行,所以只需要給 send 子程序加上一個 join 即可。

解決示意圖如下:

Python進程間的通訊方式是什麼

PS:雖然解決了報錯問題,但是程式沒有正常退出。

其實由於我們的 receive 進程是個 while循環,並不知道要處理到什麼時候,沒有辦法立刻終止。所以我們需要在 receive 進程 使用 terminate() 函數來終結接收端。

運行結果如下:

Python進程間的通訊方式是什麼

批次給send 函數加入資料

新建一個函數,寫入for迴圈模擬批次新增要傳送的訊息

然後再給這個模擬批次發送資料的函數加入一個執行緒。

範例程式碼如下:

# coding:utf-8


import json
import time
import multiprocessing


class Work(object):     # 定义一个 Work 类
    def __init__(self, queue):      # 构造函数传入一个 '队列对象' --> queue
            self.queue = queue

    def send(self, message):        # 定义一个 send(发送) 函数,传入 message
                                    # [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错]
        if not isinstance(message, str):    # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化
            message = json.dumps(message)
        self.queue.put(message)     # 利用 queue 的队列实例化对象将 message 发送出去


    def send_all(self):             # 定义一个 send_all(发送)函数,然后通过for循环模拟批量发送的 message
        for i in range(20):
            self.queue.put('第 {} 次循环,发送的消息为:{}'.format(i, i))
            time.sleep(1)



    def receive(self):      # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环
        while 1:
            result = self.queue.get()   # 获取 '队列对象' --> queue 传入的message
                                        # 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获
            try:                        # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res
                res = json.loads(result)
            except:
                res = result
            print('接收到的信息为:{}'.format(res))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    work = Work(queue)
    send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},))
    receive = multiprocessing.Process(target=work.receive)
    send_all = multiprocessing.Process(target=work.send_all,)


    send_all.start()    # 这里因为 send 只执行了1次,然后就结束了。而 send_all 却要循环20次,它的执行时间是最长的,信息也是发送的最多的
    send.start()
    receive.start()

    # send.join()       # 使用 send 的阻塞会造成 send_all 循环还未结束 ,receive.terminate() 函数接收端就会终结。
    send_all.join()     # 所以我们只需要阻塞最长使用率的进程就可以了
    receive.terminate()

運行結果如下:

Python進程間的通訊方式是什麼

#從上圖我們可以看到send 與send_all 兩個進程都可以透過queue這個實例化的Queue 物件傳送訊息,同樣的receive接收函數也會將兩個行程傳入的message 列印輸出出來。

小節

在這一章節,我們成功運用佇列實現了跨進程通信,同時也掌握了佇列的操作技巧。一個隊列中,有一端(這裡我們示範的是 send端)透過 put方法實現添加相關的信息,另一端使用 get 方法獲取相關的信息;兩個進程相互配合達到一個進程通信的效果。

除了佇列,進程之間還可以使用管道、信號量和共享記憶體等方式進行通信,如果您有興趣,可以了解這些方法。可以自行拓展一下。

進程間通訊的其他方式- 補充

python提供了多種進程通訊的方式,包括訊號,管道,訊息佇列,信號量,共享內存,socket等

主要Queue和Pipe這兩種方式,Queue用於多個進程間實現通信,Pipe是兩個進程的通信。

1.管道:分為匿名管道和命名管道

匿名管道:在核心中申請一塊固定大小的緩衝區,程式擁有寫入和讀取的權利,一般使用fock函數實現父子程序的通訊

命名管道:在記憶體中申請一塊固定大小的緩衝區,程式擁有寫入和讀取的權利,沒有血緣關係的進程也可以進程間通訊

特點:面向位元組流;生命週期隨核心;自帶同步互斥機制;半雙工,單向通信,兩個管道實現雙向通信

#一種重寫方式是:在操作系統核心中建立一個佇列,佇列包含多個資料封包元素,多個行程可以透過特定句柄來存取該佇列。訊息佇列可以用來將資料從一個行程傳送到另一個行程。每個資料塊被認為是有一個類型,接收者進程接收的資料塊可以有不同的類型。訊息佇列也有管道一樣的不足,就是每個訊息的最大長度是有上限的,每個訊息佇列的總的位元組數是有上限的,系統上訊息佇列的總數也有一個上限

特點:訊息佇列可以被認為是一個全域的一個鍊錶,鍊錶節點中存放著資料封包的類型和內容,有訊息佇列的識別碼進行標記;訊息佇列​​允許一個或多個行程寫入或讀取訊息;訊息佇列的生命週期隨核心;訊息佇列​​可實現雙向通訊

3.信號量:在內核中建立一個信號量集合(本質上是數組),數組的元素(信號量)都是1,使用P操作進行-1,使用V操作1

P(sv):如果sv的值大於零,就給它減1;如果它的值為零,就掛起該程式的執行

V(sv):如果有其他進程因等待sv而被掛起,就讓它恢復運行,如果沒有進程因等待sv而掛起,就給它加1

#PV操作用於同一個進程,實現互斥;PV操作用於不同進程,實現同步

功能:對臨界資源進行保護

4.共享記憶體:將同一塊實體記憶體一塊映射到不同的進程的虛擬位址空間中,實現不同進程間對同一資源的共享。說到進程間通訊方式,共享記憶體可以說是最有用的,也是最快速的IPC形式

特點:不同從用戶態到內核態的頻繁切換和拷貝數據,直接從記憶體中讀取就可以;共享記憶體是臨界資源,所以需要操作時必須要確保原子性。使用信號量或互斥鎖都可以.

以上是Python進程間的通訊方式是什麼的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除