首頁  >  文章  >  後端開發  >  Python 支援重啟的非同步 IO

Python 支援重啟的非同步 IO

高洛峰
高洛峰原創
2016-10-18 11:50:131150瀏覽

摘要


這是一份從Python3.3開始的Python3非同步I/O提案。研究從PEP 3153缺失的具體提議。 這項提案包括了一個可插入式的事件循環API,傳輸和與Twisted相似的協議抽象,以及來自(PEP 380) 基於yield的更高級的調度器。一份作品裡的參考實現,它的程式碼命名為Tulip(Tulip的repo的連結放在文章最後的參考文獻分段)。

介紹


事件循環常用於互通性較高的地方。對於像Twisted、Tornado或ZeroMQ這類(基於Python3.3的)框架,它應該是容易去根據框架的需求通過輕量級封裝或者代理去適配默認的事件循環實現,或者是用它們自己的事件循環實作去替代預設的實作。 (一些像Twisted的框架,擁有多種的事件循環實現。由於這些實現都具有統一的接口,所以這應該不會成為問題。)

事件循環甚至有可能存在兩個不同的第三方框架交互,透過共享預設事件循環實現(各自使用自己的適配器),或者是透過共享其中一個框架的事件循環實現。在後者,兩種不同層級的適配可能會存在(從框架A的事件循環到標準事件循環接口,然後從標準的再到框架B的事件循環)。被使用的事件循環實作應該在主程式的控制之下(儘管提供了事件循環可以選擇的預設策略)。

因此,兩個單獨的API被定義:

獲取和設定當前事件的循環物件;

一個確認事件循環物件的介面和它的最低保證

一個事件循環實作可能提供額外的方法和保證。

事件循環介面不取決於產量,相反,它使用一個回調,額外介面(傳輸協定和協定)以及期貨(?)的組合。後者類似於在PEP3148中定義的接口,但有不同的實作而且不綁定到線程。特別是,它們沒有wait()方法,用戶將使用回調。

對於那些不喜歡用回調函數的人(包括我),(Python)提供了一個寫異步I/O程式碼的調度器作為協同程序,它使用了PEP 380的yield from表達式。這個調度器並不是可插入的;可插入性存在於事件循環級別,同時調度器也應該工作在任何符合標準的事件循環實現上。

對於那些在使用協同程式和其他非同步框架的程式碼的互通性,調度器有一個行為上類似Future的Task類別。一個在事件循環層級進行互動操作的框架能夠透過加入回呼函數到Future中,同時等待一個Future來完成。同樣的,調度器提供一個操作來掛起協同程序,直到回調函數被呼叫。

透過事件循環介面來為執行緒間互動操作提供限制;(Python裡)有一個API能夠提交一個函數到一個能夠傳回相容事件循環的Future的執行器(看 PEP 3148)。

沒有目的的


像Stackless Python或 greenlets/gevent 的系統互通性不是本教學的目的。

規範


依賴


Python3.3是必要的。不需超過Python3.3範圍的新語言或標準函式庫。不需要第三方模組或包。

模組命名空間


這裡的規範會放在一個新的頂層包。不同的組件會放在這個包的不同子模組裡。套件會從各自的子模組中導入常用的API,同時使他們能作為包的可用屬性(類似電子郵件包的做法)。

頂層包的名字目前還沒指定。參考實作使用「tulip」來命名,但這個名字可能會在這個實作加入到標準函式庫的時候改變為其他更煩人的名字(有希望在Python3.4中)。

在煩人的名字選好之前,這篇教學會使用「tulip」作為頂層包的名字。假定沒有給定模組名的類別和函數都透過頂層包來存取。

事件循環策略:取得並設定事件循環


要取得目前的事件循環,可以使用get_event_loop()。這函數傳回一個在下面有定義的EventLoop類別的實例或一個等價的物件。 get_event_loop()可能根據當前線程返回不同的對象,或根據其他上下文的概念返回不同對象。

要設定目前事件循環,可以使用set_event_loop(event_loop),這裡的event_loop是EventLoop類別的實例或是等價的實例物件。這裡使用與get_event_loop()相同的上下文的概念。

還有第三個策略函數:new_event_loop(),有利於單元測試和其他特別的情況,它會建立和傳回一個新的基於該策略的預設規則的EventLoop實例。要使它成為目前的事件循環,你需要呼叫set_event_loop()。

要改變上述三個函數的工作方式(包括他們的上下文的概念),可以透過呼叫set_event_loop_policy(policy),其中參數policy是一個事件循環策略物件。這個策略物件可以是任何包含了類似上面描述的函數表現(get_event_loop(),set_event_loop(event_loop)和new_event_loop())的物件。預設的事件循環策略是DefaultEventLoopPolicy類別的實例。目前事件循環策略物件能夠透過呼叫get_event_loop_policy()來取回。

一個事件循環策略沒強制要求只能有一個事件循環存在。預設的事件循環策略也沒強制要求這樣做,但是它強制要求每個執行緒只能有一個事件循環。

事件循環介面


關於時間:在Python 中,所有的超時(timeout),間隔(interval)和延時(delay)都是以秒來計算的,可以是整型也可以是浮點型。時鐘的精確度依賴於具體的實作;預設使用 time.monotonic()。

關於回呼(callbacks)和處理函數(handlers):如果一個函數接受一個回呼函數和任意個數的變數作為參數,那麼你也可以用一個處理函數物件(Handler)來取代回調函數。這樣的話就不需要再傳遞那些參數。這個處理函數物件應該是一個立即傳回的函數(fromcall_soon()),而不是延遲回傳的(fromcall_later())。如果處理函數已經取消,那麼這個呼叫將無法運作。

一個符合標準的事件循環物件擁有以下的方法:

run()。 執行事件循環,知道沒啥好做了。具體的意思是:

除了取消呼叫外,沒有更多透過call_later(),call_repeatedly(),call_soon(), orcall_soon_threadsafe()這些方法調度的呼叫。

沒有更多的註冊了的文件描述符。 當它關閉的時候會由註冊方來註銷文件描述符。

備註:直到遇到終止條件或呼叫stop(),run()會一直阻塞。

備註: 如果你使用call_repeatedly()來執行一個調用,run()不會在你調用stop()前退出。

需要詳細說明: 有多少類似的真正需要我們做的?

run_forever()。直到呼叫stop()前一直運行事件循環。

run_until_complete(future, timeout=None)。在Future完成前一直執行事件循環。如果給了timeout的值,它會等待timeout的時間。 如果Future完成了,它的結果會回傳或它的異常拋出;如果在逾時前完成Future, 或者stop()被呼叫,會拋出TimeoutError (但Future不會被取消). 在事件循環已經在運行的時候,這個方法不能呼叫。

備註: 這個API比較用來做測試或類似的工作。 它不應該用作從yield from 表達式的future替代品或其他等待一個Future的方法。 (例如 註冊一個完成的回調)。

run_once(timeout=None)。運行事件循環一段事件。 如果給了timeout的值, I/O輪詢會阻塞一段時間; 否則, I/O輪詢不會受時間約束。

備註:準確來說,這裡做了多少工作是根據具體實現的。 一個限制是:如果一個使用call_soon()來直接調度自己,會導致死順壞,run_once()仍然會回傳。

stop()。盡可能快速地停止事件循環。隨後可以使用run()重啟迴圈(或的一個變體)。

備註: 有多塊來停止是根據它具體實現。所有在stop()前已經在運行的直接回呼函數必定仍在運行,但是在stop()調用後的調度的回調函數(或者延遲運行的)不會運行。

close()。關閉事件循環,釋放它所保持的所有資源,例如被epoll()或kqueue()使用的檔案描述符。這個方法不應該在事件循環運行期間呼叫。它可以被多次調用。

call_later(delay, callback, *args)。為callback(*args)安排延遲大約delay秒後調用,一旦,除非被取消了。傳回一個Handler物件代表回呼函數,Handler物件的cancel()方法常用來取消回呼函數。

call_repeatedly(interval, callback, **args)。和call_later()類似,但會在每個interval秒重複呼叫回呼函數,直到傳回的Handler被取消。第一次呼叫是在interval秒內。

call_soon(callback, *args)。類似call_later(0, callback, *args)。

call_soon_threadsafe(callback, *args)。類似call_soon(callback, *args),但是當事件循環在阻塞等待IO的時候在另外的線程調用,事件循環的阻塞會被取消。這是唯一安全地從另外的執行緒呼叫的方法。 (要在一個執行緒安全的方式去延遲一段時間調度回調函數,你可以使用ev.call_soon_threadsafe(ev.call_later,when,callback,*args)。)但它在訊號處理器中呼叫並不安全(因為它可以使用鎖)。

add_signal_handler(sig, callback, *args)。無論何時接收到訊號 ``sigis , callback(*args)會被安排呼叫。傳回一個能來取消訊號回呼函數的Handler。 (取消返回的處理器回導致在下個訊號到來的時候調用remove_signal_handler()。優先明確地調用remove_signal_handler()。)為相同的信號定義另外一個回到函數來替代之前的handler(每個信號只能激活一個handler)。 sig參數必備是一個在訊號模組裡定義的有效的訊號值。如果訊號不能處理,這會拋出一個異常:如果它不是一個有效的訊號或如果它是一個不能捕獲的訊號(例如SIGKILL),就會拋出ValueError。如果這個特別的事件循環實例不能處理訊號(因​​為訊號是每個處理器的全域變量,只有在主執行緒的事件循環才能處理這些訊號),它會拋出RuntimeError。

remove_signal_handler(sig)。為訊號sig移除handler,當有設定的時候。拋出和add_signal_handler()一樣的例外(除了在不能不錯的訊號時回傳False代替拋出RuntimeError)。如果handler移除成功,返回True,如果沒有設定handler則回傳False。

一些符合標準介面回傳Future的方法:

wrap_future(future)。 這裡需要在PEP 3148 所描述的Future (例如一個concurrent.futures.Future的實例) 同時傳回一個相容事件循環的Future (例如, 一個tulip.Future實例)。

run_in_executor(executor, callback, *args)。安排在一個執行器中呼叫callback(*args) (請看 PEP 3148)。傳回的Future的成功的結果是呼叫的回傳值。 這個方法等價於wrap_future(executor.submit(callback, *args))。 如果沒有執行器,則會是也能夠一個預設為5個執行緒的ThreadPoolExecutor。

set_default_executor(executor). 設定一個被run_in_executor()使用的預設執行器。

getaddrinfo(host, port, family=0, type=0, proto=0, flags=0). 類似socket.getaddrinfo()函數,但是傳回一個Future。 Future的成功結果為一列與socket.getaddrinfo()的回傳值有相同格式資料。 預設的實作透過run_in_executor()來呼叫socket.getaddrinfo(),但其他實作可能會選擇使用他們自己的DNS查找。可選參數必需是指定的關鍵字參數。

getnameinfo(sockaddr, flags=0). 類似socket.getnameinfo(),但回傳一個Future。 Future的成功的結果將會是一個(host, port)的陣列。 與forgetaddrinfo()有相同的實作。

create_connection(protocol_factory, host, port, **kwargs). 使用給定的主機和連接埠建立一個流連結。這會創建一個依賴Transport的實現來表示鏈接, 然後調用protocol_factory()來實例化(或取回)用戶的Protocol實現, 然後把兩者綁定到一起。 (看下面對Transport和Protocol的定義。)使用者的Protocol實作透過呼叫無參數(*)的protocol_factory()來建立或取回。傳回值是Future,它的成功結果是(transport, protocol)對; 如果有錯誤阻止創建一個成功的鏈接,Future會包含一個適合的異常集。注意,當Future完成的適合,協議的connection_made()方法不會呼叫;那會發生在連結握手完成的適合。

(*) 沒有要求protocol_factory是一個類別。如果你的協定類別需要定義參數傳遞到建構函數,你可以使用lambda或functool.partial()。你也可以傳入一個之前建構好的Protocol實例的lambda。

可選關鍵參數:

family,proto,flags:地址簇,協議, 和混合了標誌的參數傳遞到getaddrinfo()。這些全是預設為0的。 ((socket類型總是SOCK_STREAM。)

ssl:傳入True來建立一個SSL傳輸(透過預設一個無格式的TCP來建立)。或傳入一個ssl.SSLConteext物件來重載預設的SSL上下文物件來建立)。或傳入一個ssl.SSLConteext物件來重載預設的SSL上下文物件來使用。 ,無參數(*)的protocol_factory被呼叫來創建一個Protocol,一個代表了連結網路端的Transport會被創建,以及兩個物件透過呼叫protocol.connection_made(transport)綁定到一起。面對create_connection()的補充說明。

family,proto,flags:地址簇,協議, 和混合了標誌的參數傳遞到getaddrinfo()。我並不知道怎麼來非同步地支援(SSL),我建議這需要一個憑證。

補充:也許可以使一個Future的結果對象能夠用來控制服務循環,例如停止服務,終止所有的活動連接,以及(如果支援)調整積壓(的服務)或其他參數?它也可以有一個API來查詢活動連線。另外,如果循環因為錯誤而停止服務,或者如果不能啟動,則回傳一個僅僅完成了的Future(子類別?)?取消了它可能會導致停止循環。

補充:有些平台可能沒興趣實現所有的這些方法, 例如 移動APP就對start_serving()不太感興趣。 (儘管我的iPad上有一個Minecraft的伺服器...)

以下這些註冊檔案描述符的回呼函數的方法不是必需的。如果沒有實作這些方法,存取這些方法(而不是呼叫它們)時會傳回屬性錯誤(AttributeError)。預設的實作提供了這些方法,但是使用者一般不會直接用到它們,只有傳輸層獨家使用。同樣,在 Windows 平台,這些方法不一定實現,要看到底是否使用了 select 或 IOCP 的事件循環模型。這兩個模型接收整型的檔案描述符,而不是 fileno() 方法傳回的物件。檔案描述符最好是可查詢的,例如,磁碟檔案就不行。

add_reader(fd, callback, *args). 在檔案描述符 fd 準備好可以進行讀取操作時呼叫指定的回調函數 callback(*args)。傳回一個處理函數對象,可以用來取消回呼函數。請注意,不同於 call_later(),這個回呼函數可以被多次呼叫。在同一個檔案描述子上再次呼叫 add_reader() 將會取消先前設定的回呼函數。注意:取消處理函數有可能會等到處理函數呼叫後。如果你要關閉 fd,你應該呼叫 remove_reader(fd)。 (TODO:如果已經設定了處理函數,拋出一個異常)。

add_writer(fd, callback, *args).  類似 add_reader(),不過是在可以寫入操作之前呼叫回呼函數。

remove_reader(fd). 為檔案描述子 fd 刪除已設定的讀取操作回呼函數。如果沒有設定回呼函數,則不進行操作。 (提供這樣的替代介面是因為記錄檔案描述符比記錄處理函數更方便簡單)。刪除成功則回傳 True,失敗則傳回 False。

remove_writer(fd).  為檔案描述子 fd 刪除已設定的寫入操作回呼函數。

未完成的:如果一個檔案描述子裡麵包含了多個回呼函數,那該怎麼辦呢?目前的機制是替換先前的回呼函數,如果已經註冊了回呼函數則應該招募異常。

接下來下面的方法在socket的非同步I/O中是可選的。他們是替代上面提到的可選方法的,目的是在Windows的傳輸實作中使用IOCP(如果事件循環支援)。 socket參數必需是不阻塞socket。

sock_recv(sock, n)。從套接字sock接收位元組。回傳一個Future,Future在成功的時候會是一個位元組物件。

sock_sendall(sock, data)。發送位元組資料到套接字sock。回傳一個Future,Future的結果在成功後會是None。 (補充:讓它去模擬sendall()或send()會更好嗎?但是我認為sendall()——也許它扔應該命名為send()?)

sock_connect(sock, address)。連接到給定的地址。回傳一個Future,Future的成功結果就是None。

sock_accept(sock)。從socket接收一個連結。這socket必需在監聽模式以及綁定到一個定制。回傳一個Future,Future的成功結果會是一個(conn,peer)的數組,conn是一個已連接的無阻塞socket以及peer是對等的位址。 (補充:人們告訴我這個API風格對於高水準的伺服器是很慢的。所以上面也有start_sering()。我們還需要這個嗎?)

補充:可選方法都不是太好的。也許這些都是需要的?它仍然依賴平台的更有效設定。另外的可能是:文件標註這些“僅提供給傳輸”,然後其他的“可提供給任何的情況”。

回呼順序


當在同一時間調度兩個回調函數時,它們會按照註冊的順序去執行。例如:

ev.call_soon(foo)

ev.call_soon(bar)

保證foo()會在bar()執行。

如果使用call_soon(),即使系統時鐘要逆行,這個保證還是成立的。這同樣對call_later(0,callback,*args)有效。然而,如果在系統時脈要逆行下,零延遲地使用call_later(),那就無法得到保證了。 (一個好的事件循環實作應該使用time.monotonic()來避免系統時鐘逆行的情況所導致的問題。參考 PEP 418 。)

上下文


所有的事件循環都有上下文的概念。對於預設的事件循環實作來說,上下文就是一個執行緒。一個事件循環實作應該在相同的上下問中運行所有的回呼。一個事件循環實作應該在同一時刻只運行一個回調,所以,回調要負責保持與相同事件循環裡調度的其他回呼自動互斥。

異常


在Python裡有兩類異常:從Exception類別到處的和從BaseException導出的。從Exception導出的異常通常能適當地被捕獲和處理;例如,異常會透過Future傳遞,以及當他們在一個回調了出現時,會被記錄和忽略。

然而,從BaseException到處的異常從來不會被捕獲到,他們通常帶有一個錯誤回溯訊息,同時導致程式終止。 (這類的例子包括KeyboardInterrupt和SystemExit;如果把這些異常與其他大部分異常同樣對待,那時不明智的)。

Handler類別


有各樣註冊回呼函數的方法(例如call_later())都會傳回一個物件來表示註冊,改物件能夠用來取消回呼函數。儘管使用者從來不用去實例化這個類,但還是想要給這個物件一個好的名字:Handler。這個類別有一個公用的方法:

cancel(). 嘗試取消回呼函數。 補充:準確的規範。

只讀的公共屬性:

callback。 要被呼叫的回調函數。

args。呼叫回調函數的參數數組。

cancelled。如果cancel()表呼叫了,它的值為True。


要注意的是一些回調函數(例如透過call_later()註冊的)意味著只會被呼叫一次。其他的(如透過add_reader()註冊的)意味著可以多次被呼叫。

補充:一個呼叫回呼函數的API(是否有必要封裝異常處理)?它是不是要記錄自己被呼叫了多少次?也許這個API應該是_call_()這樣? (但它應該抑制異常。)

補充:當回調函數在調度的時候有沒有一些公共的屬性來記錄那些即時的值? (因為這需要一些方法來把它保存到堆裡面的。)

Futures


ulip.Future 特意設計成和 PEP 3148中的 concurrent.futures.Future 類似,只是有細微的不同。這個PEP中談到 Future 時,都是指 tulip.Future,除非明確指定是  concurrent.futures.Future。 tulip.Future支援的公開API如下,同時也指出了和 PEP 3148 的不同:

cancel().  如果該 Future 已經完成(或者被取消了),則返回 False。否則,將該 Future 的狀態改為取消狀態(也可以理解成已完成),調度回呼函數,並傳回 True。

cancelled(). 如果該 Future 已經被取消了,回傳 True。

running(). 總是回傳False。和 PEP 3148 不同,這裡沒有 running 狀態。

done(). 如果該Future已經完成了,返回True。注意:取消了的Future也認為是已經完成了的(這裡和其他地方都是這樣)。

result(). 傳回 set_result() 設定的結果,或傳回 set_exception() 設定的例外。如果已經被取消了,則拋出CancelledError。和 PEP 3148 不同,這裡沒有超時參數,不等待。如果該Future尚未完成,則拋出一個異常。

exception(). 同上,回傳的是異常。

add_done_callback(fn).  新增一個回呼函數,在Future完成(或取消)時執行。如果該Future已經完成(或取消),則透過 call_soon() 來調度回呼函數。不同於 PEP 3148,新增的回呼函數不會立即被調用,而且總是會在調用者的上下文中運行。 (典型地,一個上下文是一個線程)。你可以理解為,使用 call_soon() 來呼叫該回呼函數。注意:新增的回呼函數(不同於本PEP其他的回呼函數,且忽略下面"回呼風格(Callback Style)"小節的約定)總是會接收到一個 Future 作為參數,且這個回呼函數不應該是 Handler 物件。

set_result(result). T該Future不能處於完成(或取消)狀態。這個方法將使當前Future進入完成狀態,並準備呼叫相關的回呼函數。不同於 PEP 3148:這是一個公開的API。

set_exception(exception).  同上,設定的是異常。

內部的方法 set_running_or_notify_cancel() 不再被支援;現在已經沒有方法直接設定成 running  狀態。

這個PEP定義了以下的異常:

InvalidStateError. 當呼叫的方法不接受這個Future 的狀態時,將會拋出該異常(例如:在一個已經完成的Future 中調用set_result() 方法,或者在一個未完成的Future 中呼叫result()方法)。

InvalidTimeoutError. 當呼叫 result() 或 exception() 時傳遞一個非零參數時拋出該異常。

CancelledError.  concurrent.futures.CancelledError 的別名。在一個已經取消的 Future 上面呼叫 result() 或 exception() 方法時拋出該例外。

TimeoutError. concurrent.futures.TimeoutError 的別名。有可能由 EventLoop.run_until_complete() 方法拋出。

建立一個 Future 時將會與預設的事件循環關聯起來。 (尚未完成的:允許傳遞一個事件循環作為參數?)。

concurrent.futures 套件裡面的 wait() 和 as_completed() 方法不接受 tulip.Future 物件作為參數。然而,有類似的 API tulip.wait() 和 tulip.as_completed(), 如下所述。

在子程式(coroutine)中可以將 tulip.Future 物件套用到 yield from 表達式中。這個是透過 Future 中的 __iter__() 介面來實現的。請參考下面的「子程式和調度器」小節。

當Future 物件被回收時,如果有相關聯的異常但是並沒有呼叫result() 、 exception() 或__iter__() 方法(或者說產生了異常但是還沒有拋出),那麼應該將該異常記錄到日誌中。 TBD:記錄成什麼等級?

將來,我們可能會把 tulip.Future 和 concurrent.futures.Future 統一起來。例如,為後面個物件新增一個 __iter__() 方法,以支援 yield from 表達式。為了防止意外呼叫尚未完成的 result() 而阻塞事件循環,阻塞機制需要偵測目前執行緒是否存在活動的事件循環,否則拋出例外。然而,這個PEP為了盡量減少外部依賴(只依賴 Python3.3),所以目前不會對 concurrent.futures.Future 做出改變。

傳輸層


傳輸層是指基於 socket 或其他類似的機制(例如,管道或SSL連接)的抽象層。這裡的傳輸層深受 Twisted 和 PEP 3153 的影響。使用者很少會直接實作或實例化傳輸層,事件循環提供了設定傳輸層的相關方法。

傳輸層是用來和協定一起工作的。典型的協定不關心底層傳輸層的具體細節,而傳輸層可以用來和多種的協定一起工作。例如,HTTP 用戶端實作可以使用普通的 socket 傳輸層,也可以使用SSL傳輸層。普通 socket 傳輸層可以與 HTTP 協定以外的大量協定一起工作(例如,SMTP, IMAP, POP, FTP, IRC, SPDY)。

大多數連線有不對稱特性:客戶端和伺服器通常有不同的角色和行為。因此,傳輸層和協定之間的介面也是不對稱的。從協定的視角來看,發送資料是透過呼叫傳輸層物件的 write() 方法來完成。 write() 方法將資料放進緩衝區後立即傳回。在讀取資料時,傳輸層將扮演一個更主動的角色:當從 socket(或其他資料來源) 接到資料後,傳輸層將呼叫協定的 data_received() 方法。

傳輸層有以下公開方法:

write(data). 寫入資料。參數必須是一個 bytes 物件。返回 None。傳輸層可以自由快取 bytes 數據,但必須確保資料傳送到另一端,並且維護資料流的行為。即:t.write(b'abc'); t.write(b'def') 等價於t.write(b'abcdef'),也等價於:

t.write(b'a')

t.write(b'b')

t.write(b'c')

t.write(b'd')

t.write(b'e')

t.write(t.write(t.write(t.write( b'f')

writelines(iterable). 等價於:

for data in iterable:

   self.write(data)

write_eof()。 ) 方法。當所有緩衝的資料傳輸之後,傳輸層將會向另一端發出訊號,表示已經沒有其他資料了。有些協定不支援此操作;那樣的話,呼叫 write_eof() 將會拋出異常。 (注意:這個方法以前叫做half_close(),除非你明確知道具體含義,這個方法名字並不明確表示哪一端會被關閉。)

can_write_eof().  如果協議支持write_eof(),返回True ;否則返回False。 (當write_eof() 不可用時,有些協定需要改變相應的行為,所以需要這個方法。例如,HTTP中,為了傳送目前大小未知的數據,通常會使用write_eof() 表示資料傳送完畢。但是,SSL 不支援這種行為,對應的HTTP協定實作需要使用分段(chunked)編碼。 . 暫停發送數據,直接呼叫了resume() 方法。在 pause() 調用,再到調用 resume() 之間,不會再調用協定的 data_received() 方法。在 write()  方法中無效。

resume(). 使用協定的 data_received() 重新開始傳輸資料。

close(). 關閉連線。在所有使用 write() 緩衝好的資料傳送完畢之前,不會關閉連線。連線關閉後,協定的 data_received() 方法不會再被呼叫。當所有緩衝的資料傳送完畢後,將會以 None 作為參數呼叫協定的 connection_lost() 方法。注意:這個方法不確保會呼叫上面所有的方法。

abort(). 中斷連線。所有在緩衝區內尚未傳輸的資料都會被丟棄。不久後,協定的 connection_lost() 將會被調用,傳入一個 None 參數。 (待定:在 close(), abort() 或另一端的關閉動作中,對 connection_lost() 傳入不同的參數? 或添加一個方法專門用來查詢這個? Glyph 建議傳入不同的異常)

尚未完成的:提供另一種流量控制的方法:傳輸層在緩衝區資料成為負擔的情況下有可能暫停協定層。建議:如果協定有 pause() 和 resume() 方法的話,允許傳輸層呼叫它們;如果不存在,則協定不支援流量控制。 (對 pause() 和 resume(),可能協定層和傳輸層使用不同的名稱會好一點?)

協定 (Protocols)


協定通常和傳輸層一起配合使用。這裡提供了幾個常用的協定(例如,幾個有用的HTTP客戶端和伺服器實作),大多數協定需要使用者或第三方函式庫來實作。

一個協定必須實作以下的方法,這些方法將會被傳輸層呼叫。這些回調函數將會被事件循環在正確的上下文中呼叫(參考上面的上下文("Context")小節)。

connection_made(transport). 表示傳輸層已經準備好且連接到一個實作的另一端。協定應該將傳輸層引用作為一個變數保存(這樣後面就可以呼叫它的 write() 及其他方法),也可以在這時發送握手請求。

data_received(data). 傳輸層已經從讀取了部分資料。參數是一個不為空的 bytes 物件。這個參數的大小並沒有明確的限制。 p.data_received(b'abcdef') 應該等價於下面的語句:

p.data_received(b'abc')

p.data_received(b'def')

eof_received(). write_eof() 或其他等價的方法時,這個方法將會被呼叫。預設的實作將呼叫傳輸層的 close() 方法,close() 方法呼叫了協定中的 connection_lost() 方法。

connection_lost(exc). 傳輸層已經關閉或中斷了,另一端已經安全地關閉了連接,或者發生了異常。在前三種情況中,參數為 None;在發生了異常的情況下,參數是導致傳輸層中斷的異常。 (待定:是否需要區分對待前三種情況?)

這裡是一個表示了調用的順序和多樣性的圖:

connection_made()-- exactly once

data_received()-- zero or more times

data_received()-- zero or more times

data_received()-- zero or more times

data_received()-- zero or more times

eof_received()-- at most once

connection_lost()-- exactly once

補充: 討論用戶的程式碼是否要做一些事情保證協議和傳輸協議不會被過早地GC(垃圾回收)。

回呼函數風格

大部分介面採取回呼函數也採取位置參數。舉例來說,要安排foor("abc",42)馬上調用,你可以調用ev.call_soon(foo,"abc",42)。要計劃呼叫foo(),則使用ev.call_soon(foo)。這種約定大大減少了需要典型的回呼函數程式設計的小lambda表達式的數量。

這種約定明確的不支援關鍵字參數。關鍵字參數常用來傳遞可選的關於回呼函數的額外資訊。這允許API的優雅的改革,不用擔心是否一個關鍵字在某些地方被一個呼叫者聲明。如果你有一個必要使用關鍵字參數呼叫的回呼函數,你可以使用lambda表達式或functools.partial。例如:

ev.call_soon(functools.partial(foo, "abc", repeat=42))

選擇一種事件循環的實作方式

待完成。 (關於使用select/poll/epoll,以及如何改變選擇。屬於事件循環策略)

協程和調度器


這是一個獨立的頂層部分,因為它的狀態與事件循環介面不同。協程是可選的,而且只用回呼的方式來寫程式碼也是很好的。另一方面,只有一種調度器/協程的API實現,如果你選擇了協程,那就是你唯一要使用的。

協同程序

一個協同程序是遵循一下約定的生產者。為了良好的文件的目的,所有的協同程序應該用@tulip.coroutine來修飾,但這並沒嚴格要求。

協同程式使用在 PEP 380 裡介紹的yield from語法啦來取代原始的yield語法。

這個「協同程序」的字和「生產者」的意思類似,是用來描述兩個不同(儘管是有關係)的概念。

定義了協同程序的函數(使用tulip.coroutine修飾定義的一個函數)。如果要消除歧義的話,我們可以稱這個函數為協同程式函數(coroutine function)。

透過一個協同函數獲得物件。這個物件代表了一個最終會完成的計算或I/O操作(通常兩者都有)。我們可以稱這個物件為協同程式物件(coroutine object)來消除歧義。

協程能做的事情:

結果= 從future使用yield-- 直到future完成,掛起協程,然後返回future的結果,或者拋出它要傳遞的異常。

🎜結果 = 從coroutine使用yield--等待另外的協程產生結果 (或拋出一個要傳遞的異常)。這協程的異常必須是對另一個協同程序的呼叫。 🎜🎜回傳結果-- 為使用yield from表達式等待結果的協程傳回一個結果。 🎜🎜拋出異常-- 在協程裡為使用yield from表達式等待的(程式)拋出一個異常。 🎜

呼叫一個協程不會馬上執行它的程式碼-它只是一個生產者,同時透過呼叫而傳回的協程確實只是一個生產者對象,它在你迭代它之前不會做任何事。對於協程來說,有兩種基本的方法來讓它開始運行:從別的協程裡調用yield(假設另外的協程已經在運行了!),或者把它轉換為一個Task(看下面) 。

協程只有在事件循環在運作時才能運作。

等待多個協同程序


有兩個類似wait()和as_completed(),在包concurrent.futures裡的API提供來等待多個協同程序或者Future:

tulip.wait(fs, timeout=None, return_when=ALL_COMPLETED)。 這是一個由fs提供等待Future或其他協同程式完成的協同程式。協同程式參數會封裝在Task裡(看下面)。這個方法會傳回一個Future,Future的成功結果是一個包含了兩個Future集的元組(做完(done),掛起(pending)),done是一個原始Future(或封裝過的協同程式)的集合表示完成(或取消),以及pending表示休息,例如還沒完成(或取消)。可選參數timeout和return_when與concurrent.futures.wait()裡的參數都有相同的意思和預設值:timeout,如果不是None,則為全部的操作指定一個timeout;return_when,指定何時停止。常數FIRST_COMPLETED,FIRST_EXCEPTION,ALL_COMPLETED使用相同的值定義同時在 PEP 3148裡有相同的意思:

ALL_COMPLETED(default): 等待,知道所有的Future處理了或者完成了 (或者直到超時發生)。

FIRST_COMPLETED: 等待,知道至少一個Future做好了或取消(或直到超時發生)。

FIRST_EXCEPTION: 等待,知道至少一個Future由於異常而做好(非取消) (這種從過濾器中排除取消的Future是很神奇的,但PEP 3148 就用這種方法裡做了。)

tulip.as_completed(fs, timeout=None). 傳回一個值為Future的迭代器; 等待成功的值,直到下一個Future或協同程式從fs完成都處於等待狀態, 同時傳回它自己的結果(或拋出它的異常)。可選參數timeout與concurrent.futures.wait()裡的參數都有相同的意思和預設值: 當存在超時的時候, 下一個由迭代器傳回的Future在等待的時候會拋出異常TimeoutError。 使用欄位:

for f in as_completed(fs):

   result = yield from f  # May raise an exception.

   

result.


任務(Task)是一個管理獨立運行子程序的物件。 Task介面和Future介面是一樣的。如果子程式完成或拋出異常,那麼與之關聯的任務也就完成了。回傳的結果就是對應任務的結果,拋出的異常就是對應任務的異常。

如果取消一個尚未完成的任務,將會阻止關聯的子程式繼續執行。在這種情況下,子程式將會接收到一個異常,以便更好地處理這個取消命令。當然,子程式不一定要處理這個異常。這個機制透過呼叫產生器標準的  close() 方法,這個在 PEP 342 中有描述。

任務對於子程式間的交互作用以及基於回呼的框架(例如 Twisted)也很有用。將一個子程序轉換成任務之後,就可以將回呼函數加到任務裡。

你可能會問,為什麼不將所有子程式都轉換成任務? @tulip.coroutinedecorator 可以實現這個任務。這樣的話,如果一個子程式呼叫了另外的子程式(或其他複雜情況),那麼整個程式將會變得相當緩慢。在複雜情況下,保持子程序比轉換成任務要快得多。

調度器

調度器沒有公開的介面。你可以使用 future 和 task 的 yield 和調度器進行互動。實際上,調度器並沒有一個具體的類別實作。透過使用事件循環的公共接口,Future 和 Task 類別表現了調度器的行為。所以即使換了第三方的事件循環實現,調度器特性也可以使用。

睡眠


尚未完成的:yield sleep(秒). 可以用 sleep(0) 來掛起並查詢I/O。

協同程序與協議


使用協同程序去實現協議的最好方法坑農是使用一個流緩存,這緩存使用data_received()來填充數據,同時能夠使用像read(n)和readline( )等回傳一個Future的方法來非同步讀取資料。當連結關閉時,read()方法應該傳回一個Future,這Future的結果會是'',或者如果connection_closed()由於異常而被調用,則拋出一個異常。

要寫入資料的話,在transport上的write()(及同類型的)方法能夠使用-這些不會傳回一個Future。應該提供用於設定和在呼叫了connection_made()時開始協同程序的一個標準的協定實作。

補充:更多的規範。

取消

補充。當一個任務被取消了,它的協同程序會在它從調度程序中放棄的任何一個地方中看到異常(例如可能在操作中放棄)。我們需要講清楚要拋出什麼異常。

再次補充:超時。

已知問題


調試的API? 例如 一些能夠記錄大量材料的或記錄不常用條件的 (就像隊列填充比排空快)或甚至回調函數耗費大量時間...

我們需要內省的API嗎?例如請求讀回調函數傳回一個檔案描述符。或當下一個調度的(回調函數)被呼叫時。或由回呼函數註冊了的一些檔案描述符。

傳輸可能需要一個方法來試著回傳socket的位址(和另外的方法回傳對等的位址)。儘管這是依賴socket的類型,同時這並不總是一個socket;然後就應該返回None。 (作為選擇,可以有個方法來返回socket自身——但可以想到一個沒有使用socket來實現IP鏈接,那它應該怎麼做呢?)

需要處理os.fokd()。 (這可能在Tulip的情況下會上升到選擇器類別。)

也許start_serving()需要一個方法來傳入到一個目前的socket(例如gunicorn就需要這個)。 create_connection()也有同樣的問題。

我們或許會介紹一些明確的鎖,儘管使用起來有點痛苦,因為我們不能使用with鎖:阻塞語法(因為要等待一個鎖的話,我們就要用yield from了,這是with語句不能做到的)。

是否要支援資料報協定、連結。可能要更多的套接字I/O方法,例如sock_sendto()和sock_recvfrom()。或是使用者客戶自己寫(這不是火箭科學)。有它的理由去覆寫write(),writelines(),data_received()到一個單一的資料報? (Glyph推薦後者。)然後用什麼來代替write()?最後,我們需要支援無連線資料報協定嗎? (這意味著要封裝sendto()和recvfrom()。)

我們可能需要API來控制各種超市。例如我們可能想要限制在解析DNS、連結、SSL握手、空閒連結、甚至是每個會話裡消耗的時間。也許有能充分的加入timeout的關鍵字參數到一些方法裡,和其他能夠巧妙地透過call_later和Task.cancel()來實現逾時。但可能有些方法需要預設的超時時間,同時我們可能會想改變這種規範的全域操作的預設值。 (例如,每個事件循環)。

一個NodeJS風格的事件觸發器?或把這個當作一個單獨的教學?這其實在使用者空間做是夠容易的,雖然可能放在標準化裡好一點(看看https://github.com/mnot/thor/blob/master/thor/events.py和https://github.com /mnot/thor/blob/master/doc/events.md 舉的例子。

PEP 3148 描述concurrent.futures.Future.

PEP 3153,雖然被拒絕了, 但很好的描述了分離傳輸和協議的需要。

Tulip repo: http://code.google.com/p/tulip/


Nick Coghlan在一些背景下寫的一個很好的博客條目,關於異步I/O不同處理、gevent、以及怎樣使用future就像wihle、for和with的概念的思想, : http://python-notes.boredomandlaziness.org/en/latest/pep_ideas/async_programming.html

TBD: 有關Twisted、Tornado、ZeroMQ、pyftpdlib、libevent、libftpd 、pyev、libuv、wattle等的參考

鳴謝

除了PEP 3153之外, 受到的影響包括PEP 380 and Greg Ewing的yield from的教程, Twisted, Tornforna, Zerod, pyftp企圖是想把這些全部綜合起來), wattle (Steve Dower的相反的提議), 2012年9月到12月在python-ideas上大量討論, 一個與Steve Dower和Dino Viehland在Skype上的會話, 和Ben Darnell的電子郵件交流, 一個Niels Provos的聽眾(libevent原始作者),兩場和幾個Twisted開發者的面對面會議, 包括了Glyph、Brian Warner、David Reid、 以及Duncan McGreggor。同樣的,作者前期在為Google App Engine的NDB函式庫的非同步支援也有重要的影響。

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn